1 /*-------------------------------------------------------------------------
2  *
3  * vacuum.c
4  *        The postgres vacuum cleaner.
5  *
6  * This file includes the "full" version of VACUUM, as well as control code
7  * used by all three of full VACUUM, lazy VACUUM, and ANALYZE.  See
8  * vacuumlazy.c and analyze.c for the rest of the code for the latter two.
9  *
10  *
11  * Portions Copyright (c) 1996-2002, PostgreSQL Global Development Group
12  * Portions Copyright (c) 1994, Regents of the University of California
13  *
14  *
15  * IDENTIFICATION
16  *        $Header: /cvsroot/pgsql/src/backend/commands/vacuum.c,v 1.238 2002/09/20 19:56:01 tgl Exp $
17  *
18  *-------------------------------------------------------------------------
19  */
20 #include "postgres.h"
21
22 #include <unistd.h>
23
24 #include "access/clog.h"
25 #include "access/genam.h"
26 #include "access/heapam.h"
27 #include "access/xlog.h"
28 #include "catalog/catalog.h"
29 #include "catalog/catname.h"
30 #include "catalog/namespace.h"
31 #include "catalog/pg_database.h"
32 #include "catalog/pg_index.h"
33 #include "commands/vacuum.h"
34 #include "executor/executor.h"
35 #include "miscadmin.h"
36 #include "storage/freespace.h"
37 #include "storage/sinval.h"
38 #include "storage/smgr.h"
39 #include "tcop/pquery.h"
40 #include "utils/acl.h"
41 #include "utils/builtins.h"
42 #include "utils/fmgroids.h"
43 #include "utils/inval.h"
44 #include "utils/lsyscache.h"
45 #include "utils/relcache.h"
46 #include "utils/syscache.h"
47 #include "pgstat.h"
48
49
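/*
 * VacPageData describes one heap page of interest to VACUUM FULL: how much
 * free space it offers and which line-pointer offsets are free or can be
 * reclaimed.  scan_heap() builds these entries while walking the heap.
 */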
50 typedef struct VacPageData
51 {
52         BlockNumber blkno;                      /* BlockNumber of this Page */
53         Size            free;                   /* FreeSpace on this Page */
54         uint16          offsets_used;   /* Number of OffNums used by vacuum */
55         uint16          offsets_free;   /* Number of OffNums free or to be free */
56         OffsetNumber offsets[1];        /* Array of free OffNums */
57 } VacPageData;
58
59 typedef VacPageData *VacPage;
60
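/*
 * VacPageListData is a growable array of VacPage entries.  scan_heap()
 * builds two such lists: vacuum_pages (pages that need cleanup and/or index
 * cleanup) and fraged_pages (pages with enough free space to be re-used).
 */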
61 typedef struct VacPageListData
62 {
63         BlockNumber empty_end_pages;    /* Number of "empty" end-pages */
64         int                     num_pages;              /* Number of pages in pagedesc */
65         int                     num_allocated_pages;    /* Number of allocated pages in
66                                                                                  * pagedesc */
67         VacPage    *pagedesc;           /* Descriptions of pages */
68 } VacPageListData;
69
70 typedef VacPageListData *VacPageList;
71
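/*
 * VTupleLinkData records one link of an update chain: this_tid is a
 * recently-dead updated tuple and new_tid is its successor version (the
 * tuple's t_ctid).  scan_heap() collects these so that updated-tuple
 * dependencies can be followed when the relation is shrunk.
 */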
72 typedef struct VTupleLinkData
73 {
74         ItemPointerData new_tid;
75         ItemPointerData this_tid;
76 } VTupleLinkData;
77
78 typedef VTupleLinkData *VTupleLink;
79
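/*
 * VTupleMoveData describes one planned tuple move: the tuple's TID, the
 * destination page, and whether that page must be cleaned before use.
 */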
80 typedef struct VTupleMoveData
81 {
82         ItemPointerData tid;            /* tuple ID */
83         VacPage         vacpage;                /* where to move */
84         bool            cleanVpd;               /* clean vacpage before using */
85 } VTupleMoveData;
86
87 typedef VTupleMoveData *VTupleMove;
88
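/*
 * VRelStats accumulates per-relation statistics while the heap is scanned:
 * page and tuple counts, minimum/maximum tuple lengths, whether any indexes
 * exist, and the collected update-chain links.  The counts are used at the
 * end of full_vacuum_rel() to update the relation's pg_class row.
 */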
89 typedef struct VRelStats
90 {
91         BlockNumber rel_pages;
92         double          rel_tuples;
93         Size            min_tlen;
94         Size            max_tlen;
95         bool            hasindex;
96         int                     num_vtlinks;
97         VTupleLink      vtlinks;
98 } VRelStats;
99
100
101 static MemoryContext vac_context = NULL;
102
103 static int      elevel = -1;
104
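/*
 * Cutoff XIDs in use while vacuuming the current relation; set via
 * vacuum_set_xid_limits() at the start of full_vacuum_rel().
 */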
105 static TransactionId OldestXmin;
106 static TransactionId FreezeLimit;
107
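/*
 * Cutoffs computed once at the start of a database-wide VACUUM; they are
 * recorded in pg_database and used to truncate pg_clog when it completes.
 */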
108 static TransactionId initialOldestXmin;
109 static TransactionId initialFreezeLimit;
110
111
112 /* non-export function prototypes */
113 static List *getrels(const RangeVar *vacrel, const char *stmttype);
114 static void vac_update_dbstats(Oid dbid,
115                                    TransactionId vacuumXID,
116                                    TransactionId frozenXID);
117 static void vac_truncate_clog(TransactionId vacuumXID,
118                                   TransactionId frozenXID);
119 static void vacuum_rel(Oid relid, VacuumStmt *vacstmt, char expected_relkind);
120 static void full_vacuum_rel(Relation onerel, VacuumStmt *vacstmt);
121 static void scan_heap(VRelStats *vacrelstats, Relation onerel,
122                   VacPageList vacuum_pages, VacPageList fraged_pages);
123 static void repair_frag(VRelStats *vacrelstats, Relation onerel,
124                         VacPageList vacuum_pages, VacPageList fraged_pages,
125                         int nindexes, Relation *Irel);
126 static void vacuum_heap(VRelStats *vacrelstats, Relation onerel,
127                         VacPageList vacpagelist);
128 static void vacuum_page(Relation onerel, Buffer buffer, VacPage vacpage);
129 static void vacuum_index(VacPageList vacpagelist, Relation indrel,
130                          double num_tuples, int keep_tuples);
131 static void scan_index(Relation indrel, double num_tuples);
132 static bool tid_reaped(ItemPointer itemptr, void *state);
133 static bool dummy_tid_reaped(ItemPointer itemptr, void *state);
134 static void vac_update_fsm(Relation onerel, VacPageList fraged_pages,
135                            BlockNumber rel_pages);
136 static VacPage copy_vac_page(VacPage vacpage);
137 static void vpage_insert(VacPageList vacpagelist, VacPage vpnew);
138 static void *vac_bsearch(const void *key, const void *base,
139                         size_t nelem, size_t size,
140                         int (*compar) (const void *, const void *));
141 static int      vac_cmp_blk(const void *left, const void *right);
142 static int      vac_cmp_offno(const void *left, const void *right);
143 static int      vac_cmp_vtlinks(const void *left, const void *right);
144 static bool enough_space(VacPage vacpage, Size len);
145
146
147 /****************************************************************************
148  *                                                                                                                                                      *
149  *                      Code common to all flavors of VACUUM and ANALYZE                                *
150  *                                                                                                                                                      *
151  ****************************************************************************
152  */
153
154
155 /*
156  * Primary entry point for VACUUM and ANALYZE commands.
157  */
158 void
159 vacuum(VacuumStmt *vacstmt)
160 {
161         const char *stmttype = vacstmt->vacuum ? "VACUUM" : "ANALYZE";
162         MemoryContext anl_context = NULL;
163         List       *vrl,
164                            *cur;
165
166         if (vacstmt->verbose)
167                 elevel = INFO;
168         else
169                 elevel = DEBUG1;
170
171         /*
172          * We cannot run VACUUM inside a user transaction block; if we were
173          * inside a transaction, then our commit- and
174          * start-transaction-command calls would not have the intended effect!
175          * Furthermore, the forced commit that occurs before truncating the
176          * relation's file would have the effect of committing the rest of the
177          * user's transaction too, which would certainly not be the desired
178          * behavior.
179          */
180         if (vacstmt->vacuum && IsTransactionBlock())
181                 elog(ERROR, "%s cannot run inside a BEGIN/END block", stmttype);
182
183         /* Running VACUUM from a function would free the function context */
184         if (vacstmt->vacuum && !MemoryContextContains(QueryContext, vacstmt))
185                 elog(ERROR, "%s cannot be executed from a function", stmttype);
186
187         /*
188          * Send info about dead objects to the statistics collector
189          */
190         if (vacstmt->vacuum)
191                 pgstat_vacuum_tabstat();
192
193         /*
194          * Create special memory context for cross-transaction storage.
195          *
196          * Since it is a child of QueryContext, it will go away eventually even
197          * if we suffer an error; there's no need for special abort cleanup
198          * logic.
199          */
200         vac_context = AllocSetContextCreate(QueryContext,
201                                                                                 "Vacuum",
202                                                                                 ALLOCSET_DEFAULT_MINSIZE,
203                                                                                 ALLOCSET_DEFAULT_INITSIZE,
204                                                                                 ALLOCSET_DEFAULT_MAXSIZE);
205
206         /*
207          * If we are running only ANALYZE, we don't need per-table
208          * transactions, but we still need a memory context with table
209          * lifetime.
210          */
211         if (vacstmt->analyze && !vacstmt->vacuum)
212                 anl_context = AllocSetContextCreate(QueryContext,
213                                                                                         "Analyze",
214                                                                                         ALLOCSET_DEFAULT_MINSIZE,
215                                                                                         ALLOCSET_DEFAULT_INITSIZE,
216                                                                                         ALLOCSET_DEFAULT_MAXSIZE);
217
218         /* Build list of relations to process (note this lives in vac_context) */
219         vrl = getrels(vacstmt->relation, stmttype);
220
221         /*
222          * Formerly, there was code here to prevent more than one VACUUM from
223          * executing concurrently in the same database.  However, there's no
224          * good reason to prevent that, and manually removing lockfiles after
225          * a vacuum crash was a pain for dbadmins.      So, forget about
226          * lockfiles, and just rely on the locks we grab on each target table
227          * to ensure that there aren't two VACUUMs running on the same table
228          * at the same time.
229          */
230
231         /*
232          * The strangeness with committing and starting transactions here is
233          * due to wanting to run each table's VACUUM as a separate
234          * transaction, so that we don't hold locks unnecessarily long.  Also,
235          * if we are doing VACUUM ANALYZE, the ANALYZE part runs as a separate
236          * transaction from the VACUUM to further reduce locking.
237          *
238          * vacuum_rel expects to be entered with no transaction active; it will
239          * start and commit its own transaction.  But we are called by an SQL
240          * command, and so we are executing inside a transaction already.  We
241          * commit the transaction started in PostgresMain() here, and start
242          * another one before exiting to match the commit waiting for us back
243          * in PostgresMain().
244          *
245          * In the case of an ANALYZE statement (no vacuum, just analyze) it's
246          * okay to run the whole thing in the outer transaction, and so we
247          * skip transaction start/stop operations.
248          */
249         if (vacstmt->vacuum)
250         {
251                 if (vacstmt->relation == NULL)
252                 {
253                         /*
254                          * It's a database-wide VACUUM.
255                          *
256                          * Compute the initially applicable OldestXmin and FreezeLimit
257                          * XIDs, so that we can record these values at the end of the
258                          * VACUUM. Note that individual tables may well be processed
259                          * with newer values, but we can guarantee that no
260                          * (non-shared) relations are processed with older ones.
261                          *
262                          * It is okay to record non-shared values in pg_database, even
263                          * though we may vacuum shared relations with older cutoffs,
264                          * because only the minimum of the values present in
265                          * pg_database matters.  We can be sure that shared relations
266                          * have at some time been vacuumed with cutoffs no worse than
267                          * the global minimum; for, if there is a backend in some
268                          * other DB with xmin = OLDXMIN that's determining the cutoff
269                          * with which we vacuum shared relations, it is not possible
270                          * for that database to have a cutoff newer than OLDXMIN
271                          * recorded in pg_database.
272                          */
273                         vacuum_set_xid_limits(vacstmt, false,
274                                                                 &initialOldestXmin, &initialFreezeLimit);
275                 }
276
277                 /* matches the StartTransaction in PostgresMain() */
278                 CommitTransactionCommand(true);
279         }
280
281         /*
282          * Loop to process each selected relation.
283          */
284         foreach(cur, vrl)
285         {
286                 Oid                     relid = (Oid) lfirsti(cur);
287
288                 if (vacstmt->vacuum)
289                         vacuum_rel(relid, vacstmt, RELKIND_RELATION);
290                 if (vacstmt->analyze)
291                 {
292                         MemoryContext old_context = NULL;
293
294                         /*
295                          * If we vacuumed, use new transaction for analyze.
296                          * Otherwise, we can use the outer transaction, but we still
297                          * need to call analyze_rel in a memory context that will be
298                          * cleaned up on return (else we leak memory while processing
299                          * multiple tables).
300                          */
301                         if (vacstmt->vacuum)
302                                 StartTransactionCommand(true);
303                         else
304                                 old_context = MemoryContextSwitchTo(anl_context);
305
306                         analyze_rel(relid, vacstmt);
307
308                         if (vacstmt->vacuum)
309                                 CommitTransactionCommand(true);
310                         else
311                         {
312                                 MemoryContextSwitchTo(old_context);
313                                 MemoryContextResetAndDeleteChildren(anl_context);
314                         }
315                 }
316         }
317
318         /*
319          * Finish up processing.
320          */
321         if (vacstmt->vacuum)
322         {
323                 /* here, we are not in a transaction */
324
325                 /*
326                  * This matches the CommitTransaction waiting for us in
327                  * PostgresMain(). We tell xact.c not to chain the upcoming
328                  * commit, so that a VACUUM doesn't start a transaction block,
329                  * even when autocommit is off.
330                  */
331                 StartTransactionCommand(true);
332
333                 /*
334                  * If we did a database-wide VACUUM, update the database's
335                  * pg_database row with info about the transaction IDs used, and
336                  * try to truncate pg_clog.
337                  */
338                 if (vacstmt->relation == NULL)
339                 {
340                         vac_update_dbstats(MyDatabaseId,
341                                                            initialOldestXmin, initialFreezeLimit);
342                         vac_truncate_clog(initialOldestXmin, initialFreezeLimit);
343                 }
344         }
345
346         /*
347          * Clean up working storage --- note we must do this after
348          * StartTransactionCommand, else we might be trying to delete the
349          * active context!
350          */
351         MemoryContextDelete(vac_context);
352         vac_context = NULL;
353
354         if (anl_context)
355                 MemoryContextDelete(anl_context);
356 }
357
358 /*
359  * Build a list of Oids for each relation to be processed
360  *
361  * The list is built in vac_context so that it will survive across our
362  * per-relation transactions.
363  */
364 static List *
365 getrels(const RangeVar *vacrel, const char *stmttype)
366 {
367         List       *vrl = NIL;
368         MemoryContext oldcontext;
369
370         if (vacrel)
371         {
372                 /* Process specific relation */
373                 Oid                     relid;
374
375                 relid = RangeVarGetRelid(vacrel, false);
376
377                 /* Make a relation list entry for this guy */
378                 oldcontext = MemoryContextSwitchTo(vac_context);
379                 vrl = lappendi(vrl, relid);
380                 MemoryContextSwitchTo(oldcontext);
381         }
382         else
383         {
384                 /* Process all plain relations listed in pg_class */
385                 Relation        pgclass;
386                 HeapScanDesc scan;
387                 HeapTuple       tuple;
388                 ScanKeyData key;
389
390                 ScanKeyEntryInitialize(&key, 0x0,
391                                                            Anum_pg_class_relkind,
392                                                            F_CHAREQ,
393                                                            CharGetDatum(RELKIND_RELATION));
394
395                 pgclass = heap_openr(RelationRelationName, AccessShareLock);
396
397                 scan = heap_beginscan(pgclass, SnapshotNow, 1, &key);
398
399                 while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
400                 {
401                         /* Make a relation list entry for this guy */
402                         oldcontext = MemoryContextSwitchTo(vac_context);
403                         vrl = lappendi(vrl, HeapTupleGetOid(tuple));
404                         MemoryContextSwitchTo(oldcontext);
405                 }
406
407                 heap_endscan(scan);
408                 heap_close(pgclass, AccessShareLock);
409         }
410
411         return vrl;
412 }
413
414 /*
415  * vacuum_set_xid_limits() -- compute oldest-Xmin and freeze cutoff points
416  */
417 void
418 vacuum_set_xid_limits(VacuumStmt *vacstmt, bool sharedRel,
419                                           TransactionId *oldestXmin,
420                                           TransactionId *freezeLimit)
421 {
422         TransactionId limit;
423
424         *oldestXmin = GetOldestXmin(sharedRel);
425
426         Assert(TransactionIdIsNormal(*oldestXmin));
427
428         if (vacstmt->freeze)
429         {
430                 /* FREEZE option: use oldest Xmin as freeze cutoff too */
431                 limit = *oldestXmin;
432         }
433         else
434         {
435                 /*
436                  * Normal case: freeze cutoff is well in the past, to wit, about
437                  * halfway to the wrap horizon
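                         * (MaxTransactionId >> 2 is roughly one billion XIDs)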
438                  */
439                 limit = GetCurrentTransactionId() - (MaxTransactionId >> 2);
440         }
441
442         /*
443          * Be careful not to generate a "permanent" XID
444          */
445         if (!TransactionIdIsNormal(limit))
446                 limit = FirstNormalTransactionId;
447
448         /*
449          * Ensure sane relationship of limits
450          */
451         if (TransactionIdFollows(limit, *oldestXmin))
452         {
453                 elog(WARNING, "oldest Xmin is far in the past --- close open transactions soon to avoid wraparound problems");
454                 limit = *oldestXmin;
455         }
456
457         *freezeLimit = limit;
458 }
459
460
461 /*
462  *      vac_update_relstats() -- update statistics for one relation
463  *
464  *              Update the whole-relation statistics that are kept in its pg_class
465  *              row.  There are additional stats that will be updated if we are
466  *              doing ANALYZE, but we always update these stats.  This routine works
467  *              for both index and heap relation entries in pg_class.
468  *
469  *              We violate no-overwrite semantics here by storing new values for the
470  *              statistics columns directly into the pg_class tuple that's already on
471  *              the page.  The reason for this is that if we updated these tuples in
472  *              the usual way, vacuuming pg_class itself wouldn't work very well ---
473  *              by the time we got done with a vacuum cycle, most of the tuples in
474  *              pg_class would've been obsoleted.  Of course, this only works for
475  *              fixed-size never-null columns, but these are.
476  *
477  *              This routine is shared by full VACUUM, lazy VACUUM, and stand-alone
478  *              ANALYZE.
479  */
480 void
481 vac_update_relstats(Oid relid, BlockNumber num_pages, double num_tuples,
482                                         bool hasindex)
483 {
484         Relation        rd;
485         HeapTupleData rtup;
486         HeapTuple       ctup;
487         Form_pg_class pgcform;
488         Buffer          buffer;
489
490         /*
491          * update number of tuples and number of pages in pg_class
492          */
493         rd = heap_openr(RelationRelationName, RowExclusiveLock);
494
495         ctup = SearchSysCache(RELOID,
496                                                   ObjectIdGetDatum(relid),
497                                                   0, 0, 0);
498         if (!HeapTupleIsValid(ctup))
499                 elog(ERROR, "pg_class entry for relid %u vanished during vacuuming",
500                          relid);
501
502         /* get the buffer cache tuple */
503         rtup.t_self = ctup->t_self;
504         ReleaseSysCache(ctup);
505         if (!heap_fetch(rd, SnapshotNow, &rtup, &buffer, false, NULL))
506                 elog(ERROR, "pg_class entry for relid %u vanished during vacuuming",
507                          relid);
508
509         /* overwrite the existing statistics in the tuple */
510         pgcform = (Form_pg_class) GETSTRUCT(&rtup);
511         pgcform->relpages = (int32) num_pages;
512         pgcform->reltuples = num_tuples;
513         pgcform->relhasindex = hasindex;
514
515         /*
516          * If we have discovered that there are no indexes, then there's no
517          * primary key either.  This could be done more thoroughly...
518          */
519         if (!hasindex)
520                 pgcform->relhaspkey = false;
521
522         /*
523          * Invalidate the tuple in the catcaches; this also arranges to flush
524          * the relation's relcache entry.  (If we fail to commit for some
525          * reason, no flush will occur, but no great harm is done since there
526          * are no noncritical state updates here.)
527          */
528         CacheInvalidateHeapTuple(rd, &rtup);
529
530         /* Write the buffer */
531         WriteBuffer(buffer);
532
533         heap_close(rd, RowExclusiveLock);
534 }
535
536
537 /*
538  *      vac_update_dbstats() -- update statistics for one database
539  *
540  *              Update the whole-database statistics that are kept in its pg_database
541  *              row.
542  *
543  *              We violate no-overwrite semantics here by storing new values for the
544  *              statistics columns directly into the tuple that's already on the page.
545  *              As with vac_update_relstats, this avoids leaving dead tuples behind
546  *              after a VACUUM; which is good since GetRawDatabaseInfo
547  *              can get confused by finding dead tuples in pg_database.
548  *
549  *              This routine is shared by full and lazy VACUUM.  Note that it is only
550  *              applied after a database-wide VACUUM operation.
551  */
552 static void
553 vac_update_dbstats(Oid dbid,
554                                    TransactionId vacuumXID,
555                                    TransactionId frozenXID)
556 {
557         Relation        relation;
558         ScanKeyData entry[1];
559         HeapScanDesc scan;
560         HeapTuple       tuple;
561         Form_pg_database dbform;
562
563         relation = heap_openr(DatabaseRelationName, RowExclusiveLock);
564
565         /* Must use a heap scan, since there's no syscache for pg_database */
566         ScanKeyEntryInitialize(&entry[0], 0x0,
567                                                    ObjectIdAttributeNumber, F_OIDEQ,
568                                                    ObjectIdGetDatum(dbid));
569
570         scan = heap_beginscan(relation, SnapshotNow, 1, entry);
571
572         tuple = heap_getnext(scan, ForwardScanDirection);
573
574         if (!HeapTupleIsValid(tuple))
575                 elog(ERROR, "database %u does not exist", dbid);
576
577         dbform = (Form_pg_database) GETSTRUCT(tuple);
578
579         /* overwrite the existing statistics in the tuple */
580         dbform->datvacuumxid = vacuumXID;
581         dbform->datfrozenxid = frozenXID;
582
583         /* invalidate the tuple in the cache and write the buffer */
584         CacheInvalidateHeapTuple(relation, tuple);
585         WriteNoReleaseBuffer(scan->rs_cbuf);
586
587         heap_endscan(scan);
588
589         heap_close(relation, RowExclusiveLock);
590 }
591
592
593 /*
594  *      vac_truncate_clog() -- attempt to truncate the commit log
595  *
596  *              Scan pg_database to determine the system-wide oldest datvacuumxid,
597  *              and use it to truncate the transaction commit log (pg_clog).
598  *              Also generate a warning if the system-wide oldest datfrozenxid
599  *              seems to be in danger of wrapping around.
600  *
601  *              The passed XIDs are simply the ones I just wrote into my pg_database
602  *              entry.  They're used to initialize the "min" calculations.
603  *
604  *              This routine is shared by full and lazy VACUUM.  Note that it is only
605  *              applied after a database-wide VACUUM operation.
606  */
607 static void
608 vac_truncate_clog(TransactionId vacuumXID, TransactionId frozenXID)
609 {
610         TransactionId myXID;
611         Relation        relation;
612         HeapScanDesc scan;
613         HeapTuple       tuple;
614         int32           age;
615         bool            vacuumAlreadyWrapped = false;
616         bool            frozenAlreadyWrapped = false;
617
618         myXID = GetCurrentTransactionId();
619
620         relation = heap_openr(DatabaseRelationName, AccessShareLock);
621
622         scan = heap_beginscan(relation, SnapshotNow, 0, NULL);
623
624         while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
625         {
626                 Form_pg_database dbform = (Form_pg_database) GETSTRUCT(tuple);
627
628                 /* Ignore non-connectable databases (eg, template0) */
629                 /* It's assumed that these have been frozen correctly */
630                 if (!dbform->datallowconn)
631                         continue;
632
633                 if (TransactionIdIsNormal(dbform->datvacuumxid))
634                 {
635                         if (TransactionIdPrecedes(myXID, dbform->datvacuumxid))
636                                 vacuumAlreadyWrapped = true;
637                         else if (TransactionIdPrecedes(dbform->datvacuumxid, vacuumXID))
638                                 vacuumXID = dbform->datvacuumxid;
639                 }
640                 if (TransactionIdIsNormal(dbform->datfrozenxid))
641                 {
642                         if (TransactionIdPrecedes(myXID, dbform->datfrozenxid))
643                                 frozenAlreadyWrapped = true;
644                         else if (TransactionIdPrecedes(dbform->datfrozenxid, frozenXID))
645                                 frozenXID = dbform->datfrozenxid;
646                 }
647         }
648
649         heap_endscan(scan);
650
651         heap_close(relation, AccessShareLock);
652
653         /*
654          * Do not truncate CLOG if we seem to have suffered wraparound
655          * already; the computed minimum XID might be bogus.
656          */
657         if (vacuumAlreadyWrapped)
658         {
659                 elog(WARNING, "Some databases have not been vacuumed in over 2 billion transactions."
660                          "\n\tYou may have already suffered transaction-wraparound data loss.");
661                 return;
662         }
663
664         /* Truncate CLOG to the oldest vacuumxid */
665         TruncateCLOG(vacuumXID);
666
667         /* Give warning about impending wraparound problems */
668         if (frozenAlreadyWrapped)
669         {
670                 elog(WARNING, "Some databases have not been vacuumed in over 1 billion transactions."
671                          "\n\tBetter vacuum them soon, or you may have a wraparound failure.");
672         }
673         else
674         {
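                /*
                 * age counts the XIDs consumed since the oldest datfrozenxid;
                 * (MaxTransactionId >> 3) * 3 is about 1.6 billion, i.e. roughly
                 * three-quarters of the ~2-billion span that is safe before
                 * wraparound, and (MaxTransactionId >> 1) - age is the headroom
                 * reported in the message below.
                 */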
675                 age = (int32) (myXID - frozenXID);
676                 if (age > (int32) ((MaxTransactionId >> 3) * 3))
677                         elog(WARNING, "Some databases have not been vacuumed in %d transactions."
678                                  "\n\tBetter vacuum them within %d transactions,"
679                                  "\n\tor you may have a wraparound failure.",
680                                  age, (int32) (MaxTransactionId >> 1) - age);
681         }
682 }
683
684
685 /****************************************************************************
686  *                                                                                                                                                      *
687  *                      Code common to both flavors of VACUUM                                                   *
688  *                                                                                                                                                      *
689  ****************************************************************************
690  */
691
692
693 /*
694  *      vacuum_rel() -- vacuum one heap relation
695  *
696  *              Doing one heap at a time incurs extra overhead, since we need to
697  *              check that the heap exists again just before we vacuum it.      The
698  *              reason that we do this is so that vacuuming can be spread across
699  *              many small transactions.  Otherwise, two-phase locking would require
700  *              us to lock the entire database during one pass of the vacuum cleaner.
701  *
702  *              At entry and exit, we are not inside a transaction.
703  */
704 static void
705 vacuum_rel(Oid relid, VacuumStmt *vacstmt, char expected_relkind)
706 {
707         LOCKMODE        lmode;
708         Relation        onerel;
709         LockRelId       onerelid;
710         Oid                     toast_relid;
711
712         /* Begin a transaction for vacuuming this relation */
713         StartTransactionCommand(true);
714
715         /*
716          * Check for user-requested abort.      Note we want this to be inside a
717                  * transaction, so xact.c doesn't issue a useless WARNING.
718          */
719         CHECK_FOR_INTERRUPTS();
720
721         /*
722          * Race condition -- if the pg_class tuple has gone away since the
723          * last time we saw it, we don't need to vacuum it.
724          */
725         if (!SearchSysCacheExists(RELOID,
726                                                           ObjectIdGetDatum(relid),
727                                                           0, 0, 0))
728         {
729                 CommitTransactionCommand(true);
730                 return;
731         }
732
733         /*
734          * Determine the type of lock we want --- hard exclusive lock for a
735          * FULL vacuum, but just ShareUpdateExclusiveLock for concurrent
736          * vacuum.      Either way, we can be sure that no other backend is
737          * vacuuming the same table.
738          */
739         lmode = vacstmt->full ? AccessExclusiveLock : ShareUpdateExclusiveLock;
740
741         /*
742          * Open the class, get an appropriate lock on it, and check
743          * permissions.
744          *
745          * We allow the user to vacuum a table if he is superuser, the table
746          * owner, or the database owner (but in the latter case, only if it's
747          * not a shared relation).      pg_class_ownercheck includes the superuser
748          * case.
749          *
750          * Note we choose to treat permissions failure as a WARNING and keep
751          * trying to vacuum the rest of the DB --- is this appropriate?
752          */
753         onerel = relation_open(relid, lmode);
754
755         if (!(pg_class_ownercheck(RelationGetRelid(onerel), GetUserId()) ||
756                   (is_dbadmin(MyDatabaseId) && !onerel->rd_rel->relisshared)))
757         {
758                 elog(WARNING, "Skipping \"%s\" --- only table or database owner can VACUUM it",
759                          RelationGetRelationName(onerel));
760                 relation_close(onerel, lmode);
761                 CommitTransactionCommand(true);
762                 return;
763         }
764
765         /*
766          * Check that it's a plain table; we used to do this in getrels(), but
767          * it seems safer to check after we've locked the relation.
768          */
769         if (onerel->rd_rel->relkind != expected_relkind)
770         {
771                 elog(WARNING, "Skipping \"%s\" --- cannot process indexes, views or special system tables",
772                          RelationGetRelationName(onerel));
773                 relation_close(onerel, lmode);
774                 CommitTransactionCommand(true);
775                 return;
776         }
777
778         /*
779          * Get a session-level lock too. This will protect our access to the
780          * relation across multiple transactions, so that we can vacuum the
781          * relation's TOAST table (if any) secure in the knowledge that no one
782          * is deleting the parent relation.
783          *
784          * NOTE: this cannot block, even if someone else is waiting for access,
785          * because the lock manager knows that both lock requests are from the
786          * same process.
787          */
788         onerelid = onerel->rd_lockInfo.lockRelId;
789         LockRelationForSession(&onerelid, lmode);
790
791         /*
792          * Remember the relation's TOAST relation for later
793          */
794         toast_relid = onerel->rd_rel->reltoastrelid;
795
796         /*
797          * Do the actual work --- either FULL or "lazy" vacuum
798          */
799         if (vacstmt->full)
800                 full_vacuum_rel(onerel, vacstmt);
801         else
802                 lazy_vacuum_rel(onerel, vacstmt);
803
804         /* all done with this class, but hold lock until commit */
805         relation_close(onerel, NoLock);
806
807         /*
808          * Complete the transaction and free all temporary memory used.
809          */
810         CommitTransactionCommand(true);
811
812         /*
813          * If the relation has a secondary toast rel, vacuum that too while we
814          * still hold the session lock on the master table.  Note however that
815          * "analyze" will not get done on the toast table.      This is good,
816          * because the toaster always uses hardcoded index access and
817          * statistics are totally unimportant for toast relations.
818          */
819         if (toast_relid != InvalidOid)
820                 vacuum_rel(toast_relid, vacstmt, RELKIND_TOASTVALUE);
821
822         /*
823          * Now release the session-level lock on the master table.
824          */
825         UnlockRelationForSession(&onerelid, lmode);
826 }
827
828
829 /****************************************************************************
830  *                                                                                                                                                      *
831  *                      Code for VACUUM FULL (only)                                                                             *
832  *                                                                                                                                                      *
833  ****************************************************************************
834  */
835
836
837 /*
838  *      full_vacuum_rel() -- perform FULL VACUUM for one heap relation
839  *
840  *              This routine vacuums a single heap, cleans out its indexes, and
841  *              updates its num_pages and num_tuples statistics.
842  *
843  *              At entry, we have already established a transaction and opened
844  *              and locked the relation.
845  */
846 static void
847 full_vacuum_rel(Relation onerel, VacuumStmt *vacstmt)
848 {
849         VacPageListData vacuum_pages;           /* List of pages to vacuum and/or
850                                                                                  * clean indexes */
851         VacPageListData fraged_pages;           /* List of pages with space enough
852                                                                                  * for re-using */
853         Relation   *Irel;
854         int                     nindexes,
855                                 i;
856         VRelStats  *vacrelstats;
857         bool            reindex = false;
858
859         if (IsIgnoringSystemIndexes() &&
860                 IsSystemRelation(onerel))
861                 reindex = true;
862
863         vacuum_set_xid_limits(vacstmt, onerel->rd_rel->relisshared,
864                                                   &OldestXmin, &FreezeLimit);
865
866         /*
867          * Set up statistics-gathering machinery.
868          */
869         vacrelstats = (VRelStats *) palloc(sizeof(VRelStats));
870         vacrelstats->rel_pages = 0;
871         vacrelstats->rel_tuples = 0;
872         vacrelstats->hasindex = false;
873
874         /* scan the heap */
875         vacuum_pages.num_pages = fraged_pages.num_pages = 0;
876         scan_heap(vacrelstats, onerel, &vacuum_pages, &fraged_pages);
877
878         /* Now open all indexes of the relation */
879         vac_open_indexes(onerel, &nindexes, &Irel);
880         if (!Irel)
881                 reindex = false;
882         else if (!RelationGetForm(onerel)->relhasindex)
883                 reindex = true;
884         if (nindexes > 0)
885                 vacrelstats->hasindex = true;
886
887 #ifdef NOT_USED
888
889         /*
890          * reindex in VACUUM is dangerous under WAL. ifdef out until it
891          * becomes safe.
892          */
893         if (reindex)
894         {
895                 vac_close_indexes(nindexes, Irel);
896                 Irel = (Relation *) NULL;
897                 activate_indexes_of_a_table(RelationGetRelid(onerel), false);
898         }
899 #endif   /* NOT_USED */
900
901         /* Clean/scan index relation(s) */
902         if (Irel != (Relation *) NULL)
903         {
904                 if (vacuum_pages.num_pages > 0)
905                 {
906                         for (i = 0; i < nindexes; i++)
907                                 vacuum_index(&vacuum_pages, Irel[i],
908                                                          vacrelstats->rel_tuples, 0);
909                 }
910                 else
911                 {
912                         /* just scan indexes to update statistics */
913                         for (i = 0; i < nindexes; i++)
914                                 scan_index(Irel[i], vacrelstats->rel_tuples);
915                 }
916         }
917
918         if (fraged_pages.num_pages > 0)
919         {
920                 /* Try to shrink heap */
921                 repair_frag(vacrelstats, onerel, &vacuum_pages, &fraged_pages,
922                                         nindexes, Irel);
923                 vac_close_indexes(nindexes, Irel);
924         }
925         else
926         {
927                 vac_close_indexes(nindexes, Irel);
928                 if (vacuum_pages.num_pages > 0)
929                 {
930                         /* Clean pages from vacuum_pages list */
931                         vacuum_heap(vacrelstats, onerel, &vacuum_pages);
932                 }
933                 else
934                 {
935                         /*
936                          * Flush dirty pages out to disk.  We must do this even if we
937                          * didn't do anything else, because we want to ensure that all
938                          * tuples have correct on-row commit status on disk (see
939                          * bufmgr.c's comments for FlushRelationBuffers()).
940                          */
941                         i = FlushRelationBuffers(onerel, vacrelstats->rel_pages);
942                         if (i < 0)
943                                 elog(ERROR, "VACUUM (full_vacuum_rel): FlushRelationBuffers returned %d",
944                                          i);
945                 }
946         }
947
948 #ifdef NOT_USED
949         if (reindex)
950                 activate_indexes_of_a_table(RelationGetRelid(onerel), true);
951 #endif   /* NOT_USED */
952
953         /* update shared free space map with final free space info */
954         vac_update_fsm(onerel, &fraged_pages, vacrelstats->rel_pages);
955
956         /* update statistics in pg_class */
957         vac_update_relstats(RelationGetRelid(onerel), vacrelstats->rel_pages,
958                                                 vacrelstats->rel_tuples, vacrelstats->hasindex);
959 }
960
961
962 /*
963  *      scan_heap() -- scan an open heap relation
964  *
965  *              This routine sets commit status bits, constructs vacuum_pages (list
966  *              of pages we need to compact free space on and/or clean indexes of
967  *              deleted tuples), constructs fraged_pages (list of pages with free
968  *              space that tuples could be moved into), and calculates statistics
969  *              on the number of live tuples in the heap.
970  */
971 static void
972 scan_heap(VRelStats *vacrelstats, Relation onerel,
973                   VacPageList vacuum_pages, VacPageList fraged_pages)
974 {
975         BlockNumber nblocks,
976                                 blkno;
977         ItemId          itemid;
978         Buffer          buf;
979         HeapTupleData tuple;
980         OffsetNumber offnum,
981                                 maxoff;
982         bool            pgchanged,
983                                 tupgone,
984                                 notup;
985         char       *relname;
986         VacPage         vacpage,
987                                 vacpagecopy;
988         BlockNumber empty_pages,
989                                 new_pages,
990                                 changed_pages,
991                                 empty_end_pages;
992         double          num_tuples,
993                                 tups_vacuumed,
994                                 nkeep,
995                                 nunused;
996         double          free_size,
997                                 usable_free_size;
998         Size            min_tlen = MaxTupleSize;
999         Size            max_tlen = 0;
1000         int                     i;
1001         bool            do_shrinking = true;
1002         VTupleLink      vtlinks = (VTupleLink) palloc(100 * sizeof(VTupleLinkData));
1003         int                     num_vtlinks = 0;
1004         int                     free_vtlinks = 100;
1005         VacRUsage       ru0;
1006
1007         vac_init_rusage(&ru0);
1008
1009         relname = RelationGetRelationName(onerel);
1010         elog(elevel, "--Relation %s.%s--",
1011                  get_namespace_name(RelationGetNamespace(onerel)),
1012                  relname);
1013
1014         empty_pages = new_pages = changed_pages = empty_end_pages = 0;
1015         num_tuples = tups_vacuumed = nkeep = nunused = 0;
1016         free_size = 0;
1017
1018         nblocks = RelationGetNumberOfBlocks(onerel);
1019
1020         /*
1021          * We initially create each VacPage item in a maximal-sized workspace,
1022          * then copy the workspace into a just-large-enough copy.
1023          */
1024         vacpage = (VacPage) palloc(sizeof(VacPageData) + MaxOffsetNumber * sizeof(OffsetNumber));
1025
1026         for (blkno = 0; blkno < nblocks; blkno++)
1027         {
1028                 Page            page,
1029                                         tempPage = NULL;
1030                 bool            do_reap,
1031                                         do_frag;
1032
1033                 CHECK_FOR_INTERRUPTS();
1034
1035                 buf = ReadBuffer(onerel, blkno);
1036                 page = BufferGetPage(buf);
1037
1038                 vacpage->blkno = blkno;
1039                 vacpage->offsets_used = 0;
1040                 vacpage->offsets_free = 0;
1041
1042                 if (PageIsNew(page))
1043                 {
1044                         elog(WARNING, "Rel %s: Uninitialized page %u - fixing",
1045                                  relname, blkno);
1046                         PageInit(page, BufferGetPageSize(buf), 0);
1047                         vacpage->free = ((PageHeader) page)->pd_upper - ((PageHeader) page)->pd_lower;
1048                         free_size += (vacpage->free - sizeof(ItemIdData));
1049                         new_pages++;
1050                         empty_end_pages++;
1051                         vacpagecopy = copy_vac_page(vacpage);
1052                         vpage_insert(vacuum_pages, vacpagecopy);
1053                         vpage_insert(fraged_pages, vacpagecopy);
1054                         WriteBuffer(buf);
1055                         continue;
1056                 }
1057
1058                 if (PageIsEmpty(page))
1059                 {
1060                         vacpage->free = ((PageHeader) page)->pd_upper - ((PageHeader) page)->pd_lower;
1061                         free_size += (vacpage->free - sizeof(ItemIdData));
1062                         empty_pages++;
1063                         empty_end_pages++;
1064                         vacpagecopy = copy_vac_page(vacpage);
1065                         vpage_insert(vacuum_pages, vacpagecopy);
1066                         vpage_insert(fraged_pages, vacpagecopy);
1067                         ReleaseBuffer(buf);
1068                         continue;
1069                 }
1070
1071                 pgchanged = false;
1072                 notup = true;
1073                 maxoff = PageGetMaxOffsetNumber(page);
1074                 for (offnum = FirstOffsetNumber;
1075                          offnum <= maxoff;
1076                          offnum = OffsetNumberNext(offnum))
1077                 {
1078                         uint16          sv_infomask;
1079
1080                         itemid = PageGetItemId(page, offnum);
1081
1082                         /*
1083                          * Collect unused items too - it's possible to have index
1084                          * entries pointing here after a crash.
1085                          */
1086                         if (!ItemIdIsUsed(itemid))
1087                         {
1088                                 vacpage->offsets[vacpage->offsets_free++] = offnum;
1089                                 nunused += 1;
1090                                 continue;
1091                         }
1092
1093                         tuple.t_datamcxt = NULL;
1094                         tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
1095                         tuple.t_len = ItemIdGetLength(itemid);
1096                         ItemPointerSet(&(tuple.t_self), blkno, offnum);
1097
1098                         tupgone = false;
1099                         sv_infomask = tuple.t_data->t_infomask;
1100
1101                         switch (HeapTupleSatisfiesVacuum(tuple.t_data, OldestXmin))
1102                         {
1103                                 case HEAPTUPLE_DEAD:
1104                                         tupgone = true;         /* we can delete the tuple */
1105                                         break;
1106                                 case HEAPTUPLE_LIVE:
1107
1108                                         /*
1109                                          * Tuple is good.  Consider whether to replace its
1110                                          * xmin value with FrozenTransactionId.
1111                                          */
1112                                         if (TransactionIdIsNormal(HeapTupleHeaderGetXmin(tuple.t_data)) &&
1113                                                 TransactionIdPrecedes(HeapTupleHeaderGetXmin(tuple.t_data),
1114                                                                                           FreezeLimit))
1115                                         {
1116                                                 HeapTupleHeaderSetXmin(tuple.t_data, FrozenTransactionId);
1117                                                 /* infomask should be okay already */
1118                                                 Assert(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED);
1119                                                 pgchanged = true;
1120                                         }
1121                                         break;
1122                                 case HEAPTUPLE_RECENTLY_DEAD:
1123
1124                                         /*
1125                                          * If the tuple was recently deleted then we must not
1126                                          * remove it from the relation.
1127                                          */
1128                                         nkeep += 1;
1129
1130                                         /*
1131                                          * If we are doing shrinking and this tuple is an
1132                                          * updated one, remember it so we can construct the
1133                                          * updated-tuple dependency chain.
1134                                          */
1135                                         if (do_shrinking &&
1136                                                 !(ItemPointerEquals(&(tuple.t_self),
1137                                                                                         &(tuple.t_data->t_ctid))))
1138                                         {
1139                                                 if (free_vtlinks == 0)
1140                                                 {
1141                                                         free_vtlinks = 1000;
1142                                                         vtlinks = (VTupleLink) repalloc(vtlinks,
1143                                                                                    (free_vtlinks + num_vtlinks) *
1144                                                                                                  sizeof(VTupleLinkData));
1145                                                 }
1146                                                 vtlinks[num_vtlinks].new_tid = tuple.t_data->t_ctid;
1147                                                 vtlinks[num_vtlinks].this_tid = tuple.t_self;
1148                                                 free_vtlinks--;
1149                                                 num_vtlinks++;
1150                                         }
1151                                         break;
1152                                 case HEAPTUPLE_INSERT_IN_PROGRESS:
1153
1154                                         /*
1155                                          * This should not happen, since we hold exclusive
1156                                          * lock on the relation; shouldn't we raise an error?
1157                                          */
1158                                         elog(WARNING, "Rel %s: TID %u/%u: InsertTransactionInProgress %u - can't shrink relation",
1159                                                  relname, blkno, offnum, HeapTupleHeaderGetXmin(tuple.t_data));
1160                                         do_shrinking = false;
1161                                         break;
1162                                 case HEAPTUPLE_DELETE_IN_PROGRESS:
1163
1164                                         /*
1165                                          * This should not happen, since we hold exclusive
1166                                          * lock on the relation; shouldn't we raise an error?
1167                                          */
1168                                         elog(WARNING, "Rel %s: TID %u/%u: DeleteTransactionInProgress %u - can't shrink relation",
1169                                                  relname, blkno, offnum, HeapTupleHeaderGetXmax(tuple.t_data));
1170                                         do_shrinking = false;
1171                                         break;
1172                                 default:
1173                                         elog(ERROR, "Unexpected HeapTupleSatisfiesVacuum result");
1174                                         break;
1175                         }
1176
1177                         /* check for hint-bit update by HeapTupleSatisfiesVacuum */
1178                         if (sv_infomask != tuple.t_data->t_infomask)
1179                                 pgchanged = true;
1180
1181                         /*
1182                          * Other checks...
1183                          */
1184                         if (onerel->rd_rel->relhasoids &&
1185                                 !OidIsValid(HeapTupleGetOid(&tuple)))
1186                                 elog(WARNING, "Rel %s: TID %u/%u: OID IS INVALID. TUPGONE %d.",
1187                                          relname, blkno, offnum, (int) tupgone);
1188
1189                         if (tupgone)
1190                         {
1191                                 ItemId          lpp;
1192
1193                                 /*
1194                                  * Here we are building a temporary copy of the page with
1195                                  * dead tuples removed.  Below we will apply
1196                                  * PageRepairFragmentation to the copy, so that we can
1197                                  * determine how much space will be available after
1198                                  * removal of dead tuples.      But note we are NOT changing
1199                                  * the real page yet...
1200                                  */
1201                                 if (tempPage == (Page) NULL)
1202                                 {
1203                                         Size            pageSize;
1204
1205                                         pageSize = PageGetPageSize(page);
1206                                         tempPage = (Page) palloc(pageSize);
1207                                         memcpy(tempPage, page, pageSize);
1208                                 }
1209
1210                                 /* mark it unused on the temp page */
1211                                 lpp = PageGetItemId(tempPage, offnum);
1212                                 lpp->lp_flags &= ~LP_USED;
1213
1214                                 vacpage->offsets[vacpage->offsets_free++] = offnum;
1215                                 tups_vacuumed += 1;
1216                         }
1217                         else
1218                         {
1219                                 num_tuples += 1;
1220                                 notup = false;
1221                                 if (tuple.t_len < min_tlen)
1222                                         min_tlen = tuple.t_len;
1223                                 if (tuple.t_len > max_tlen)
1224                                         max_tlen = tuple.t_len;
1225                         }
1226                 }                                               /* scan along page */
1227
1228                 if (tempPage != (Page) NULL)
1229                 {
1230                         /* Some tuples are removable; figure free space after removal */
1231                         PageRepairFragmentation(tempPage, NULL);
1232                         vacpage->free = ((PageHeader) tempPage)->pd_upper - ((PageHeader) tempPage)->pd_lower;
1233                         pfree(tempPage);
1234                         do_reap = true;
1235                 }
1236                 else
1237                 {
1238                         /* Just use current available space */
1239                         vacpage->free = ((PageHeader) page)->pd_upper - ((PageHeader) page)->pd_lower;
1240                         /* Need to reap the page if it has ~LP_USED line pointers */
1241                         do_reap = (vacpage->offsets_free > 0);
1242                 }
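                /*
                 * In both branches, pd_upper - pd_lower is the size of the hole
                 * between the end of the line pointer array and the start of the
                 * tuple data, i.e. the page's contiguous free space.
                 */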
1243
1244                 free_size += vacpage->free;
1245
1246                 /*
1247                  * Add the page to fraged_pages if it has a useful amount of free
1248                  * space.  "Useful" means enough for a minimal-sized tuple. But we
1249                  * don't know that accurately near the start of the relation, so
1250                  * add pages unconditionally if they have >= BLCKSZ/10 free space.
1251                  */
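                /*
                 * With the default BLCKSZ of 8192, that unconditional threshold
                 * works out to 819 bytes.
                 */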
1252                 do_frag = (vacpage->free >= min_tlen || vacpage->free >= BLCKSZ / 10);
1253
1254                 if (do_reap || do_frag)
1255                 {
1256                         vacpagecopy = copy_vac_page(vacpage);
1257                         if (do_reap)
1258                                 vpage_insert(vacuum_pages, vacpagecopy);
1259                         if (do_frag)
1260                                 vpage_insert(fraged_pages, vacpagecopy);
1261                 }
1262
1263                 if (notup)
1264                         empty_end_pages++;
1265                 else
1266                         empty_end_pages = 0;
1267
1268                 if (pgchanged)
1269                 {
1270                         WriteBuffer(buf);
1271                         changed_pages++;
1272                 }
1273                 else
1274                         ReleaseBuffer(buf);
1275         }
1276
1277         pfree(vacpage);
1278
1279         /* save stats in the rel list for use later */
1280         vacrelstats->rel_tuples = num_tuples;
1281         vacrelstats->rel_pages = nblocks;
1282         if (num_tuples == 0)
1283                 min_tlen = max_tlen = 0;
1284         vacrelstats->min_tlen = min_tlen;
1285         vacrelstats->max_tlen = max_tlen;
1286
1287         vacuum_pages->empty_end_pages = empty_end_pages;
1288         fraged_pages->empty_end_pages = empty_end_pages;
1289
1290         /*
1291          * Clear the fraged_pages list if we found we couldn't shrink. Else,
1292          * remove any "empty" end-pages from the list, and compute usable free
1293          * space = free space in remaining pages.
1294          */
1295         if (do_shrinking)
1296         {
1297                 Assert((BlockNumber) fraged_pages->num_pages >= empty_end_pages);
1298                 fraged_pages->num_pages -= empty_end_pages;
1299                 usable_free_size = 0;
1300                 for (i = 0; i < fraged_pages->num_pages; i++)
1301                         usable_free_size += fraged_pages->pagedesc[i]->free;
1302         }
1303         else
1304         {
1305                 fraged_pages->num_pages = 0;
1306                 usable_free_size = 0;
1307         }
1308
1309         /* don't bother to save vtlinks if we will not call repair_frag */
1310         if (fraged_pages->num_pages > 0 && num_vtlinks > 0)
1311         {
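                /*
                 * Sort the links on new_tid so that repair_frag() can locate the
                 * parent of any chain member with vac_bsearch()/vac_cmp_vtlinks.
                 */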
1312                 qsort((char *) vtlinks, num_vtlinks, sizeof(VTupleLinkData),
1313                           vac_cmp_vtlinks);
1314                 vacrelstats->vtlinks = vtlinks;
1315                 vacrelstats->num_vtlinks = num_vtlinks;
1316         }
1317         else
1318         {
1319                 vacrelstats->vtlinks = NULL;
1320                 vacrelstats->num_vtlinks = 0;
1321                 pfree(vtlinks);
1322         }
1323
1324         elog(elevel, "Pages %u: Changed %u, reaped %u, Empty %u, New %u; "
1325                  "Tup %.0f: Vac %.0f, Keep/VTL %.0f/%u, UnUsed %.0f, MinLen %lu, "
1326                  "MaxLen %lu; Re-using: Free/Avail. Space %.0f/%.0f; "
1327                  "EndEmpty/Avail. Pages %u/%u.\n\t%s",
1328                  nblocks, changed_pages, vacuum_pages->num_pages, empty_pages,
1329                  new_pages, num_tuples, tups_vacuumed,
1330                  nkeep, vacrelstats->num_vtlinks,
1331                  nunused, (unsigned long) min_tlen, (unsigned long) max_tlen,
1332                  free_size, usable_free_size,
1333                  empty_end_pages, fraged_pages->num_pages,
1334                  vac_show_rusage(&ru0));
1335 }
1336
1337
1338 /*
1339  *      repair_frag() -- try to repair relation's fragmentation
1340  *
1341  *              This routine marks dead tuples as unused and tries to re-use the dead
1342  *              space by moving tuples (and inserting index entries if needed). It
1343  *              builds Nvacpagelist, a list of pages freed by moving tuples, and cleans
1344  *              the indexes for them after committing the current transaction (in a
1345  *              hackish manner - without losing locks or freeing memory!). It truncates
1346  *              the relation if some end-blocks have become empty.
1347  */
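/*
 * In outline: scan the relation backwards from the last nonempty page; for
 * each tuple on a page we hope to empty, pick a lower fraged page with enough
 * free space, copy the tuple there (whole update chains are moved together),
 * mark the old copy MOVED_OFF and the new copy MOVED_IN with our XID, and
 * insert index entries for the new copy.  When no more tuples can be moved,
 * commit, reap the emptied pages, and truncate the relation.
 */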
1348 static void
1349 repair_frag(VRelStats *vacrelstats, Relation onerel,
1350                         VacPageList vacuum_pages, VacPageList fraged_pages,
1351                         int nindexes, Relation *Irel)
1352 {
1353         TransactionId myXID;
1354         CommandId       myCID;
1355         Buffer          buf,
1356                                 cur_buffer;
1357         BlockNumber nblocks,
1358                                 blkno;
1359         BlockNumber last_move_dest_block = 0,
1360                                 last_vacuum_block;
1361         Page            page,
1362                                 ToPage = NULL;
1363         OffsetNumber offnum,
1364                                 maxoff,
1365                                 newoff,
1366                                 max_offset;
1367         ItemId          itemid,
1368                                 newitemid;
1369         HeapTupleData tuple,
1370                                 newtup;
1371         TupleDesc       tupdesc;
1372         ResultRelInfo *resultRelInfo;
1373         EState     *estate;
1374         TupleTable      tupleTable;
1375         TupleTableSlot *slot;
1376         VacPageListData Nvacpagelist;
1377         VacPage         cur_page = NULL,
1378                                 last_vacuum_page,
1379                                 vacpage,
1380                            *curpage;
1381         int                     cur_item = 0;
1382         int                     i;
1383         Size            tuple_len;
1384         int                     num_moved,
1385                                 num_fraged_pages,
1386                                 vacuumed_pages;
1387         int                     checked_moved,
1388                                 num_tuples,
1389                                 keep_tuples = 0;
1390         bool            isempty,
1391                                 dowrite,
1392                                 chain_tuple_moved;
1393         VacRUsage       ru0;
1394
1395         vac_init_rusage(&ru0);
1396
1397         myXID = GetCurrentTransactionId();
1398         myCID = GetCurrentCommandId();
1399
1400         tupdesc = RelationGetDescr(onerel);
1401
1402         /*
1403          * We need a ResultRelInfo and an EState so we can use the regular
1404          * executor's index-entry-making machinery.
1405          */
1406         resultRelInfo = makeNode(ResultRelInfo);
1407         resultRelInfo->ri_RangeTableIndex = 1;          /* dummy */
1408         resultRelInfo->ri_RelationDesc = onerel;
1409         resultRelInfo->ri_TrigDesc = NULL;      /* we don't fire triggers */
1410
1411         ExecOpenIndices(resultRelInfo);
1412
1413         estate = CreateExecutorState();
1414         estate->es_result_relations = resultRelInfo;
1415         estate->es_num_result_relations = 1;
1416         estate->es_result_relation_info = resultRelInfo;
1417
1418         /* Set up a dummy tuple table too */
1419         tupleTable = ExecCreateTupleTable(1);
1420         slot = ExecAllocTableSlot(tupleTable);
1421         ExecSetSlotDescriptor(slot, tupdesc, false);
1422
1423         Nvacpagelist.num_pages = 0;
1424         num_fraged_pages = fraged_pages->num_pages;
1425         Assert((BlockNumber) vacuum_pages->num_pages >= vacuum_pages->empty_end_pages);
1426         vacuumed_pages = vacuum_pages->num_pages - vacuum_pages->empty_end_pages;
1427         if (vacuumed_pages > 0)
1428         {
1429                 /* get last reaped page from vacuum_pages */
1430                 last_vacuum_page = vacuum_pages->pagedesc[vacuumed_pages - 1];
1431                 last_vacuum_block = last_vacuum_page->blkno;
1432         }
1433         else
1434         {
1435                 last_vacuum_page = NULL;
1436                 last_vacuum_block = InvalidBlockNumber;
1437         }
1438         cur_buffer = InvalidBuffer;
1439         num_moved = 0;
1440
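        /*
         * The offsets[] array is sized for MaxOffsetNumber entries, the most
         * line pointers that can possibly fit on one page, so a single VacPage
         * can describe any page in the relation.
         */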
1441         vacpage = (VacPage) palloc(sizeof(VacPageData) + MaxOffsetNumber * sizeof(OffsetNumber));
1442         vacpage->offsets_used = vacpage->offsets_free = 0;
1443
1444         /*
1445          * Scan pages backwards from the last nonempty page, trying to move
1446          * tuples down to lower pages.  Quit when we reach a page that we have
1447          * moved any tuples onto, or the first page if we haven't moved
1448          * anything, or when we find a page we cannot completely empty (this
1449          * last condition is handled by "break" statements within the loop).
1450          *
1451          * NB: this code depends on the vacuum_pages and fraged_pages lists being
1452          * in order by blkno.
1453          */
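        /*
         * For example, if the relation has 100 pages and the last 10 are
         * completely empty, the scan below starts at block 89 and works
         * downwards.
         */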
1454         nblocks = vacrelstats->rel_pages;
1455         for (blkno = nblocks - vacuum_pages->empty_end_pages - 1;
1456                  blkno > last_move_dest_block;
1457                  blkno--)
1458         {
1459                 CHECK_FOR_INTERRUPTS();
1460
1461                 /*
1462                  * Forget fraged_pages pages at or after this one; they're no
1463                  * longer useful as move targets, since we only want to move down.
1464                  * Note that since we stop the outer loop at last_move_dest_block,
1465                  * pages removed here cannot have had anything moved onto them
1466                  * already.
1467                  *
1468                  * Also note that we don't change the stored fraged_pages list, only
1469                  * our local variable num_fraged_pages; so the forgotten pages are
1470                  * still available to be loaded into the free space map later.
1471                  */
1472                 while (num_fraged_pages > 0 &&
1473                         fraged_pages->pagedesc[num_fraged_pages - 1]->blkno >= blkno)
1474                 {
1475                         Assert(fraged_pages->pagedesc[num_fraged_pages - 1]->offsets_used == 0);
1476                         --num_fraged_pages;
1477                 }
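                /*
                 * Every entry still counted by num_fraged_pages now refers to a
                 * block strictly below blkno, so any destination chosen from it
                 * moves a tuple toward the front of the relation.
                 */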
1478
1479                 /*
1480                  * Process this page of relation.
1481                  */
1482                 buf = ReadBuffer(onerel, blkno);
1483                 page = BufferGetPage(buf);
1484
1485                 vacpage->offsets_free = 0;
1486
1487                 isempty = PageIsEmpty(page);
1488
1489                 dowrite = false;
1490
1491                 /* Is the page in the vacuum_pages list? */
1492                 if (blkno == last_vacuum_block)
1493                 {
1494                         if (last_vacuum_page->offsets_free > 0)
1495                         {
1496                                 /* there are dead tuples on this page - clean them */
1497                                 Assert(!isempty);
1498                                 LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
1499                                 vacuum_page(onerel, buf, last_vacuum_page);
1500                                 LockBuffer(buf, BUFFER_LOCK_UNLOCK);
1501                                 dowrite = true;
1502                         }
1503                         else
1504                                 Assert(isempty);
1505                         --vacuumed_pages;
1506                         if (vacuumed_pages > 0)
1507                         {
1508                                 /* get prev reaped page from vacuum_pages */
1509                                 last_vacuum_page = vacuum_pages->pagedesc[vacuumed_pages - 1];
1510                                 last_vacuum_block = last_vacuum_page->blkno;
1511                         }
1512                         else
1513                         {
1514                                 last_vacuum_page = NULL;
1515                                 last_vacuum_block = InvalidBlockNumber;
1516                         }
1517                         if (isempty)
1518                         {
1519                                 ReleaseBuffer(buf);
1520                                 continue;
1521                         }
1522                 }
1523                 else
1524                         Assert(!isempty);
1525
1526                 chain_tuple_moved = false;              /* no chain tuple has been
1527                                                                                  * moved off this page yet */
1528                 vacpage->blkno = blkno;
1529                 maxoff = PageGetMaxOffsetNumber(page);
1530                 for (offnum = FirstOffsetNumber;
1531                          offnum <= maxoff;
1532                          offnum = OffsetNumberNext(offnum))
1533                 {
1534                         itemid = PageGetItemId(page, offnum);
1535
1536                         if (!ItemIdIsUsed(itemid))
1537                                 continue;
1538
1539                         tuple.t_datamcxt = NULL;
1540                         tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
1541                         tuple_len = tuple.t_len = ItemIdGetLength(itemid);
1542                         ItemPointerSet(&(tuple.t_self), blkno, offnum);
1543
1544                         if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
1545                         {
1546                                 if (tuple.t_data->t_infomask & HEAP_MOVED_IN)
1547                                         elog(ERROR, "HEAP_MOVED_IN was not expected");
1548
1549                                 /*
1550                                  * If this (chain) tuple has already been moved by me, I have
1551                                  * to check whether it is recorded in vacpage or not - i.e.
1552                                  * whether it was moved while cleaning this page or a previous one.
1553                                  */
1554                                 if (tuple.t_data->t_infomask & HEAP_MOVED_OFF)
1555                                 {
1556                                         if (HeapTupleHeaderGetXvac(tuple.t_data) != myXID)
1557                                                 elog(ERROR, "Invalid XVAC in tuple header");
1558                                         if (keep_tuples == 0)
1559                                                 continue;
1560                                         if (chain_tuple_moved)
1561                                         {
1562                                                 /* some chain tuples were moved while cleaning this page */
1563                                                 Assert(vacpage->offsets_free > 0);
1564                                                 for (i = 0; i < vacpage->offsets_free; i++)
1565                                                 {
1566                                                         if (vacpage->offsets[i] == offnum)
1567                                                                 break;
1568                                                 }
1569                                                 if (i >= vacpage->offsets_free) /* not found */
1570                                                 {
1571                                                         vacpage->offsets[vacpage->offsets_free++] = offnum;
1572                                                         keep_tuples--;
1573                                                 }
1574                                         }
1575                                         else
1576                                         {
1577                                                 vacpage->offsets[vacpage->offsets_free++] = offnum;
1578                                                 keep_tuples--;
1579                                         }
1580                                         continue;
1581                                 }
1582                                 elog(ERROR, "HEAP_MOVED_OFF was expected");
1583                         }
1584
1585                         /*
1586                          * If this tuple is part of a chain of tuples created by updates
1587                          * from "recent" transactions, then we have to move the whole
1588                          * chain of tuples to other places.
1589                          *
1590                          * NOTE: this test is not 100% accurate: it is possible for a
1591                          * tuple to be an updated one with recent xmin, and yet not
1592                          * have a corresponding tuple in the vtlinks list.      Presumably
1593                          * there was once a parent tuple with xmax matching the xmin,
1594                          * but it's possible that that tuple has been removed --- for
1595                          * example, if it had xmin = xmax then
1596                          * HeapTupleSatisfiesVacuum would deem it removable as soon as
1597                          * the xmin xact completes.
1598                          *
1599                          * To be on the safe side, we abandon the repair_frag process if
1600                          * we cannot find the parent tuple in vtlinks.  This may be
1601                          * overly conservative; AFAICS it would be safe to move the
1602                          * chain.
1603                          */
1604                         if (((tuple.t_data->t_infomask & HEAP_UPDATED) &&
1605                          !TransactionIdPrecedes(HeapTupleHeaderGetXmin(tuple.t_data),
1606                                                                         OldestXmin)) ||
1607                                 (!(tuple.t_data->t_infomask & (HEAP_XMAX_INVALID |
1608                                                                                            HEAP_MARKED_FOR_UPDATE)) &&
1609                                  !(ItemPointerEquals(&(tuple.t_self),
1610                                                                          &(tuple.t_data->t_ctid)))))
1611                         {
1612                                 Buffer          Cbuf = buf;
1613                                 bool            freeCbuf = false;
1614                                 bool            chain_move_failed = false;
1615                                 Page            Cpage;
1616                                 ItemId          Citemid;
1617                                 ItemPointerData Ctid;
1618                                 HeapTupleData tp = tuple;
1619                                 Size            tlen = tuple_len;
1620                                 VTupleMove      vtmove;
1621                                 int                     num_vtmove;
1622                                 int                     free_vtmove;
1623                                 VacPage         to_vacpage = NULL;
1624                                 int                     to_item = 0;
1625                                 int                     ti;
1626
1627                                 if (cur_buffer != InvalidBuffer)
1628                                 {
1629                                         WriteBuffer(cur_buffer);
1630                                         cur_buffer = InvalidBuffer;
1631                                 }
1632
1633                                 /* Quick exit if we have no vtlinks to search in */
1634                                 if (vacrelstats->vtlinks == NULL)
1635                                 {
1636                                         elog(WARNING, "Parent item in update-chain not found - can't continue repair_frag");
1637                                         break;          /* out of walk-along-page loop */
1638                                 }
1639
1640                                 vtmove = (VTupleMove) palloc(100 * sizeof(VTupleMoveData));
1641                                 num_vtmove = 0;
1642                                 free_vtmove = 100;
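                                /*
                                 * vtmove starts with room for 100 entries and is grown in
                                 * chunks of 1000 below whenever free_vtmove runs out.
                                 */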
1643
1644                                 /*
1645                                  * If this tuple is at the beginning or in the middle of the
1646                                  * chain, then we have to walk to the end of the chain first.
1647                                  */
1648                                 while (!(tp.t_data->t_infomask & (HEAP_XMAX_INVALID |
1649                                                                                           HEAP_MARKED_FOR_UPDATE)) &&
1650                                            !(ItemPointerEquals(&(tp.t_self),
1651                                                                                    &(tp.t_data->t_ctid))))
1652                                 {
1653                                         Ctid = tp.t_data->t_ctid;
1654                                         if (freeCbuf)
1655                                                 ReleaseBuffer(Cbuf);
1656                                         freeCbuf = true;
1657                                         Cbuf = ReadBuffer(onerel,
1658                                                                           ItemPointerGetBlockNumber(&Ctid));
1659                                         Cpage = BufferGetPage(Cbuf);
1660                                         Citemid = PageGetItemId(Cpage,
1661                                                                           ItemPointerGetOffsetNumber(&Ctid));
1662                                         if (!ItemIdIsUsed(Citemid))
1663                                         {
1664                                                 /*
1665                                                  * This means that in the middle of the chain there
1666                                                  * was a tuple updated by a transaction older than
1667                                                  * OldestXmin, and that tuple has already been
1668                                                  * deleted by me.  Really the upper part of the chain
1669                                                  * should be removed; that seems like it ought to be
1670                                                  * handled in scan_heap(), but it's not implemented
1671                                                  * at the moment, so we just stop shrinking here.
1672                                                  */
1673                                                 elog(WARNING, "Child itemid in update-chain marked as unused - can't continue repair_frag");
1674                                                 chain_move_failed = true;
1675                                                 break;  /* out of loop to move to chain end */
1676                                         }
1677                                         tp.t_datamcxt = NULL;
1678                                         tp.t_data = (HeapTupleHeader) PageGetItem(Cpage, Citemid);
1679                                         tp.t_self = Ctid;
1680                                         tlen = tp.t_len = ItemIdGetLength(Citemid);
1681                                 }
1682                                 if (chain_move_failed)
1683                                 {
1684                                         if (freeCbuf)
1685                                                 ReleaseBuffer(Cbuf);
1686                                         pfree(vtmove);
1687                                         break;          /* out of walk-along-page loop */
1688                                 }
1689
1690                                 /*
1691                                  * Check if all items in chain can be moved
1692                                  */
1693                                 for (;;)
1694                                 {
1695                                         Buffer          Pbuf;
1696                                         Page            Ppage;
1697                                         ItemId          Pitemid;
1698                                         HeapTupleData Ptp;
1699                                         VTupleLinkData vtld,
1700                                                            *vtlp;
1701
1702                                         if (to_vacpage == NULL ||
1703                                                 !enough_space(to_vacpage, tlen))
1704                                         {
1705                                                 for (i = 0; i < num_fraged_pages; i++)
1706                                                 {
1707                                                         if (enough_space(fraged_pages->pagedesc[i], tlen))
1708                                                                 break;
1709                                                 }
1710
1711                                                 if (i == num_fraged_pages)
1712                                                 {
1713                                                         /* can't move item anywhere */
1714                                                         chain_move_failed = true;
1715                                                         break;          /* out of check-all-items loop */
1716                                                 }
1717                                                 to_item = i;
1718                                                 to_vacpage = fraged_pages->pagedesc[to_item];
1719                                         }
1720                                         to_vacpage->free -= MAXALIGN(tlen);
1721                                         if (to_vacpage->offsets_used >= to_vacpage->offsets_free)
1722                                                 to_vacpage->free -= MAXALIGN(sizeof(ItemIdData));
1723                                         (to_vacpage->offsets_used)++;
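                                        /*
                                         * Each tuple destined for this page costs MAXALIGN(tlen)
                                         * bytes of data space, plus a new line pointer unless one
                                         * of the page's to-be-freed pointers can be recycled.
                                         */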
1724                                         if (free_vtmove == 0)
1725                                         {
1726                                                 free_vtmove = 1000;
1727                                                 vtmove = (VTupleMove)
1728                                                         repalloc(vtmove,
1729                                                                          (free_vtmove + num_vtmove) *
1730                                                                          sizeof(VTupleMoveData));
1731                                         }
1732                                         vtmove[num_vtmove].tid = tp.t_self;
1733                                         vtmove[num_vtmove].vacpage = to_vacpage;
1734                                         if (to_vacpage->offsets_used == 1)
1735                                                 vtmove[num_vtmove].cleanVpd = true;
1736                                         else
1737                                                 vtmove[num_vtmove].cleanVpd = false;
1738                                         free_vtmove--;
1739                                         num_vtmove++;
1740
1741                                         /* At beginning of chain? */
1742                                         if (!(tp.t_data->t_infomask & HEAP_UPDATED) ||
1743                                                 TransactionIdPrecedes(HeapTupleHeaderGetXmin(tp.t_data),
1744                                                                                           OldestXmin))
1745                                                 break;
1746
1747                                         /* No, move to tuple with prior row version */
1748                                         vtld.new_tid = tp.t_self;
1749                                         vtlp = (VTupleLink)
1750                                                 vac_bsearch((void *) &vtld,
1751                                                                         (void *) (vacrelstats->vtlinks),
1752                                                                         vacrelstats->num_vtlinks,
1753                                                                         sizeof(VTupleLinkData),
1754                                                                         vac_cmp_vtlinks);
1755                                         if (vtlp == NULL)
1756                                         {
1757                                                 /* see discussion above */
1758                                                 elog(WARNING, "Parent item in update-chain not found - can't continue repair_frag");
1759                                                 chain_move_failed = true;
1760                                                 break;  /* out of check-all-items loop */
1761                                         }
1762                                         tp.t_self = vtlp->this_tid;
1763                                         Pbuf = ReadBuffer(onerel,
1764                                                                 ItemPointerGetBlockNumber(&(tp.t_self)));
1765                                         Ppage = BufferGetPage(Pbuf);
1766                                         Pitemid = PageGetItemId(Ppage,
1767                                                            ItemPointerGetOffsetNumber(&(tp.t_self)));
1768                                         /* this can't happen since we saw tuple earlier: */
1769                                         if (!ItemIdIsUsed(Pitemid))
1770                                                 elog(ERROR, "Parent itemid marked as unused");
1771                                         Ptp.t_datamcxt = NULL;
1772                                         Ptp.t_data = (HeapTupleHeader) PageGetItem(Ppage, Pitemid);
1773
1774                                         /* ctid should not have changed since we saved it */
1775                                         Assert(ItemPointerEquals(&(vtld.new_tid),
1776                                                                                          &(Ptp.t_data->t_ctid)));
1777
1778                                         /*
1779                                          * See the note above about the !ItemIdIsUsed(Citemid)
1780                                          * case (child item removed)... Because, at the moment,
1781                                          * we don't remove the useless part of an update-chain,
1782                                          * it's possible to find a too-old parent row here.  As
1783                                          * in the case that caused this problem, we stop
1784                                          * shrinking here.  I could try to find the real parent
1785                                          * row, but I don't want to, since a real solution will
1786                                          * be implemented later anyway, and we are too close to
1787                                          * the 6.5 release. - vadim
1788                                          * 06/11/99
1789                                          */
1790                                         if (!(TransactionIdEquals(HeapTupleHeaderGetXmax(Ptp.t_data),
1791                                                                          HeapTupleHeaderGetXmin(tp.t_data))))
1792                                         {
1793                                                 ReleaseBuffer(Pbuf);
1794                                                 elog(WARNING, "Too old parent tuple found - can't continue repair_frag");
1795                                                 chain_move_failed = true;
1796                                                 break;  /* out of check-all-items loop */
1797                                         }
1798                                         tp.t_datamcxt = Ptp.t_datamcxt;
1799                                         tp.t_data = Ptp.t_data;
1800                                         tlen = tp.t_len = ItemIdGetLength(Pitemid);
1801                                         if (freeCbuf)
1802                                                 ReleaseBuffer(Cbuf);
1803                                         Cbuf = Pbuf;
1804                                         freeCbuf = true;
1805                                 }                               /* end of check-all-items loop */
1806
1807                                 if (freeCbuf)
1808                                         ReleaseBuffer(Cbuf);
1809                                 freeCbuf = false;
1810
1811                                 if (chain_move_failed)
1812                                 {
1813                                         /*
1814                                          * Undo changes to offsets_used state.  We don't
1815                                          * bother cleaning up the amount-free state, since
1816                                          * we're not going to do any further tuple motion.
1817                                          */
1818                                         for (i = 0; i < num_vtmove; i++)
1819                                         {
1820                                                 Assert(vtmove[i].vacpage->offsets_used > 0);
1821                                                 (vtmove[i].vacpage->offsets_used)--;
1822                                         }
1823                                         pfree(vtmove);
1824                                         break;          /* out of walk-along-page loop */
1825                                 }
1826
1827                                 /*
1828                                  * Okay, move the whole tuple chain
1829                                  */
1830                                 ItemPointerSetInvalid(&Ctid);
1831                                 for (ti = 0; ti < num_vtmove; ti++)
1832                                 {
1833                                         VacPage         destvacpage = vtmove[ti].vacpage;
1834
1835                                         /* Get page to move from */
1836                                         tuple.t_self = vtmove[ti].tid;
1837                                         Cbuf = ReadBuffer(onerel,
1838                                                          ItemPointerGetBlockNumber(&(tuple.t_self)));
1839
1840                                         /* Get page to move to */
1841                                         cur_buffer = ReadBuffer(onerel, destvacpage->blkno);
1842
1843                                         LockBuffer(cur_buffer, BUFFER_LOCK_EXCLUSIVE);
1844                                         if (cur_buffer != Cbuf)
1845                                                 LockBuffer(Cbuf, BUFFER_LOCK_EXCLUSIVE);
1846
1847                                         ToPage = BufferGetPage(cur_buffer);
1848                                         Cpage = BufferGetPage(Cbuf);
1849
1850                                         Citemid = PageGetItemId(Cpage,
1851                                                         ItemPointerGetOffsetNumber(&(tuple.t_self)));
1852                                         tuple.t_datamcxt = NULL;
1853                                         tuple.t_data = (HeapTupleHeader) PageGetItem(Cpage, Citemid);
1854                                         tuple_len = tuple.t_len = ItemIdGetLength(Citemid);
1855
1856                                         /*
1857                                          * make a copy of the source tuple, and then mark the
1858                                          * source tuple MOVED_OFF.
1859                                          */
1860                                         heap_copytuple_with_tuple(&tuple, &newtup);
1861
1862                                         /*
1863                                          * register invalidation of source tuple in catcaches.
1864                                          */
1865                                         CacheInvalidateHeapTuple(onerel, &tuple);
1866
1867                                         /* NO ELOG(ERROR) TILL CHANGES ARE LOGGED */
1868                                         START_CRIT_SECTION();
1869
1870                                         tuple.t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
1871                                                                                                   HEAP_XMIN_INVALID |
1872                                                                                                   HEAP_MOVED_IN);
1873                                         tuple.t_data->t_infomask |= HEAP_MOVED_OFF;
1874                                         HeapTupleHeaderSetXvac(tuple.t_data, myXID);
1875
1876                                         /*
1877                                          * If this page was not used before - clean it.
1878                                          *
1879                                          * NOTE: a nasty bug used to lurk here.  It is possible
1880                                          * for the source and destination pages to be the same
1881                                          * (since this tuple-chain member can be on a page
1882                                          * lower than the one we're currently processing in
1883                                          * the outer loop).  If that's true, then after
1884                                          * vacuum_page() the source tuple will have been
1885                                          * moved, and tuple.t_data will be pointing at
1886                                          * garbage.  Therefore we must do everything that uses
1887                                          * tuple.t_data BEFORE this step!!
1888                                          *
1889                                          * This path is different from the other callers of
1890                                          * vacuum_page, because we have already incremented
1891                                          * the vacpage's offsets_used field to account for the
1892                                          * tuple(s) we expect to move onto the page. Therefore
1893                                          * vacuum_page's check for offsets_used == 0 is wrong.
1894                                          * But since that's a good debugging check for all
1895                                          * other callers, we work around it here rather than
1896                                          * remove it.
1897                                          */
1898                                         if (!PageIsEmpty(ToPage) && vtmove[ti].cleanVpd)
1899                                         {
1900                                                 int                     sv_offsets_used = destvacpage->offsets_used;
1901
1902                                                 destvacpage->offsets_used = 0;
1903                                                 vacuum_page(onerel, cur_buffer, destvacpage);
1904                                                 destvacpage->offsets_used = sv_offsets_used;
1905                                         }
1906
1907                                         /*
1908                                          * Update the state of the copied tuple, and store it
1909                                          * on the destination page.
1910                                          */
1911                                         newtup.t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
1912                                                                                                    HEAP_XMIN_INVALID |
1913                                                                                                    HEAP_MOVED_OFF);
1914                                         newtup.t_data->t_infomask |= HEAP_MOVED_IN;
1915                                         HeapTupleHeaderSetXvac(newtup.t_data, myXID);
1916                                         newoff = PageAddItem(ToPage,
1917                                                                                  (Item) newtup.t_data,
1918                                                                                  tuple_len,
1919                                                                                  InvalidOffsetNumber,
1920                                                                                  LP_USED);
1921                                         if (newoff == InvalidOffsetNumber)
1922                                         {
1923                                                 elog(PANIC, "moving chain: failed to add item with len = %lu to page %u",
1924                                                   (unsigned long) tuple_len, destvacpage->blkno);
1925                                         }
1926                                         newitemid = PageGetItemId(ToPage, newoff);
1927                                         pfree(newtup.t_data);
1928                                         newtup.t_datamcxt = NULL;
1929                                         newtup.t_data = (HeapTupleHeader) PageGetItem(ToPage, newitemid);
1930                                         ItemPointerSet(&(newtup.t_self), destvacpage->blkno, newoff);
1931
1932                                         /* XLOG stuff */
1933                                         if (!onerel->rd_istemp)
1934                                         {
1935                                                 XLogRecPtr      recptr =
1936                                                 log_heap_move(onerel, Cbuf, tuple.t_self,
1937                                                                           cur_buffer, &newtup);
1938
1939                                                 if (Cbuf != cur_buffer)
1940                                                 {
1941                                                         PageSetLSN(Cpage, recptr);
1942                                                         PageSetSUI(Cpage, ThisStartUpID);
1943                                                 }
1944                                                 PageSetLSN(ToPage, recptr);
1945                                                 PageSetSUI(ToPage, ThisStartUpID);
1946                                         }
1947                                         else
1948                                         {
1949                                                 /*
1950                                                  * No XLOG record, but still need to flag that XID
1951                                                  * exists on disk
1952                                                  */
1953                                                 MyXactMadeTempRelUpdate = true;
1954                                         }
1955
1956                                         END_CRIT_SECTION();
1957
1958                                         if (destvacpage->blkno > last_move_dest_block)
1959                                                 last_move_dest_block = destvacpage->blkno;
1960
1961                                         /*
1962                                          * Set new tuple's t_ctid pointing to itself for last
1963                                          * tuple in chain, and to next tuple in chain
1964                                          * otherwise.
1965                                          */
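                                        /*
                                         * vtmove[0] is the newest version (the chain was collected
                                         * from its end back to its beginning), so the first tuple
                                         * moved gets a self-referencing t_ctid and each older
                                         * version is linked to the copy moved just before it.
                                         */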
1966                                         if (!ItemPointerIsValid(&Ctid))
1967                                                 newtup.t_data->t_ctid = newtup.t_self;
1968                                         else
1969                                                 newtup.t_data->t_ctid = Ctid;
1970                                         Ctid = newtup.t_self;
1971
1972                                         num_moved++;
1973
1974                                         /*
1975                                          * Remember that we moved a tuple off the current page
1976                                          * (its index entries will be cleaned up later).
1977                                          */
1978                                         if (Cbuf == buf)
1979                                                 vacpage->offsets[vacpage->offsets_free++] =
1980                                                         ItemPointerGetOffsetNumber(&(tuple.t_self));
1981                                         else
1982                                                 keep_tuples++;
1983
1984                                         LockBuffer(cur_buffer, BUFFER_LOCK_UNLOCK);
1985                                         if (cur_buffer != Cbuf)
1986                                                 LockBuffer(Cbuf, BUFFER_LOCK_UNLOCK);
1987
1988                                         /* Create index entries for the moved tuple */
1989                                         if (resultRelInfo->ri_NumIndices > 0)
1990                                         {
1991                                                 ExecStoreTuple(&newtup, slot, InvalidBuffer, false);
1992                                                 ExecInsertIndexTuples(slot, &(newtup.t_self),
1993                                                                                           estate, true);
1994                                         }
1995
1996                                         WriteBuffer(cur_buffer);
1997                                         WriteBuffer(Cbuf);
1998                                 }                               /* end of move-the-tuple-chain loop */
1999
2000                                 cur_buffer = InvalidBuffer;
2001                                 pfree(vtmove);
2002                                 chain_tuple_moved = true;
2003
2004                                 /* advance to next tuple in walk-along-page loop */
2005                                 continue;
2006                         }                                       /* end of is-tuple-in-chain test */
2007
2008                         /* try to find new page for this tuple */
2009                         if (cur_buffer == InvalidBuffer ||
2010                                 !enough_space(cur_page, tuple_len))
2011                         {
2012                                 if (cur_buffer != InvalidBuffer)
2013                                 {
2014                                         WriteBuffer(cur_buffer);
2015                                         cur_buffer = InvalidBuffer;
2016                                 }
2017                                 for (i = 0; i < num_fraged_pages; i++)
2018                                 {
2019                                         if (enough_space(fraged_pages->pagedesc[i], tuple_len))
2020                                                 break;
2021                                 }
2022                                 if (i == num_fraged_pages)
2023                                         break;          /* can't move item anywhere */
2024                                 cur_item = i;
2025                                 cur_page = fraged_pages->pagedesc[cur_item];
2026                                 cur_buffer = ReadBuffer(onerel, cur_page->blkno);
2027                                 LockBuffer(cur_buffer, BUFFER_LOCK_EXCLUSIVE);
2028                                 ToPage = BufferGetPage(cur_buffer);
2029                                 /* if this page was not used before - clean it */
2030                                 if (!PageIsEmpty(ToPage) && cur_page->offsets_used == 0)
2031                                         vacuum_page(onerel, cur_buffer, cur_page);
2032                         }
2033                         else
2034                                 LockBuffer(cur_buffer, BUFFER_LOCK_EXCLUSIVE);
2035
2036                         LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
2037
2038                         /* copy tuple */
2039                         heap_copytuple_with_tuple(&tuple, &newtup);
2040
2041                         /*
2042                          * register invalidation of source tuple in catcaches.
2043                          *
2044                          * (Note: we do not need to register the copied tuple, because we
2045                          * are not changing the tuple contents and so there cannot be
2046                          * any need to flush negative catcache entries.)
2047                          */
2048                         CacheInvalidateHeapTuple(onerel, &tuple);
2049
2050                         /* NO ELOG(ERROR) TILL CHANGES ARE LOGGED */
2051                         START_CRIT_SECTION();
2052
2053                         /*
2054                          * Mark new tuple as MOVED_IN by me.
2055                          */
2056                         newtup.t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
2057                                                                                    HEAP_XMIN_INVALID |
2058                                                                                    HEAP_MOVED_OFF);
2059                         newtup.t_data->t_infomask |= HEAP_MOVED_IN;
2060                         HeapTupleHeaderSetXvac(newtup.t_data, myXID);
2061
2062                         /* add tuple to the page */
2063                         newoff = PageAddItem(ToPage, (Item) newtup.t_data, tuple_len,
2064                                                                  InvalidOffsetNumber, LP_USED);
2065                         if (newoff == InvalidOffsetNumber)
2066                         {
2067                                 elog(PANIC, "failed to add item with len = %lu to page %u (free space %lu, nusd %u, noff %u)",
2068                                          (unsigned long) tuple_len,
2069                                          cur_page->blkno, (unsigned long) cur_page->free,
2070                                          cur_page->offsets_used, cur_page->offsets_free);
2071                         }
2072                         newitemid = PageGetItemId(ToPage, newoff);
2073                         pfree(newtup.t_data);
2074                         newtup.t_datamcxt = NULL;
2075                         newtup.t_data = (HeapTupleHeader) PageGetItem(ToPage, newitemid);
2076                         ItemPointerSet(&(newtup.t_data->t_ctid), cur_page->blkno, newoff);
2077                         newtup.t_self = newtup.t_data->t_ctid;
2078
2079                         /*
2080                          * Mark old tuple as MOVED_OFF by me.
2081                          */
2082                         tuple.t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
2083                                                                                   HEAP_XMIN_INVALID |
2084                                                                                   HEAP_MOVED_IN);
2085                         tuple.t_data->t_infomask |= HEAP_MOVED_OFF;
2086                         HeapTupleHeaderSetXvac(tuple.t_data, myXID);
2087
2088                         /* XLOG stuff */
2089                         if (!onerel->rd_istemp)
2090                         {
2091                                 XLogRecPtr      recptr =
2092                                 log_heap_move(onerel, buf, tuple.t_self,
2093                                                           cur_buffer, &newtup);
2094
2095                                 PageSetLSN(page, recptr);
2096                                 PageSetSUI(page, ThisStartUpID);
2097                                 PageSetLSN(ToPage, recptr);
2098                                 PageSetSUI(ToPage, ThisStartUpID);
2099                         }
2100                         else
2101                         {
2102                                 /*
2103                                  * No XLOG record, but still need to flag that XID exists
2104                                  * on disk
2105                                  */
2106                                 MyXactMadeTempRelUpdate = true;
2107                         }
2108
2109                         END_CRIT_SECTION();
2110
2111                         cur_page->offsets_used++;
2112                         num_moved++;
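                        /*
                         * Recompute free space from the page header, since PageAddItem
                         * consumed tuple space and possibly a new line pointer as well.
                         */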
2113                         cur_page->free = ((PageHeader) ToPage)->pd_upper - ((PageHeader) ToPage)->pd_lower;
2114                         if (cur_page->blkno > last_move_dest_block)
2115                                 last_move_dest_block = cur_page->blkno;
2116
2117                         vacpage->offsets[vacpage->offsets_free++] = offnum;
2118
2119                         LockBuffer(cur_buffer, BUFFER_LOCK_UNLOCK);
2120                         LockBuffer(buf, BUFFER_LOCK_UNLOCK);
2121
2122                         /* insert index tuples if needed */
2123                         if (resultRelInfo->ri_NumIndices > 0)
2124                         {
2125                                 ExecStoreTuple(&newtup, slot, InvalidBuffer, false);
2126                                 ExecInsertIndexTuples(slot, &(newtup.t_self), estate, true);
2127                         }
2128                 }                                               /* walk along page */
2129
2130                 /*
2131                  * If we broke out of the walk-along-page loop early (ie, still
2132                  * have offnum <= maxoff), then we failed to move some tuple off
2133                  * this page.  No point in shrinking any more, so clean up and
2134                  * exit the per-page loop.
2135                  */
2136                 if (offnum < maxoff && keep_tuples > 0)
2137                 {
2138                         OffsetNumber off;
2139
2140                         /*
2141                          * Fix vacpage state for any unvisited tuples remaining on
2142                          * page
2143                          */
2144                         for (off = OffsetNumberNext(offnum);
2145                                  off <= maxoff;
2146                                  off = OffsetNumberNext(off))
2147                         {
2148                                 itemid = PageGetItemId(page, off);
2149                                 if (!ItemIdIsUsed(itemid))
2150                                         continue;
2151                                 tuple.t_datamcxt = NULL;
2152                                 tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
2153                                 if (tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED)
2154                                         continue;
2155                                 if (tuple.t_data->t_infomask & HEAP_MOVED_IN)
2156                                         elog(ERROR, "HEAP_MOVED_IN was not expected (2)");
2157                                 if (tuple.t_data->t_infomask & HEAP_MOVED_OFF)
2158                                 {
2159                                         if (HeapTupleHeaderGetXvac(tuple.t_data) != myXID)
2160                                                 elog(ERROR, "Invalid XVAC in tuple header (4)");
2161                                         /* some chain tuples were moved while cleaning this page */
2162                                         if (chain_tuple_moved)
2163                                         {
2164                                                 Assert(vacpage->offsets_free > 0);
2165                                                 for (i = 0; i < vacpage->offsets_free; i++)
2166                                                 {
2167                                                         if (vacpage->offsets[i] == off)
2168                                                                 break;
2169                                                 }
2170                                                 if (i >= vacpage->offsets_free) /* not found */
2171                                                 {
2172                                                         vacpage->offsets[vacpage->offsets_free++] = off;
2173                                                         Assert(keep_tuples > 0);
2174                                                         keep_tuples--;
2175                                                 }
2176                                         }
2177                                         else
2178                                         {
2179                                                 vacpage->offsets[vacpage->offsets_free++] = off;
2180                                                 Assert(keep_tuples > 0);
2181                                                 keep_tuples--;
2182                                         }
2183                                 }
2184                                 else
2185                                         elog(ERROR, "HEAP_MOVED_OFF was expected (2)");
2186                         }
2187                 }
2188
2189                 if (vacpage->offsets_free > 0)  /* some tuples were moved */
2190                 {
2191                         if (chain_tuple_moved)          /* else they are already ordered */
2192                         {
2193                                 qsort((char *) (vacpage->offsets), vacpage->offsets_free,
2194                                           sizeof(OffsetNumber), vac_cmp_offno);
2195                         }
2196                         vpage_insert(&Nvacpagelist, copy_vac_page(vacpage));
2197                         WriteBuffer(buf);
2198                 }
2199                 else if (dowrite)
2200                         WriteBuffer(buf);
2201                 else
2202                         ReleaseBuffer(buf);
2203
2204                 if (offnum <= maxoff)
2205                         break;                          /* had to quit early, see above note */
2206
2207         }                                                       /* walk along relation */
2208
2209         blkno++;                                        /* new number of blocks */
2210
2211         if (cur_buffer != InvalidBuffer)
2212         {
2213                 Assert(num_moved > 0);
2214                 WriteBuffer(cur_buffer);
2215         }
2216
2217         if (num_moved > 0)
2218         {
2219                 /*
2220                  * We have to commit our tuple moves before we truncate the
2221                  * relation.  Ideally we should do Commit/StartTransactionCommand
2222                  * here, relying on the session-level table lock to protect our
2223                  * exclusive access to the relation.  However, that would require
2224                  * a lot of extra code to close and re-open the relation, indexes,
2225                  * etc.  For now, a quick hack: record status of current
2226                  * transaction as committed, and continue.
2227                  */
2228                 RecordTransactionCommit();
2229         }
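             /*
              * The point of committing first: the moved copies become valid
              * only once this XID is seen as committed, so if the originals
              * were truncated away and we then crashed before committing,
              * the data would be lost.
              */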
2230
2231         /*
2232          * We are not going to move any more tuples across pages, but we still
2233          * need to apply vacuum_page to compact free space in the remaining
2234          * pages in vacuum_pages list.  Note that some of these pages may also
2235          * be in the fraged_pages list, and may have had tuples moved onto
2236          * them; if so, we already did vacuum_page and needn't do it again.
2237          */
2238         for (i = 0, curpage = vacuum_pages->pagedesc;
2239                  i < vacuumed_pages;
2240                  i++, curpage++)
2241         {
2242                 CHECK_FOR_INTERRUPTS();
2243                 Assert((*curpage)->blkno < blkno);
2244                 if ((*curpage)->offsets_used == 0)
2245                 {
2246                         /* this page was not used as a move target, so must clean it */
2247                         buf = ReadBuffer(onerel, (*curpage)->blkno);
2248                         LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
2249                         page = BufferGetPage(buf);
2250                         if (!PageIsEmpty(page))
2251                                 vacuum_page(onerel, buf, *curpage);
2252                         LockBuffer(buf, BUFFER_LOCK_UNLOCK);
2253                         WriteBuffer(buf);
2254                 }
2255         }
2256
2257         /*
2258          * Now scan all the pages that we moved tuples onto and update tuple
2259          * status bits.  This is not really necessary, but will save time for
2260          * future transactions examining these tuples.
2261          *
2262          * XXX WARNING that this code fails to clear HEAP_MOVED_OFF tuples from
2263          * pages that were move source pages but not move dest pages.  One
2264          * also wonders whether it wouldn't be better to skip this step and
2265          * let the tuple status updates happen someplace that's not holding an
2266          * exclusive lock on the relation.
2267          */
2268         checked_moved = 0;
2269         for (i = 0, curpage = fraged_pages->pagedesc;
2270                  i < num_fraged_pages;
2271                  i++, curpage++)
2272         {
2273                 CHECK_FOR_INTERRUPTS();
2274                 Assert((*curpage)->blkno < blkno);
2275                 if ((*curpage)->blkno > last_move_dest_block)
2276                         break;                          /* no need to scan any further */
2277                 if ((*curpage)->offsets_used == 0)
2278                         continue;                       /* this page was never used as a move dest */
2279                 buf = ReadBuffer(onerel, (*curpage)->blkno);
2280                 LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
2281                 page = BufferGetPage(buf);
2282                 num_tuples = 0;
2283                 max_offset = PageGetMaxOffsetNumber(page);
2284                 for (newoff = FirstOffsetNumber;
2285                          newoff <= max_offset;
2286                          newoff = OffsetNumberNext(newoff))
2287                 {
2288                         itemid = PageGetItemId(page, newoff);
2289                         if (!ItemIdIsUsed(itemid))
2290                                 continue;
2291                         tuple.t_datamcxt = NULL;
2292                         tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
2293                         if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
2294                         {
2295                                 if (!(tuple.t_data->t_infomask & HEAP_MOVED))
2296                                         elog(ERROR, "HEAP_MOVED_OFF/HEAP_MOVED_IN was expected");
2297                                 if (HeapTupleHeaderGetXvac(tuple.t_data) != myXID)
2298                                         elog(ERROR, "Invalid XVAC in tuple header (2)");
2299                                 if (tuple.t_data->t_infomask & HEAP_MOVED_IN)
2300                                 {
2301                                         tuple.t_data->t_infomask |= HEAP_XMIN_COMMITTED;
2302                                         tuple.t_data->t_infomask &= ~HEAP_MOVED;
2303                                         num_tuples++;
2304                                 }
2305                                 else
2306                                         tuple.t_data->t_infomask |= HEAP_XMIN_INVALID;
2307                         }
2308                 }
2309                 LockBuffer(buf, BUFFER_LOCK_UNLOCK);
2310                 WriteBuffer(buf);
2311                 Assert((*curpage)->offsets_used == num_tuples);
2312                 checked_moved += num_tuples;
2313         }
2314         Assert(num_moved == checked_moved);
2315
2316         elog(elevel, "Rel %s: Pages: %u --> %u; Tuple(s) moved: %u.\n\t%s",
2317                  RelationGetRelationName(onerel),
2318                  nblocks, blkno, num_moved,
2319                  vac_show_rusage(&ru0));
2320
2321         /*
2322          * Reflect the motion of system tuples to catalog cache here.
2323          */
2324         CommandCounterIncrement();
2325
2326         if (Nvacpagelist.num_pages > 0)
2327         {
2328                 /* vacuum indexes again if needed */
2329                 if (Irel != (Relation *) NULL)
2330                 {
2331                         VacPage    *vpleft,
2332                                            *vpright,
2333                                                 vpsave;
2334
2335                         /* re-sort Nvacpagelist.pagedesc */
2336                         for (vpleft = Nvacpagelist.pagedesc,
2337                         vpright = Nvacpagelist.pagedesc + Nvacpagelist.num_pages - 1;
2338                                  vpleft < vpright; vpleft++, vpright--)
2339                         {
2340                                 vpsave = *vpleft;
2341                                 *vpleft = *vpright;
2342                                 *vpright = vpsave;
2343                         }
2344                         Assert(keep_tuples >= 0);
2345                         for (i = 0; i < nindexes; i++)
2346                                 vacuum_index(&Nvacpagelist, Irel[i],
2347                                                          vacrelstats->rel_tuples, keep_tuples);
2348                 }
2349
2350                 /* clean moved tuples from the last page in Nvacpagelist */
2351                 if (vacpage->blkno == (blkno - 1) &&
2352                         vacpage->offsets_free > 0)
2353                 {
2354                         OffsetNumber unbuf[BLCKSZ / sizeof(OffsetNumber)];
2355                         OffsetNumber *unused = unbuf;
2356                         int                     uncnt;
2357
2358                         buf = ReadBuffer(onerel, vacpage->blkno);
2359                         LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
2360                         page = BufferGetPage(buf);
2361                         num_tuples = 0;
2362                         maxoff = PageGetMaxOffsetNumber(page);
2363                         for (offnum = FirstOffsetNumber;
2364                                  offnum <= maxoff;
2365                                  offnum = OffsetNumberNext(offnum))
2366                         {
2367                                 itemid = PageGetItemId(page, offnum);
2368                                 if (!ItemIdIsUsed(itemid))
2369                                         continue;
2370                                 tuple.t_datamcxt = NULL;
2371                                 tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
2372
2373                                 if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
2374                                 {
2375                                         if (tuple.t_data->t_infomask & HEAP_MOVED_OFF)
2376                                         {
2377                                                 if (HeapTupleHeaderGetXvac(tuple.t_data) != myXID)
2378                                                         elog(ERROR, "Invalid XVAC in tuple header (3)");
2379                                                 itemid->lp_flags &= ~LP_USED;
2380                                                 num_tuples++;
2381                                         }
2382                                         else
2383                                                 elog(ERROR, "HEAP_MOVED_OFF was expected (3)");
2384                                 }
2385
2386                         }
2387                         Assert(vacpage->offsets_free == num_tuples);
2388
2389                         START_CRIT_SECTION();
2390
2391                         uncnt = PageRepairFragmentation(page, unused);
2392
2393                         /* XLOG stuff */
2394                         if (!onerel->rd_istemp)
2395                         {
2396                                 XLogRecPtr      recptr;
2397
2398                                 recptr = log_heap_clean(onerel, buf, (char *) unused,
2399                                                   (char *) (&(unused[uncnt])) - (char *) unused);
2400                                 PageSetLSN(page, recptr);
2401                                 PageSetSUI(page, ThisStartUpID);
2402                         }
2403                         else
2404                         {
2405                                 /*
2406                                  * No XLOG record, but still need to flag that XID exists
2407                                  * on disk
2408                                  */
2409                                 MyXactMadeTempRelUpdate = true;
2410                         }
2411
2412                         END_CRIT_SECTION();
2413
2414                         LockBuffer(buf, BUFFER_LOCK_UNLOCK);
2415                         WriteBuffer(buf);
2416                 }
2417
2418                 /* now free the new list of reaped pages */
2419                 curpage = Nvacpagelist.pagedesc;
2420                 for (i = 0; i < Nvacpagelist.num_pages; i++, curpage++)
2421                         pfree(*curpage);
2422                 pfree(Nvacpagelist.pagedesc);
2423         }
2424
2425         /*
2426          * Flush dirty pages out to disk.  We do this unconditionally, even if
2427          * we don't need to truncate, because we want to ensure that all
2428          * tuples have correct on-row commit status on disk (see bufmgr.c's
2429          * comments for FlushRelationBuffers()).
2430          */
2431         i = FlushRelationBuffers(onerel, blkno);
2432         if (i < 0)
2433                 elog(ERROR, "VACUUM (repair_frag): FlushRelationBuffers returned %d",
2434                          i);
2435
2436         /* truncate relation, if needed */
2437         if (blkno < nblocks)
2438         {
2439                 blkno = smgrtruncate(DEFAULT_SMGR, onerel, blkno);
2440                 onerel->rd_nblocks = blkno;             /* update relcache immediately */
2441                 onerel->rd_targblock = InvalidBlockNumber;
2442                 vacrelstats->rel_pages = blkno; /* set new number of blocks */
2443         }
2444
2445         /* clean up */
2446         pfree(vacpage);
2447         if (vacrelstats->vtlinks != NULL)
2448                 pfree(vacrelstats->vtlinks);
2449
2450         ExecDropTupleTable(tupleTable, true);
2451
2452         ExecCloseIndices(resultRelInfo);
2453 }
2454
2455 /*
2456  *      vacuum_heap() -- free dead tuples
2457  *
2458  *              This routine marks dead tuples as unused and truncates the relation
2459  *              if there are "empty" end-blocks.
2460  */
2461 static void
2462 vacuum_heap(VRelStats *vacrelstats, Relation onerel, VacPageList vacuum_pages)
2463 {
2464         Buffer          buf;
2465         VacPage    *vacpage;
2466         BlockNumber relblocks;
2467         int                     nblocks;
2468         int                     i;
2469
2470         nblocks = vacuum_pages->num_pages;
2471         nblocks -= vacuum_pages->empty_end_pages;       /* empty end-pages are simply truncated below */
2472
2473         for (i = 0, vacpage = vacuum_pages->pagedesc; i < nblocks; i++, vacpage++)
2474         {
2475                 CHECK_FOR_INTERRUPTS();
2476                 if ((*vacpage)->offsets_free > 0)
2477                 {
2478                         buf = ReadBuffer(onerel, (*vacpage)->blkno);
2479                         LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
2480                         vacuum_page(onerel, buf, *vacpage);
2481                         LockBuffer(buf, BUFFER_LOCK_UNLOCK);
2482                         WriteBuffer(buf);
2483                 }
2484         }
2485
2486         /*
2487          * Flush dirty pages out to disk.  We do this unconditionally, even if
2488          * we don't need to truncate, because we want to ensure that all
2489          * tuples have correct on-row commit status on disk (see bufmgr.c's
2490          * comments for FlushRelationBuffers()).
2491          */
2492         Assert(vacrelstats->rel_pages >= vacuum_pages->empty_end_pages);
2493         relblocks = vacrelstats->rel_pages - vacuum_pages->empty_end_pages;
2494
2495         i = FlushRelationBuffers(onerel, relblocks);
2496         if (i < 0)
2497                 elog(ERROR, "VACUUM (vacuum_heap): FlushRelationBuffers returned %d",
2498                          i);
2499
2500         /* truncate relation if there are some empty end-pages */
2501         if (vacuum_pages->empty_end_pages > 0)
2502         {
2503                 elog(elevel, "Rel %s: Pages: %u --> %u.",
2504                          RelationGetRelationName(onerel),
2505                          vacrelstats->rel_pages, relblocks);
2506                 relblocks = smgrtruncate(DEFAULT_SMGR, onerel, relblocks);
2507                 onerel->rd_nblocks = relblocks; /* update relcache immediately */
2508                 onerel->rd_targblock = InvalidBlockNumber;
2509                 vacrelstats->rel_pages = relblocks;             /* set new number of
2510                                                                                                  * blocks */
2511         }
2512 }
2513
2514 /*
2515  *      vacuum_page() -- free dead tuples on a page
2516  *                                       and repair its fragmentation.
2517  */
2518 static void
2519 vacuum_page(Relation onerel, Buffer buffer, VacPage vacpage)
2520 {
2521         OffsetNumber unbuf[BLCKSZ / sizeof(OffsetNumber)];
2522         OffsetNumber *unused = unbuf;
2523         int                     uncnt;
2524         Page            page = BufferGetPage(buffer);
2525         ItemId          itemid;
2526         int                     i;
2527
2528         /* There shouldn't be any tuples moved onto the page yet! */
2529         Assert(vacpage->offsets_used == 0);
2530
2531         START_CRIT_SECTION();
2532
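             /* mark each offset recorded in vacpage as an unused line pointer */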
2533         for (i = 0; i < vacpage->offsets_free; i++)
2534         {
2535                 itemid = PageGetItemId(page, vacpage->offsets[i]);
2536                 itemid->lp_flags &= ~LP_USED;
2537         }
2538
2539         uncnt = PageRepairFragmentation(page, unused);
2540
2541         /* XLOG stuff */
2542         if (!onerel->rd_istemp)
2543         {
2544                 XLogRecPtr      recptr;
2545
2546                 recptr = log_heap_clean(onerel, buffer, (char *) unused,
2547                                                   (char *) (&(unused[uncnt])) - (char *) unused);
2548                 PageSetLSN(page, recptr);
2549                 PageSetSUI(page, ThisStartUpID);
2550         }
2551         else
2552         {
2553                 /* No XLOG record, but still need to flag that XID exists on disk */
2554                 MyXactMadeTempRelUpdate = true;
2555         }
2556
2557         END_CRIT_SECTION();
2558 }
2559
2560 /*
2561  *      scan_index() -- scan one index relation to update statistics.
2562  *
2563  * We use this when we have no deletions to do.
2564  */
2565 static void
2566 scan_index(Relation indrel, double num_tuples)
2567 {
2568         IndexBulkDeleteResult *stats;
2569         VacRUsage       ru0;
2570
2571         vac_init_rusage(&ru0);
2572
2573         /*
2574          * Even though we're not planning to delete anything, use the
2575          * ambulkdelete call, so that the scan happens within the index AM for
2576          * more speed.
2577          */
2578         stats = index_bulk_delete(indrel, dummy_tid_reaped, NULL);
2579
2580         if (!stats)
2581                 return;
2582
2583         /* now update statistics in pg_class */
2584         vac_update_relstats(RelationGetRelid(indrel),
2585                                                 stats->num_pages, stats->num_index_tuples,
2586                                                 false);
2587
2588         elog(elevel, "Index %s: Pages %u; Tuples %.0f.\n\t%s",
2589                  RelationGetRelationName(indrel),
2590                  stats->num_pages, stats->num_index_tuples,
2591                  vac_show_rusage(&ru0));
2592
2593         /*
2594          * Check for tuple count mismatch.      If the index is partial, then it's
2595          * OK for it to have fewer tuples than the heap; else we've got trouble.
2596          */
2597         if (stats->num_index_tuples != num_tuples)
2598         {
2599                 if (stats->num_index_tuples > num_tuples ||
2600                         !vac_is_partial_index(indrel))
2601                         elog(WARNING, "Index %s: NUMBER OF INDEX' TUPLES (%.0f) IS NOT THE SAME AS HEAP' (%.0f)."
2602                                  "\n\tRecreate the index.",
2603                                  RelationGetRelationName(indrel),
2604                                  stats->num_index_tuples, num_tuples);
2605         }
2606
2607         pfree(stats);
2608 }
2609
2610 /*
2611  *      vacuum_index() -- vacuum one index relation.
2612  *
2613  *              Vacpagelist is the VacPageList of the heap we're currently vacuuming.
2614  *              It's locked. Indrel is an index relation on the vacuumed heap.
2615  *
2616  *              We don't bother to set locks on the index relation here, since
2617  *              the parent table is exclusive-locked already.
2618  *
2619  *              Finally, we arrange to update the index relation's statistics in
2620  *              pg_class.
2621  */
2622 static void
2623 vacuum_index(VacPageList vacpagelist, Relation indrel,
2624                          double num_tuples, int keep_tuples)
2625 {
2626         IndexBulkDeleteResult *stats;
2627         VacRUsage       ru0;
2628
2629         vac_init_rusage(&ru0);
2630
2631         /* Do bulk deletion */
2632         stats = index_bulk_delete(indrel, tid_reaped, (void *) vacpagelist);
2633
2634         if (!stats)
2635                 return;
2636
2637         /* now update statistics in pg_class */
2638         vac_update_relstats(RelationGetRelid(indrel),
2639                                                 stats->num_pages, stats->num_index_tuples,
2640                                                 false);
2641
2642         elog(elevel, "Index %s: Pages %u; Tuples %.0f: Deleted %.0f.\n\t%s",
2643                  RelationGetRelationName(indrel), stats->num_pages,
2644                  stats->num_index_tuples - keep_tuples, stats->tuples_removed,
2645                  vac_show_rusage(&ru0));
2646
2647         /*
2648          * Check for tuple count mismatch.      If the index is partial, then it's
2649          * OK for it to have fewer tuples than the heap; else we've got trouble.
2650          */
2651         if (stats->num_index_tuples != num_tuples + keep_tuples)
2652         {
2653                 if (stats->num_index_tuples > num_tuples + keep_tuples ||
2654                         !vac_is_partial_index(indrel))
2655                         elog(WARNING, "Index %s: NUMBER OF INDEX' TUPLES (%.0f) IS NOT THE SAME AS HEAP' (%.0f)."
2656                                  "\n\tRecreate the index.",
2657                                  RelationGetRelationName(indrel),
2658                                  stats->num_index_tuples, num_tuples);
2659         }
2660
2661         pfree(stats);
2662 }
2663
2664 /*
2665  *      tid_reaped() -- is a particular tid reaped?
2666  *
2667  *              This has the right signature to be an IndexBulkDeleteCallback.
2668  *
2669  *              vacpagelist->pagedesc is sorted in the right order (ascending block number).
2670  */
2671 static bool
2672 tid_reaped(ItemPointer itemptr, void *state)
2673 {
2674         VacPageList vacpagelist = (VacPageList) state;
2675         OffsetNumber ioffno;
2676         OffsetNumber *voff;
2677         VacPage         vp,
2678                            *vpp;
2679         VacPageData vacpage;
2680
2681         vacpage.blkno = ItemPointerGetBlockNumber(itemptr);
2682         ioffno = ItemPointerGetOffsetNumber(itemptr);
2683
2684         vp = &vacpage;
2685         vpp = (VacPage *) vac_bsearch((void *) &vp,
2686                                                                   (void *) (vacpagelist->pagedesc),
2687                                                                   vacpagelist->num_pages,
2688                                                                   sizeof(VacPage),
2689                                                                   vac_cmp_blk);
2690
2691         if (vpp == NULL)
2692                 return false;
2693
2694         /* ok - we are on a partially or fully reaped page */
2695         vp = *vpp;
2696
2697         if (vp->offsets_free == 0)
2698         {
2699                 /* this page is entirely empty, so claim all tuples on it are reaped */
2700                 return true;
2701         }
2702
2703         voff = (OffsetNumber *) vac_bsearch((void *) &ioffno,
2704                                                                                 (void *) (vp->offsets),
2705                                                                                 vp->offsets_free,
2706                                                                                 sizeof(OffsetNumber),
2707                                                                                 vac_cmp_offno);
2708
2709         if (voff == NULL)
2710                 return false;
2711
2712         /* tid is reaped */
2713         return true;
2714 }
2715
2716 /*
2717  * Dummy version for scan_index.
2718  */
2719 static bool
2720 dummy_tid_reaped(ItemPointer itemptr, void *state)
2721 {
2722         return false;
2723 }
2724
2725 /*
2726  * Update the shared Free Space Map with the info we now have about
2727  * free space in the relation, discarding any old info the map may have.
2728  */
2729 static void
2730 vac_update_fsm(Relation onerel, VacPageList fraged_pages,
2731                            BlockNumber rel_pages)
2732 {
2733         int                     nPages = fraged_pages->num_pages;
2734         int                     i;
2735         PageFreeSpaceInfo *pageSpaces;
2736
2737         /* +1 to avoid palloc(0) */
2738         pageSpaces = (PageFreeSpaceInfo *)
2739                 palloc((nPages + 1) * sizeof(PageFreeSpaceInfo));
2740
2741         for (i = 0; i < nPages; i++)
2742         {
2743                 pageSpaces[i].blkno = fraged_pages->pagedesc[i]->blkno;
2744                 pageSpaces[i].avail = fraged_pages->pagedesc[i]->free;
2745
2746                 /*
2747                  * fraged_pages may contain entries for pages that we later
2748                  * decided to truncate from the relation; don't enter them into
2749                  * the free space map!
2750                  */
2751                 if (pageSpaces[i].blkno >= rel_pages)
2752                 {
2753                         nPages = i;
2754                         break;
2755                 }
2756         }
2757
2758         MultiRecordFreeSpace(&onerel->rd_node, 0, nPages, pageSpaces);
2759
2760         pfree(pageSpaces);
2761 }
2762
2763 /* Copy a VacPage structure */
2764 static VacPage
2765 copy_vac_page(VacPage vacpage)
2766 {
2767         VacPage         newvacpage;
2768
2769         /* allocate a VacPageData entry */
2770         newvacpage = (VacPage) palloc(sizeof(VacPageData) +
2771                                                    vacpage->offsets_free * sizeof(OffsetNumber));
2772
2773         /* fill it in */
2774         if (vacpage->offsets_free > 0)
2775                 memcpy(newvacpage->offsets, vacpage->offsets,
2776                            vacpage->offsets_free * sizeof(OffsetNumber));
2777         newvacpage->blkno = vacpage->blkno;
2778         newvacpage->free = vacpage->free;
2779         newvacpage->offsets_used = vacpage->offsets_used;
2780         newvacpage->offsets_free = vacpage->offsets_free;
2781
2782         return newvacpage;
2783 }
2784
2785 /*
2786  * Add a VacPage pointer to a VacPageList.
2787  *
2788  *              As a side effect of the way that scan_heap works,
2789  *              higher pages come after lower pages in the array
2790  *              (and highest tid on a page is last).
2791  */
2792 static void
2793 vpage_insert(VacPageList vacpagelist, VacPage vpnew)
2794 {
2795 #define PG_NPAGEDESC 1024
2796
2797         /* allocate a VacPage entry if needed */
2798         if (vacpagelist->num_pages == 0)
2799         {
2800                 vacpagelist->pagedesc = (VacPage *) palloc(PG_NPAGEDESC * sizeof(VacPage));
2801                 vacpagelist->num_allocated_pages = PG_NPAGEDESC;
2802         }
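             /* grow the pagedesc array by doubling whenever it fills up */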
2803         else if (vacpagelist->num_pages >= vacpagelist->num_allocated_pages)
2804         {
2805                 vacpagelist->num_allocated_pages *= 2;
2806                 vacpagelist->pagedesc = (VacPage *) repalloc(vacpagelist->pagedesc, vacpagelist->num_allocated_pages * sizeof(VacPage));
2807         }
2808         vacpagelist->pagedesc[vacpagelist->num_pages] = vpnew;
2809         (vacpagelist->num_pages)++;
2810 }
2811
2812 /*
2813  * vac_bsearch: just like standard C library routine bsearch(),
2814  * except that we first test to see whether the target key is outside
2815  * the range of the table entries.      This case is handled relatively slowly
2816  * by the normal binary search algorithm (ie, no faster than any other key)
2817  * but it occurs often enough in VACUUM to be worth optimizing.
2818  */
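     /*
      * For example, tid_reaped() above uses it to probe a page's sorted
      * offsets array:
      *
      *              voff = (OffsetNumber *) vac_bsearch((void *) &ioffno,
      *                                                  (void *) (vp->offsets),
      *                                                  vp->offsets_free,
      *                                                  sizeof(OffsetNumber),
      *                                                  vac_cmp_offno);
      */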
2819 static void *
2820 vac_bsearch(const void *key, const void *base,
2821                         size_t nelem, size_t size,
2822                         int (*compar) (const void *, const void *))
2823 {
2824         int                     res;
2825         const void *last;
2826
2827         if (nelem == 0)
2828                 return NULL;
2829         res = compar(key, base);
2830         if (res < 0)
2831                 return NULL;
2832         if (res == 0)
2833                 return (void *) base;
2834         if (nelem > 1)
2835         {
2836                 last = (const void *) ((const char *) base + (nelem - 1) * size);
2837                 res = compar(key, last);
2838                 if (res > 0)
2839                         return NULL;
2840                 if (res == 0)
2841                         return (void *) last;
2842         }
2843         if (nelem <= 2)
2844                 return NULL;                    /* already checked 'em all */
2845         return bsearch(key, base, nelem, size, compar);
2846 }
2847
2848 /*
2849  * Comparator routines for use with qsort() and bsearch().
2850  */
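     /*
      * For instance, repair_frag() sorts a page's reaped offsets with
      *
      *              qsort((char *) (vacpage->offsets), vacpage->offsets_free,
      *                        sizeof(OffsetNumber), vac_cmp_offno);
      */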
2851 static int
2852 vac_cmp_blk(const void *left, const void *right)
2853 {
2854         BlockNumber lblk,
2855                                 rblk;
2856
2857         lblk = (*((VacPage *) left))->blkno;
2858         rblk = (*((VacPage *) right))->blkno;
2859
2860         if (lblk < rblk)
2861                 return -1;
2862         if (lblk == rblk)
2863                 return 0;
2864         return 1;
2865 }
2866
2867 static int
2868 vac_cmp_offno(const void *left, const void *right)
2869 {
2870         if (*(OffsetNumber *) left < *(OffsetNumber *) right)
2871                 return -1;
2872         if (*(OffsetNumber *) left == *(OffsetNumber *) right)
2873                 return 0;
2874         return 1;
2875 }
2876
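     /*
      * Compare VTupleLinks by new_tid in physical TID order: block number
      * (high word, then low word), then offset within the block.
      */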
2877 static int
2878 vac_cmp_vtlinks(const void *left, const void *right)
2879 {
2880         if (((VTupleLink) left)->new_tid.ip_blkid.bi_hi <
2881                 ((VTupleLink) right)->new_tid.ip_blkid.bi_hi)
2882                 return -1;
2883         if (((VTupleLink) left)->new_tid.ip_blkid.bi_hi >
2884                 ((VTupleLink) right)->new_tid.ip_blkid.bi_hi)
2885                 return 1;
2886         /* bi_hi-es are equal */
2887         if (((VTupleLink) left)->new_tid.ip_blkid.bi_lo <
2888                 ((VTupleLink) right)->new_tid.ip_blkid.bi_lo)
2889                 return -1;
2890         if (((VTupleLink) left)->new_tid.ip_blkid.bi_lo >
2891                 ((VTupleLink) right)->new_tid.ip_blkid.bi_lo)
2892                 return 1;
2893         /* bi_lo-es are equal */
2894         if (((VTupleLink) left)->new_tid.ip_posid <
2895                 ((VTupleLink) right)->new_tid.ip_posid)
2896                 return -1;
2897         if (((VTupleLink) left)->new_tid.ip_posid >
2898                 ((VTupleLink) right)->new_tid.ip_posid)
2899                 return 1;
2900         return 0;
2901 }
2902
2903
2904 void
2905 vac_open_indexes(Relation relation, int *nindexes, Relation **Irel)
2906 {
2907         List       *indexoidlist,
2908                            *indexoidscan;
2909         int                     i;
2910
2911         indexoidlist = RelationGetIndexList(relation);
2912
2913         *nindexes = length(indexoidlist);
2914
2915         if (*nindexes > 0)
2916                 *Irel = (Relation *) palloc(*nindexes * sizeof(Relation));
2917         else
2918                 *Irel = NULL;
2919
2920         i = 0;
2921         foreach(indexoidscan, indexoidlist)
2922         {
2923                 Oid                     indexoid = lfirsti(indexoidscan);
2924
2925                 (*Irel)[i] = index_open(indexoid);
2926                 i++;
2927         }
2928
2929         freeList(indexoidlist);
2930 }
2931
2932
2933 void
2934 vac_close_indexes(int nindexes, Relation *Irel)
2935 {
2936         if (Irel == (Relation *) NULL)
2937                 return;
2938
2939         while (nindexes--)
2940                 index_close(Irel[nindexes]);
2941         pfree(Irel);
2942 }
2943
2944
2945 /*
2946  * Is an index partial (ie, could it contain fewer tuples than the heap?)
2947  */
2948 bool
2949 vac_is_partial_index(Relation indrel)
2950 {
2951         /*
2952          * If the index's AM doesn't support nulls, it's partial for our
2953          * purposes.
2954          */
2955         if (!indrel->rd_am->amindexnulls)
2956                 return true;
2957
2958         /* Otherwise, look to see if there's a partial-index predicate */
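             /* (an empty predicate is stored as an empty text value, so its
              * VARSIZE is just the varlena header) */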
2959         return (VARSIZE(&indrel->rd_index->indpred) > VARHDRSZ);
2960 }
2961
2962
2963 static bool
2964 enough_space(VacPage vacpage, Size len)
2965 {
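             /* tuples occupy MAXALIGN'd space on the page, so compare using the aligned length */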
2966         len = MAXALIGN(len);
2967
2968         if (len > vacpage->free)
2969                 return false;
2970
2971         /* if there are free itemid(s) and len <= free_space... */
2972         if (vacpage->offsets_used < vacpage->offsets_free)
2973                 return true;
2974
2975         /* noff_used >= noff_free, so we'll have to allocate a new itemid */
2976         if (len + sizeof(ItemIdData) <= vacpage->free)
2977                 return true;
2978
2979         return false;
2980 }
2981
2982
2983 /*
2984  * Initialize usage snapshot.
2985  */
2986 void
2987 vac_init_rusage(VacRUsage *ru0)
2988 {
2989         struct timezone tz;
2990
2991         getrusage(RUSAGE_SELF, &ru0->ru);
2992         gettimeofday(&ru0->tv, &tz);
2993 }
2994
2995 /*
2996  * Compute elapsed time since ru0 usage snapshot, and format into
2997  * a displayable string.  Result is in a static string, which is
2998  * tacky, but no one ever claimed that the Postgres backend is
2999  * threadable...
3000  */
3001 const char *
3002 vac_show_rusage(VacRUsage *ru0)
3003 {
3004         static char result[100];
3005         VacRUsage       ru1;
3006
3007         vac_init_rusage(&ru1);
3008
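             /*
              * Borrow from the seconds fields where needed so that the
              * microsecond subtractions below cannot go negative.
              */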
3009         if (ru1.tv.tv_usec < ru0->tv.tv_usec)
3010         {
3011                 ru1.tv.tv_sec--;
3012                 ru1.tv.tv_usec += 1000000;
3013         }
3014         if (ru1.ru.ru_stime.tv_usec < ru0->ru.ru_stime.tv_usec)
3015         {
3016                 ru1.ru.ru_stime.tv_sec--;
3017                 ru1.ru.ru_stime.tv_usec += 1000000;
3018         }
3019         if (ru1.ru.ru_utime.tv_usec < ru0->ru.ru_utime.tv_usec)
3020         {
3021                 ru1.ru.ru_utime.tv_sec--;
3022                 ru1.ru.ru_utime.tv_usec += 1000000;
3023         }
3024
3025         snprintf(result, sizeof(result),
3026                          "CPU %d.%02ds/%d.%02du sec elapsed %d.%02d sec.",
3027                          (int) (ru1.ru.ru_stime.tv_sec - ru0->ru.ru_stime.tv_sec),
3028           (int) (ru1.ru.ru_stime.tv_usec - ru0->ru.ru_stime.tv_usec) / 10000,
3029                          (int) (ru1.ru.ru_utime.tv_sec - ru0->ru.ru_utime.tv_sec),
3030           (int) (ru1.ru.ru_utime.tv_usec - ru0->ru.ru_utime.tv_usec) / 10000,
3031                          (int) (ru1.tv.tv_sec - ru0->tv.tv_sec),
3032                          (int) (ru1.tv.tv_usec - ru0->tv.tv_usec) / 10000);
3033
3034         return result;
3035 }