]> granicus.if.org Git - postgresql/blob - src/backend/commands/vacuum.c
Disable VACUUM from being called from a function because function memory
[postgresql] / src / backend / commands / vacuum.c
1 /*-------------------------------------------------------------------------
2  *
3  * vacuum.c
4  *        The postgres vacuum cleaner.
5  *
6  * This file includes the "full" version of VACUUM, as well as control code
7  * used by all three of full VACUUM, lazy VACUUM, and ANALYZE.  See
8  * vacuumlazy.c and analyze.c for the rest of the code for the latter two.
9  *
10  *
11  * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
12  * Portions Copyright (c) 1994, Regents of the University of California
13  *
14  *
15  * IDENTIFICATION
16  *        $Header: /cvsroot/pgsql/src/backend/commands/vacuum.c,v 1.224 2002/04/15 23:39:42 momjian Exp $
17  *
18  *-------------------------------------------------------------------------
19  */
20 #include "postgres.h"
21
22 #include <unistd.h>
23
24 #include "access/clog.h"
25 #include "access/genam.h"
26 #include "access/heapam.h"
27 #include "access/xlog.h"
28 #include "catalog/catalog.h"
29 #include "catalog/catname.h"
30 #include "catalog/namespace.h"
31 #include "catalog/pg_database.h"
32 #include "catalog/pg_index.h"
33 #include "commands/vacuum.h"
34 #include "executor/executor.h"
35 #include "miscadmin.h"
36 #include "storage/freespace.h"
37 #include "storage/sinval.h"
38 #include "storage/smgr.h"
39 #include "tcop/pquery.h"
40 #include "utils/acl.h"
41 #include "utils/builtins.h"
42 #include "utils/fmgroids.h"
43 #include "utils/inval.h"
44 #include "utils/lsyscache.h"
45 #include "utils/relcache.h"
46 #include "utils/syscache.h"
47 #include "pgstat.h"
48
49
50 typedef struct VacPageData
51 {
52         BlockNumber blkno;                      /* BlockNumber of this Page */
53         Size            free;                   /* FreeSpace on this Page */
54         uint16          offsets_used;   /* Number of OffNums used by vacuum */
55         uint16          offsets_free;   /* Number of OffNums free or to be free */
56         OffsetNumber offsets[1];        /* Array of free OffNums */
57 } VacPageData;
58
59 typedef VacPageData *VacPage;
60
61 typedef struct VacPageListData
62 {
63         BlockNumber empty_end_pages;    /* Number of "empty" end-pages */
64         int                     num_pages;              /* Number of pages in pagedesc */
65         int                     num_allocated_pages;    /* Number of allocated pages in
66                                                                                  * pagedesc */
67         VacPage    *pagedesc;           /* Descriptions of pages */
68 } VacPageListData;
69
70 typedef VacPageListData *VacPageList;
71
72 typedef struct VTupleLinkData
73 {
74         ItemPointerData new_tid;
75         ItemPointerData this_tid;
76 } VTupleLinkData;
77
78 typedef VTupleLinkData *VTupleLink;
79
80 typedef struct VTupleMoveData
81 {
82         ItemPointerData tid;            /* tuple ID */
83         VacPage         vacpage;                /* where to move */
84         bool            cleanVpd;               /* clean vacpage before using */
85 } VTupleMoveData;
86
87 typedef VTupleMoveData *VTupleMove;
88
89 typedef struct VRelStats
90 {
91         BlockNumber rel_pages;
92         double          rel_tuples;
93         Size            min_tlen;
94         Size            max_tlen;
95         bool            hasindex;
96         int                     num_vtlinks;
97         VTupleLink      vtlinks;
98 } VRelStats;
99
100
101 static MemoryContext vac_context = NULL;
102
103 static int elevel = -1;
104
105 static TransactionId OldestXmin;
106 static TransactionId FreezeLimit;
107
108 static TransactionId initialOldestXmin;
109 static TransactionId initialFreezeLimit;
110
111
112 /* non-export function prototypes */
113 static void vacuum_init(VacuumStmt *vacstmt);
114 static void vacuum_shutdown(VacuumStmt *vacstmt);
115 static List *getrels(const RangeVar *vacrel, const char *stmttype);
116 static void vac_update_dbstats(Oid dbid,
117                                    TransactionId vacuumXID,
118                                    TransactionId frozenXID);
119 static void vac_truncate_clog(TransactionId vacuumXID,
120                                   TransactionId frozenXID);
121 static void vacuum_rel(Oid relid, VacuumStmt *vacstmt, char expected_relkind);
122 static void full_vacuum_rel(Relation onerel, VacuumStmt *vacstmt);
123 static void scan_heap(VRelStats *vacrelstats, Relation onerel,
124                   VacPageList vacuum_pages, VacPageList fraged_pages);
125 static void repair_frag(VRelStats *vacrelstats, Relation onerel,
126                         VacPageList vacuum_pages, VacPageList fraged_pages,
127                         int nindexes, Relation *Irel);
128 static void vacuum_heap(VRelStats *vacrelstats, Relation onerel,
129                         VacPageList vacpagelist);
130 static void vacuum_page(Relation onerel, Buffer buffer, VacPage vacpage);
131 static void vacuum_index(VacPageList vacpagelist, Relation indrel,
132                          double num_tuples, int keep_tuples);
133 static void scan_index(Relation indrel, double num_tuples);
134 static bool tid_reaped(ItemPointer itemptr, void *state);
135 static bool dummy_tid_reaped(ItemPointer itemptr, void *state);
136 static void vac_update_fsm(Relation onerel, VacPageList fraged_pages,
137                            BlockNumber rel_pages);
138 static VacPage copy_vac_page(VacPage vacpage);
139 static void vpage_insert(VacPageList vacpagelist, VacPage vpnew);
140 static void *vac_bsearch(const void *key, const void *base,
141                         size_t nelem, size_t size,
142                         int (*compar) (const void *, const void *));
143 static int      vac_cmp_blk(const void *left, const void *right);
144 static int      vac_cmp_offno(const void *left, const void *right);
145 static int      vac_cmp_vtlinks(const void *left, const void *right);
146 static bool enough_space(VacPage vacpage, Size len);
147
148
149 /****************************************************************************
150  *                                                                                                                                                      *
151  *                      Code common to all flavors of VACUUM and ANALYZE                                *
152  *                                                                                                                                                      *
153  ****************************************************************************
154  */
155
156
157 /*
158  * Primary entry point for VACUUM and ANALYZE commands.
159  */
160 void
161 vacuum(VacuumStmt *vacstmt)
162 {
163         const char *stmttype = vacstmt->vacuum ? "VACUUM" : "ANALYZE";
164         List       *vrl,
165                            *cur;
166
167         if (vacstmt->verbose)
168                 elevel = INFO;
169         else
170                 elevel = DEBUG1;
171
172         /*
173          * We cannot run VACUUM inside a user transaction block; if we were
174          * inside a transaction, then our commit- and
175          * start-transaction-command calls would not have the intended effect!
176          * Furthermore, the forced commit that occurs before truncating the
177          * relation's file would have the effect of committing the rest of the
178          * user's transaction too, which would certainly not be the desired
179          * behavior.
180          */
181         if (IsTransactionBlock())
182                 elog(ERROR, "%s cannot run inside a BEGIN/END block", stmttype);
183
184         /* Running VACUUM from a function would free the function context */
185         if (!MemoryContextContains(QueryContext, vacstmt))
186                 elog(ERROR, "%s cannot be executed from a function", stmttype);
187                         
188         /*
189          * Send info about dead objects to the statistics collector
190          */
191         pgstat_vacuum_tabstat();
192
193         /*
194          * Create special memory context for cross-transaction storage.
195          *
196          * Since it is a child of QueryContext, it will go away eventually even
197          * if we suffer an error; there's no need for special abort cleanup
198          * logic.
199          */
200         vac_context = AllocSetContextCreate(QueryContext,
201                                                                                 "Vacuum",
202                                                                                 ALLOCSET_DEFAULT_MINSIZE,
203                                                                                 ALLOCSET_DEFAULT_INITSIZE,
204                                                                                 ALLOCSET_DEFAULT_MAXSIZE);
205
206         /* Build list of relations to process (note this lives in vac_context) */
207         vrl = getrels(vacstmt->relation, stmttype);
208
209         /*
210          * Start up the vacuum cleaner.
211          */
212         vacuum_init(vacstmt);
213
214         /*
215          * Process each selected relation.      We are careful to process each
216          * relation in a separate transaction in order to avoid holding too
217          * many locks at one time.      Also, if we are doing VACUUM ANALYZE, the
218          * ANALYZE part runs as a separate transaction from the VACUUM to
219          * further reduce locking.
220          */
221         foreach(cur, vrl)
222         {
223                 Oid             relid = (Oid) lfirsti(cur);
224
225                 if (vacstmt->vacuum)
226                         vacuum_rel(relid, vacstmt, RELKIND_RELATION);
227                 if (vacstmt->analyze)
228                         analyze_rel(relid, vacstmt);
229         }
230
231         /* clean up */
232         vacuum_shutdown(vacstmt);
233 }
234
235 /*
236  *      vacuum_init(), vacuum_shutdown() -- start up and shut down the vacuum cleaner.
237  *
238  *              Formerly, there was code here to prevent more than one VACUUM from
239  *              executing concurrently in the same database.  However, there's no
240  *              good reason to prevent that, and manually removing lockfiles after
241  *              a vacuum crash was a pain for dbadmins.  So, forget about lockfiles,
242  *              and just rely on the locks we grab on each target table
243  *              to ensure that there aren't two VACUUMs running on the same table
244  *              at the same time.
245  *
246  *              The strangeness with committing and starting transactions in the
247  *              init and shutdown routines is due to the fact that the vacuum cleaner
248  *              is invoked via an SQL command, and so is already executing inside
249  *              a transaction.  We need to leave ourselves in a predictable state
250  *              on entry and exit to the vacuum cleaner.  We commit the transaction
251  *              started in PostgresMain() inside vacuum_init(), and start one in
252  *              vacuum_shutdown() to match the commit waiting for us back in
253  *              PostgresMain().
254  */
255 static void
256 vacuum_init(VacuumStmt *vacstmt)
257 {
258         if (vacstmt->vacuum && vacstmt->relation == NULL)
259         {
260                 /*
261                  * Compute the initially applicable OldestXmin and FreezeLimit
262                  * XIDs, so that we can record these values at the end of the
263                  * VACUUM. Note that individual tables may well be processed with
264                  * newer values, but we can guarantee that no (non-shared)
265                  * relations are processed with older ones.
266                  *
267                  * It is okay to record non-shared values in pg_database, even though
268                  * we may vacuum shared relations with older cutoffs, because only
269                  * the minimum of the values present in pg_database matters.  We
270                  * can be sure that shared relations have at some time been
271                  * vacuumed with cutoffs no worse than the global minimum; for, if
272                  * there is a backend in some other DB with xmin = OLDXMIN that's
273                  * determining the cutoff with which we vacuum shared relations,
274                  * it is not possible for that database to have a cutoff newer
275                  * than OLDXMIN recorded in pg_database.
276                  */
277                 vacuum_set_xid_limits(vacstmt, false,
278                                                           &initialOldestXmin, &initialFreezeLimit);
279         }
280
281         /* matches the StartTransaction in PostgresMain() */
282         CommitTransactionCommand();
283 }
284
285 static void
286 vacuum_shutdown(VacuumStmt *vacstmt)
287 {
288         /* on entry, we are not in a transaction */
289
290         /* matches the CommitTransaction in PostgresMain() */
291         StartTransactionCommand();
292
293         /*
294          * If we did a database-wide VACUUM, update the database's pg_database
295          * row with info about the transaction IDs used, and try to truncate
296          * pg_clog.
297          */
298         if (vacstmt->vacuum && vacstmt->relation == NULL)
299         {
300                 vac_update_dbstats(MyDatabaseId,
301                                                    initialOldestXmin, initialFreezeLimit);
302                 vac_truncate_clog(initialOldestXmin, initialFreezeLimit);
303         }
304
305         /*
306          * Clean up working storage --- note we must do this after
307          * StartTransactionCommand, else we might be trying to delete the
308          * active context!
309          */
310         MemoryContextDelete(vac_context);
311         vac_context = NULL;
312 }
313
314 /*
315  * Build a list of Oids for each relation to be processed
316  *
317  * The list is built in vac_context so that it will survive across our
318  * per-relation transactions.
319  */
320 static List *
321 getrels(const RangeVar *vacrel, const char *stmttype)
322 {
323         List       *vrl = NIL;
324         MemoryContext oldcontext;
325
326         if (vacrel)
327         {
328                 /* Process specific relation */
329                 Oid             relid;
330
331                 relid = RangeVarGetRelid(vacrel, false);
332
333                 /* Make a relation list entry for this guy */
334                 oldcontext = MemoryContextSwitchTo(vac_context);
335                 vrl = lappendi(vrl, relid);
336                 MemoryContextSwitchTo(oldcontext);
337         }
338         else
339         {
340                 /* Process all plain relations listed in pg_class */
341                 Relation        pgclass;
342                 HeapScanDesc scan;
343                 HeapTuple       tuple;
344                 ScanKeyData key;
345
346                 ScanKeyEntryInitialize(&key, 0x0,
347                                                            Anum_pg_class_relkind,
348                                                            F_CHAREQ,
349                                                            CharGetDatum(RELKIND_RELATION));
350
351                 pgclass = heap_openr(RelationRelationName, AccessShareLock);
352
353                 scan = heap_beginscan(pgclass, false, SnapshotNow, 1, &key);
354
355                 while (HeapTupleIsValid(tuple = heap_getnext(scan, 0)))
356                 {
357                         /* Make a relation list entry for this guy */
358                         oldcontext = MemoryContextSwitchTo(vac_context);
359                         vrl = lappendi(vrl, tuple->t_data->t_oid);
360                         MemoryContextSwitchTo(oldcontext);
361                 }
362
363                 heap_endscan(scan);
364                 heap_close(pgclass, AccessShareLock);
365         }
366
367         return vrl;
368 }
369
370 /*
371  * vacuum_set_xid_limits() -- compute oldest-Xmin and freeze cutoff points
372  */
373 void
374 vacuum_set_xid_limits(VacuumStmt *vacstmt, bool sharedRel,
375                                           TransactionId *oldestXmin,
376                                           TransactionId *freezeLimit)
377 {
378         TransactionId limit;
379
380         *oldestXmin = GetOldestXmin(sharedRel);
381
382         Assert(TransactionIdIsNormal(*oldestXmin));
383
384         if (vacstmt->freeze)
385         {
386                 /* FREEZE option: use oldest Xmin as freeze cutoff too */
387                 limit = *oldestXmin;
388         }
389         else
390         {
391                 /*
392                  * Normal case: freeze cutoff is well in the past, to wit, about
393                  * halfway to the wrap horizon
394                  */
395                 limit = GetCurrentTransactionId() - (MaxTransactionId >> 2);
396         }
397
398         /*
399          * Be careful not to generate a "permanent" XID
400          */
401         if (!TransactionIdIsNormal(limit))
402                 limit = FirstNormalTransactionId;
403
404         /*
405          * Ensure sane relationship of limits
406          */
407         if (TransactionIdFollows(limit, *oldestXmin))
408         {
409                 elog(WARNING, "oldest Xmin is far in the past --- close open transactions soon to avoid wraparound problems");
410                 limit = *oldestXmin;
411         }
412
413         *freezeLimit = limit;
414 }
415
416
417 /*
418  *      vac_update_relstats() -- update statistics for one relation
419  *
420  *              Update the whole-relation statistics that are kept in its pg_class
421  *              row.  There are additional stats that will be updated if we are
422  *              doing ANALYZE, but we always update these stats.  This routine works
423  *              for both index and heap relation entries in pg_class.
424  *
425  *              We violate no-overwrite semantics here by storing new values for the
426  *              statistics columns directly into the pg_class tuple that's already on
427  *              the page.  The reason for this is that if we updated these tuples in
428  *              the usual way, vacuuming pg_class itself wouldn't work very well ---
429  *              by the time we got done with a vacuum cycle, most of the tuples in
430  *              pg_class would've been obsoleted.  Of course, this only works for
431  *              fixed-size never-null columns, but these are.
432  *
433  *              This routine is shared by full VACUUM, lazy VACUUM, and stand-alone
434  *              ANALYZE.
435  */
436 void
437 vac_update_relstats(Oid relid, BlockNumber num_pages, double num_tuples,
438                                         bool hasindex)
439 {
440         Relation        rd;
441         HeapTupleData rtup;
442         HeapTuple       ctup;
443         Form_pg_class pgcform;
444         Buffer          buffer;
445
446         /*
447          * update number of tuples and number of pages in pg_class
448          */
449         rd = heap_openr(RelationRelationName, RowExclusiveLock);
450
451         ctup = SearchSysCache(RELOID,
452                                                   ObjectIdGetDatum(relid),
453                                                   0, 0, 0);
454         if (!HeapTupleIsValid(ctup))
455                 elog(ERROR, "pg_class entry for relid %u vanished during vacuuming",
456                          relid);
457
458         /* get the buffer cache tuple */
459         rtup.t_self = ctup->t_self;
460         ReleaseSysCache(ctup);
461         heap_fetch(rd, SnapshotNow, &rtup, &buffer, NULL);
462
463         /* overwrite the existing statistics in the tuple */
464         pgcform = (Form_pg_class) GETSTRUCT(&rtup);
465         pgcform->relpages = (int32) num_pages;
466         pgcform->reltuples = num_tuples;
467         pgcform->relhasindex = hasindex;
468
469         /*
470          * If we have discovered that there are no indexes, then there's no
471          * primary key either.  This could be done more thoroughly...
472          */
473         if (!hasindex)
474                 pgcform->relhaspkey = false;
475
476         /*
477          * Invalidate the tuple in the catcaches; this also arranges to flush
478          * the relation's relcache entry.  (If we fail to commit for some reason,
479          * no flush will occur, but no great harm is done since there are no
480          * noncritical state updates here.)
481          */
482         CacheInvalidateHeapTuple(rd, &rtup);
483
484         /* Write the buffer */
485         WriteBuffer(buffer);
486
487         heap_close(rd, RowExclusiveLock);
488 }
489
490
491 /*
492  *      vac_update_dbstats() -- update statistics for one database
493  *
494  *              Update the whole-database statistics that are kept in its pg_database
495  *              row.
496  *
497  *              We violate no-overwrite semantics here by storing new values for the
498  *              statistics columns directly into the tuple that's already on the page.
499  *              As with vac_update_relstats, this avoids leaving dead tuples behind
500  *              after a VACUUM; which is good since GetRawDatabaseInfo
501  *              can get confused by finding dead tuples in pg_database.
502  *
503  *              This routine is shared by full and lazy VACUUM.  Note that it is only
504  *              applied after a database-wide VACUUM operation.
505  */
506 static void
507 vac_update_dbstats(Oid dbid,
508                                    TransactionId vacuumXID,
509                                    TransactionId frozenXID)
510 {
511         Relation        relation;
512         ScanKeyData entry[1];
513         HeapScanDesc scan;
514         HeapTuple       tuple;
515         Form_pg_database dbform;
516
517         relation = heap_openr(DatabaseRelationName, RowExclusiveLock);
518
519         /* Must use a heap scan, since there's no syscache for pg_database */
520         ScanKeyEntryInitialize(&entry[0], 0x0,
521                                                    ObjectIdAttributeNumber, F_OIDEQ,
522                                                    ObjectIdGetDatum(dbid));
523
524         scan = heap_beginscan(relation, 0, SnapshotNow, 1, entry);
525
526         tuple = heap_getnext(scan, 0);
527
528         if (!HeapTupleIsValid(tuple))
529                 elog(ERROR, "database %u does not exist", dbid);
530
531         dbform = (Form_pg_database) GETSTRUCT(tuple);
532
533         /* overwrite the existing statistics in the tuple */
534         dbform->datvacuumxid = vacuumXID;
535         dbform->datfrozenxid = frozenXID;
536
537         /* invalidate the tuple in the cache and write the buffer */
538         CacheInvalidateHeapTuple(relation, tuple);
539         WriteNoReleaseBuffer(scan->rs_cbuf);
540
541         heap_endscan(scan);
542
543         heap_close(relation, RowExclusiveLock);
544 }
545
546
547 /*
548  *      vac_truncate_clog() -- attempt to truncate the commit log
549  *
550  *              Scan pg_database to determine the system-wide oldest datvacuumxid,
551  *              and use it to truncate the transaction commit log (pg_clog).
552  *              Also generate a warning if the system-wide oldest datfrozenxid
553  *              seems to be in danger of wrapping around.
554  *
555  *              The passed XIDs are simply the ones I just wrote into my pg_database
556  *              entry.  They're used to initialize the "min" calculations.
557  *
558  *              This routine is shared by full and lazy VACUUM.  Note that it is only
559  *              applied after a database-wide VACUUM operation.
560  */
561 static void
562 vac_truncate_clog(TransactionId vacuumXID, TransactionId frozenXID)
563 {
564         TransactionId myXID;
565         Relation        relation;
566         HeapScanDesc scan;
567         HeapTuple       tuple;
568         int32           age;
569         bool            vacuumAlreadyWrapped = false;
570         bool            frozenAlreadyWrapped = false;
571
572         myXID = GetCurrentTransactionId();
573
574         relation = heap_openr(DatabaseRelationName, AccessShareLock);
575
576         scan = heap_beginscan(relation, 0, SnapshotNow, 0, NULL);
577
578         while (HeapTupleIsValid(tuple = heap_getnext(scan, 0)))
579         {
580                 Form_pg_database dbform = (Form_pg_database) GETSTRUCT(tuple);
581
582                 /* Ignore non-connectable databases (eg, template0) */
583                 /* It's assumed that these have been frozen correctly */
584                 if (!dbform->datallowconn)
585                         continue;
586
587                 if (TransactionIdIsNormal(dbform->datvacuumxid))
588                 {
589                         if (TransactionIdPrecedes(myXID, dbform->datvacuumxid))
590                                 vacuumAlreadyWrapped = true;
591                         else if (TransactionIdPrecedes(dbform->datvacuumxid, vacuumXID))
592                                 vacuumXID = dbform->datvacuumxid;
593                 }
594                 if (TransactionIdIsNormal(dbform->datfrozenxid))
595                 {
596                         if (TransactionIdPrecedes(myXID, dbform->datfrozenxid))
597                                 frozenAlreadyWrapped = true;
598                         else if (TransactionIdPrecedes(dbform->datfrozenxid, frozenXID))
599                                 frozenXID = dbform->datfrozenxid;
600                 }
601         }
602
603         heap_endscan(scan);
604
605         heap_close(relation, AccessShareLock);
606
607         /*
608          * Do not truncate CLOG if we seem to have suffered wraparound already;
609          * the computed minimum XID might be bogus.
610          */
611         if (vacuumAlreadyWrapped)
612         {
613                 elog(WARNING, "Some databases have not been vacuumed in over 2 billion transactions."
614                          "\n\tYou may have already suffered transaction-wraparound data loss.");
615                 return;
616         }
617
618         /* Truncate CLOG to the oldest vacuumxid */
619         TruncateCLOG(vacuumXID);
620
621         /* Give warning about impending wraparound problems */
622         if (frozenAlreadyWrapped)
623         {
624                 elog(WARNING, "Some databases have not been vacuumed in over 1 billion transactions."
625                          "\n\tBetter vacuum them soon, or you may have a wraparound failure.");
626         }
627         else
628         {
629                 age = (int32) (myXID - frozenXID);
630                 if (age > (int32) ((MaxTransactionId >> 3) * 3))
631                         elog(WARNING, "Some databases have not been vacuumed in %d transactions."
632                                  "\n\tBetter vacuum them within %d transactions,"
633                                  "\n\tor you may have a wraparound failure.",
634                                  age, (int32) (MaxTransactionId >> 1) - age);
635         }
636 }
637
638
639 /****************************************************************************
640  *                                                                                                                                                      *
641  *                      Code common to both flavors of VACUUM                                                   *
642  *                                                                                                                                                      *
643  ****************************************************************************
644  */
645
646
647 /*
648  *      vacuum_rel() -- vacuum one heap relation
649  *
650  *              Doing one heap at a time incurs extra overhead, since we need to
651  *              check that the heap exists again just before we vacuum it.      The
652  *              reason that we do this is so that vacuuming can be spread across
653  *              many small transactions.  Otherwise, two-phase locking would require
654  *              us to lock the entire database during one pass of the vacuum cleaner.
655  *
656  *              At entry and exit, we are not inside a transaction.
657  */
658 static void
659 vacuum_rel(Oid relid, VacuumStmt *vacstmt, char expected_relkind)
660 {
661         LOCKMODE        lmode;
662         Relation        onerel;
663         LockRelId       onerelid;
664         Oid                     toast_relid;
665
666         /* Begin a transaction for vacuuming this relation */
667         StartTransactionCommand();
668
669         /*
670          * Check for user-requested abort.      Note we want this to be inside a
671          * transaction, so xact.c doesn't issue useless WARNING.
672          */
673         CHECK_FOR_INTERRUPTS();
674
675         /*
676          * Race condition -- if the pg_class tuple has gone away since the
677          * last time we saw it, we don't need to vacuum it.
678          */
679         if (!SearchSysCacheExists(RELOID,
680                                                           ObjectIdGetDatum(relid),
681                                                           0, 0, 0))
682         {
683                 CommitTransactionCommand();
684                 return;
685         }
686
687         /*
688          * Determine the type of lock we want --- hard exclusive lock for a
689          * FULL vacuum, but just ShareUpdateExclusiveLock for concurrent
690          * vacuum.      Either way, we can be sure that no other backend is
691          * vacuuming the same table.
692          */
693         lmode = vacstmt->full ? AccessExclusiveLock : ShareUpdateExclusiveLock;
694
695         /*
696          * Open the class, get an appropriate lock on it, and check
697          * permissions.
698          *
699          * We allow the user to vacuum a table if he is superuser, the table
700          * owner, or the database owner (but in the latter case, only if it's
701          * not a shared relation).      pg_class_ownercheck includes the superuser case.
702          *
703          * Note we choose to treat permissions failure as a WARNING and keep
704          * trying to vacuum the rest of the DB --- is this appropriate?
705          */
706         onerel = relation_open(relid, lmode);
707
708         if (!(pg_class_ownercheck(RelationGetRelid(onerel), GetUserId()) ||
709                   (is_dbadmin(MyDatabaseId) && !onerel->rd_rel->relisshared)))
710         {
711                 elog(WARNING, "Skipping \"%s\" --- only table or database owner can VACUUM it",
712                          RelationGetRelationName(onerel));
713                 relation_close(onerel, lmode);
714                 CommitTransactionCommand();
715                 return;
716         }
717
718         /*
719          * Check that it's a plain table; we used to do this in getrels() but
720          * seems safer to check after we've locked the relation.
721          */
722         if (onerel->rd_rel->relkind != expected_relkind)
723         {
724                 elog(WARNING, "Skipping \"%s\" --- can not process indexes, views or special system tables",
725                          RelationGetRelationName(onerel));
726                 relation_close(onerel, lmode);
727                 CommitTransactionCommand();
728                 return;
729         }
730
731         /*
732          * Get a session-level lock too. This will protect our access to the
733          * relation across multiple transactions, so that we can vacuum the
734          * relation's TOAST table (if any) secure in the knowledge that no one
735          * is deleting the parent relation.
736          *
737          * NOTE: this cannot block, even if someone else is waiting for access,
738          * because the lock manager knows that both lock requests are from the
739          * same process.
740          */
741         onerelid = onerel->rd_lockInfo.lockRelId;
742         LockRelationForSession(&onerelid, lmode);
743
744         /*
745          * Remember the relation's TOAST relation for later
746          */
747         toast_relid = onerel->rd_rel->reltoastrelid;
748
749         /*
750          * Do the actual work --- either FULL or "lazy" vacuum
751          */
752         if (vacstmt->full)
753                 full_vacuum_rel(onerel, vacstmt);
754         else
755                 lazy_vacuum_rel(onerel, vacstmt);
756
757         /* all done with this class, but hold lock until commit */
758         relation_close(onerel, NoLock);
759
760         /*
761          * Complete the transaction and free all temporary memory used.
762          */
763         CommitTransactionCommand();
764
765         /*
766          * If the relation has a secondary toast rel, vacuum that too while we
767          * still hold the session lock on the master table.  Note however that
768          * "analyze" will not get done on the toast table.      This is good,
769          * because the toaster always uses hardcoded index access and
770          * statistics are totally unimportant for toast relations.
771          */
772         if (toast_relid != InvalidOid)
773                 vacuum_rel(toast_relid, vacstmt, RELKIND_TOASTVALUE);
774
775         /*
776          * Now release the session-level lock on the master table.
777          */
778         UnlockRelationForSession(&onerelid, lmode);
779 }
780
781
782 /****************************************************************************
783  *                                                                                                                                                      *
784  *                      Code for VACUUM FULL (only)                                                                             *
785  *                                                                                                                                                      *
786  ****************************************************************************
787  */
788
789
790 /*
791  *      full_vacuum_rel() -- perform FULL VACUUM for one heap relation
792  *
793  *              This routine vacuums a single heap, cleans out its indexes, and
794  *              updates its num_pages and num_tuples statistics.
795  *
796  *              At entry, we have already established a transaction and opened
797  *              and locked the relation.
798  */
799 static void
800 full_vacuum_rel(Relation onerel, VacuumStmt *vacstmt)
801 {
802         VacPageListData vacuum_pages;           /* List of pages to vacuum and/or
803                                                                                  * clean indexes */
804         VacPageListData fraged_pages;           /* List of pages with space enough
805                                                                                  * for re-using */
806         Relation   *Irel;
807         int                     nindexes,
808                                 i;
809         VRelStats  *vacrelstats;
810         bool            reindex = false;
811
812         if (IsIgnoringSystemIndexes() &&
813                 IsSystemRelation(onerel))
814                 reindex = true;
815
816         vacuum_set_xid_limits(vacstmt, onerel->rd_rel->relisshared,
817                                                   &OldestXmin, &FreezeLimit);
818
819         /*
820          * Set up statistics-gathering machinery.
821          */
822         vacrelstats = (VRelStats *) palloc(sizeof(VRelStats));
823         vacrelstats->rel_pages = 0;
824         vacrelstats->rel_tuples = 0;
825         vacrelstats->hasindex = false;
826
827         /* scan the heap */
828         vacuum_pages.num_pages = fraged_pages.num_pages = 0;
829         scan_heap(vacrelstats, onerel, &vacuum_pages, &fraged_pages);
830
831         /* Now open all indexes of the relation */
832         vac_open_indexes(onerel, &nindexes, &Irel);
833         if (!Irel)
834                 reindex = false;
835         else if (!RelationGetForm(onerel)->relhasindex)
836                 reindex = true;
837         if (nindexes > 0)
838                 vacrelstats->hasindex = true;
839
840 #ifdef NOT_USED
841
842         /*
843          * reindex in VACUUM is dangerous under WAL. ifdef out until it
844          * becomes safe.
845          */
846         if (reindex)
847         {
848                 vac_close_indexes(nindexes, Irel);
849                 Irel = (Relation *) NULL;
850                 activate_indexes_of_a_table(RelationGetRelid(onerel), false);
851         }
852 #endif   /* NOT_USED */
853
854         /* Clean/scan index relation(s) */
855         if (Irel != (Relation *) NULL)
856         {
857                 if (vacuum_pages.num_pages > 0)
858                 {
859                         for (i = 0; i < nindexes; i++)
860                                 vacuum_index(&vacuum_pages, Irel[i],
861                                                          vacrelstats->rel_tuples, 0);
862                 }
863                 else
864                 {
865                         /* just scan indexes to update statistic */
866                         for (i = 0; i < nindexes; i++)
867                                 scan_index(Irel[i], vacrelstats->rel_tuples);
868                 }
869         }
870
871         if (fraged_pages.num_pages > 0)
872         {
873                 /* Try to shrink heap */
874                 repair_frag(vacrelstats, onerel, &vacuum_pages, &fraged_pages,
875                                         nindexes, Irel);
876                 vac_close_indexes(nindexes, Irel);
877         }
878         else
879         {
880                 vac_close_indexes(nindexes, Irel);
881                 if (vacuum_pages.num_pages > 0)
882                 {
883                         /* Clean pages from vacuum_pages list */
884                         vacuum_heap(vacrelstats, onerel, &vacuum_pages);
885                 }
886                 else
887                 {
888                         /*
889                          * Flush dirty pages out to disk.  We must do this even if we
890                          * didn't do anything else, because we want to ensure that all
891                          * tuples have correct on-row commit status on disk (see
892                          * bufmgr.c's comments for FlushRelationBuffers()).
893                          */
894                         i = FlushRelationBuffers(onerel, vacrelstats->rel_pages);
895                         if (i < 0)
896                                 elog(ERROR, "VACUUM (full_vacuum_rel): FlushRelationBuffers returned %d",
897                                          i);
898                 }
899         }
900
901 #ifdef NOT_USED
902         if (reindex)
903                 activate_indexes_of_a_table(RelationGetRelid(onerel), true);
904 #endif   /* NOT_USED */
905
906         /* update shared free space map with final free space info */
907         vac_update_fsm(onerel, &fraged_pages, vacrelstats->rel_pages);
908
909         /* update statistics in pg_class */
910         vac_update_relstats(RelationGetRelid(onerel), vacrelstats->rel_pages,
911                                                 vacrelstats->rel_tuples, vacrelstats->hasindex);
912 }
913
914
915 /*
916  *      scan_heap() -- scan an open heap relation
917  *
918  *              This routine sets commit status bits, constructs vacuum_pages (list
919  *              of pages we need to compact free space on and/or clean indexes of
920  *              deleted tuples), constructs fraged_pages (list of pages with free
921  *              space that tuples could be moved into), and calculates statistics
922  *              on the number of live tuples in the heap.
923  */
924 static void
925 scan_heap(VRelStats *vacrelstats, Relation onerel,
926                   VacPageList vacuum_pages, VacPageList fraged_pages)
927 {
928         BlockNumber nblocks,
929                                 blkno;
930         ItemId          itemid;
931         Buffer          buf;
932         HeapTupleData tuple;
933         OffsetNumber offnum,
934                                 maxoff;
935         bool            pgchanged,
936                                 tupgone,
937                                 notup;
938         char       *relname;
939         VacPage         vacpage,
940                                 vacpagecopy;
941         BlockNumber empty_pages,
942                                 new_pages,
943                                 changed_pages,
944                                 empty_end_pages;
945         double          num_tuples,
946                                 tups_vacuumed,
947                                 nkeep,
948                                 nunused;
949         double          free_size,
950                                 usable_free_size;
951         Size            min_tlen = MaxTupleSize;
952         Size            max_tlen = 0;
953         int                     i;
954         bool            do_shrinking = true;
955         VTupleLink      vtlinks = (VTupleLink) palloc(100 * sizeof(VTupleLinkData));
956         int                     num_vtlinks = 0;
957         int                     free_vtlinks = 100;
958         VacRUsage       ru0;
959
960         vac_init_rusage(&ru0);
961
962         relname = RelationGetRelationName(onerel);
963         elog(elevel, "--Relation %s.%s--",
964                  get_namespace_name(RelationGetNamespace(onerel)),
965                  relname);
966
967         empty_pages = new_pages = changed_pages = empty_end_pages = 0;
968         num_tuples = tups_vacuumed = nkeep = nunused = 0;
969         free_size = 0;
970
971         nblocks = RelationGetNumberOfBlocks(onerel);
972
973         /*
974          * We initially create each VacPage item in a maximal-sized workspace,
975          * then copy the workspace into a just-large-enough copy.
976          */
977         vacpage = (VacPage) palloc(sizeof(VacPageData) + MaxOffsetNumber * sizeof(OffsetNumber));
978
979         for (blkno = 0; blkno < nblocks; blkno++)
980         {
981                 Page            page,
982                                         tempPage = NULL;
983                 bool            do_reap,
984                                         do_frag;
985
986                 CHECK_FOR_INTERRUPTS();
987
988                 buf = ReadBuffer(onerel, blkno);
989                 page = BufferGetPage(buf);
990
991                 vacpage->blkno = blkno;
992                 vacpage->offsets_used = 0;
993                 vacpage->offsets_free = 0;
994
995                 if (PageIsNew(page))
996                 {
997                         elog(WARNING, "Rel %s: Uninitialized page %u - fixing",
998                                  relname, blkno);
999                         PageInit(page, BufferGetPageSize(buf), 0);
1000                         vacpage->free = ((PageHeader) page)->pd_upper - ((PageHeader) page)->pd_lower;
1001                         free_size += (vacpage->free - sizeof(ItemIdData));
1002                         new_pages++;
1003                         empty_end_pages++;
1004                         vacpagecopy = copy_vac_page(vacpage);
1005                         vpage_insert(vacuum_pages, vacpagecopy);
1006                         vpage_insert(fraged_pages, vacpagecopy);
1007                         WriteBuffer(buf);
1008                         continue;
1009                 }
1010
1011                 if (PageIsEmpty(page))
1012                 {
1013                         vacpage->free = ((PageHeader) page)->pd_upper - ((PageHeader) page)->pd_lower;
1014                         free_size += (vacpage->free - sizeof(ItemIdData));
1015                         empty_pages++;
1016                         empty_end_pages++;
1017                         vacpagecopy = copy_vac_page(vacpage);
1018                         vpage_insert(vacuum_pages, vacpagecopy);
1019                         vpage_insert(fraged_pages, vacpagecopy);
1020                         ReleaseBuffer(buf);
1021                         continue;
1022                 }
1023
1024                 pgchanged = false;
1025                 notup = true;
1026                 maxoff = PageGetMaxOffsetNumber(page);
1027                 for (offnum = FirstOffsetNumber;
1028                          offnum <= maxoff;
1029                          offnum = OffsetNumberNext(offnum))
1030                 {
1031                         uint16          sv_infomask;
1032
1033                         itemid = PageGetItemId(page, offnum);
1034
1035                         /*
1036                          * Collect un-used items too - it's possible to have indexes
1037                          * pointing here after crash.
1038                          */
1039                         if (!ItemIdIsUsed(itemid))
1040                         {
1041                                 vacpage->offsets[vacpage->offsets_free++] = offnum;
1042                                 nunused += 1;
1043                                 continue;
1044                         }
1045
1046                         tuple.t_datamcxt = NULL;
1047                         tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
1048                         tuple.t_len = ItemIdGetLength(itemid);
1049                         ItemPointerSet(&(tuple.t_self), blkno, offnum);
1050
1051                         tupgone = false;
1052                         sv_infomask = tuple.t_data->t_infomask;
1053
1054                         switch (HeapTupleSatisfiesVacuum(tuple.t_data, OldestXmin))
1055                         {
1056                                 case HEAPTUPLE_DEAD:
1057                                         tupgone = true;         /* we can delete the tuple */
1058                                         break;
1059                                 case HEAPTUPLE_LIVE:
1060
1061                                         /*
1062                                          * Tuple is good.  Consider whether to replace its
1063                                          * xmin value with FrozenTransactionId.
1064                                          */
1065                                         if (TransactionIdIsNormal(tuple.t_data->t_xmin) &&
1066                                                 TransactionIdPrecedes(tuple.t_data->t_xmin,
1067                                                                                           FreezeLimit))
1068                                         {
1069                                                 tuple.t_data->t_xmin = FrozenTransactionId;
1070                                                 /* infomask should be okay already */
1071                                                 Assert(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED);
1072                                                 pgchanged = true;
1073                                         }
1074                                         break;
1075                                 case HEAPTUPLE_RECENTLY_DEAD:
1076
1077                                         /*
1078                                          * If tuple is recently deleted then we must not
1079                                          * remove it from relation.
1080                                          */
1081                                         nkeep += 1;
1082
1083                                         /*
1084                                          * If we do shrinking and this tuple is updated one
1085                                          * then remember it to construct updated tuple
1086                                          * dependencies.
1087                                          */
1088                                         if (do_shrinking &&
1089                                                 !(ItemPointerEquals(&(tuple.t_self),
1090                                                                                         &(tuple.t_data->t_ctid))))
1091                                         {
1092                                                 if (free_vtlinks == 0)
1093                                                 {
1094                                                         free_vtlinks = 1000;
1095                                                         vtlinks = (VTupleLink) repalloc(vtlinks,
1096                                                                                    (free_vtlinks + num_vtlinks) *
1097                                                                                                  sizeof(VTupleLinkData));
1098                                                 }
1099                                                 vtlinks[num_vtlinks].new_tid = tuple.t_data->t_ctid;
1100                                                 vtlinks[num_vtlinks].this_tid = tuple.t_self;
1101                                                 free_vtlinks--;
1102                                                 num_vtlinks++;
1103                                         }
1104                                         break;
1105                                 case HEAPTUPLE_INSERT_IN_PROGRESS:
1106
1107                                         /*
1108                                          * This should not happen, since we hold exclusive
1109                                          * lock on the relation; shouldn't we raise an error?
1110                                          */
1111                                         elog(WARNING, "Rel %s: TID %u/%u: InsertTransactionInProgress %u - can't shrink relation",
1112                                                  relname, blkno, offnum, tuple.t_data->t_xmin);
1113                                         do_shrinking = false;
1114                                         break;
1115                                 case HEAPTUPLE_DELETE_IN_PROGRESS:
1116
1117                                         /*
1118                                          * This should not happen, since we hold exclusive
1119                                          * lock on the relation; shouldn't we raise an error?
1120                                          */
1121                                         elog(WARNING, "Rel %s: TID %u/%u: DeleteTransactionInProgress %u - can't shrink relation",
1122                                                  relname, blkno, offnum, tuple.t_data->t_xmax);
1123                                         do_shrinking = false;
1124                                         break;
1125                                 default:
1126                                         elog(ERROR, "Unexpected HeapTupleSatisfiesVacuum result");
1127                                         break;
1128                         }
1129
1130                         /* check for hint-bit update by HeapTupleSatisfiesVacuum */
1131                         if (sv_infomask != tuple.t_data->t_infomask)
1132                                 pgchanged = true;
1133
1134                         /*
1135                          * Other checks...
1136                          */
1137                         if (!OidIsValid(tuple.t_data->t_oid) &&
1138                                 onerel->rd_rel->relhasoids)
1139                                 elog(WARNING, "Rel %s: TID %u/%u: OID IS INVALID. TUPGONE %d.",
1140                                          relname, blkno, offnum, (int) tupgone);
1141
1142                         if (tupgone)
1143                         {
1144                                 ItemId          lpp;
1145
1146                                 /*
1147                                  * Here we are building a temporary copy of the page with
1148                                  * dead tuples removed.  Below we will apply
1149                                  * PageRepairFragmentation to the copy, so that we can
1150                                  * determine how much space will be available after
1151                                  * removal of dead tuples.      But note we are NOT changing
1152                                  * the real page yet...
1153                                  */
1154                                 if (tempPage == (Page) NULL)
1155                                 {
1156                                         Size            pageSize;
1157
1158                                         pageSize = PageGetPageSize(page);
1159                                         tempPage = (Page) palloc(pageSize);
1160                                         memcpy(tempPage, page, pageSize);
1161                                 }
1162
1163                                 /* mark it unused on the temp page */
1164                                 lpp = PageGetItemId(tempPage, offnum);
1165                                 lpp->lp_flags &= ~LP_USED;
1166
1167                                 vacpage->offsets[vacpage->offsets_free++] = offnum;
1168                                 tups_vacuumed += 1;
1169                         }
1170                         else
1171                         {
1172                                 num_tuples += 1;
1173                                 notup = false;
1174                                 if (tuple.t_len < min_tlen)
1175                                         min_tlen = tuple.t_len;
1176                                 if (tuple.t_len > max_tlen)
1177                                         max_tlen = tuple.t_len;
1178                         }
1179                 }                                               /* scan along page */
1180
1181                 if (tempPage != (Page) NULL)
1182                 {
1183                         /* Some tuples are removable; figure free space after removal */
1184                         PageRepairFragmentation(tempPage, NULL);
1185                         vacpage->free = ((PageHeader) tempPage)->pd_upper - ((PageHeader) tempPage)->pd_lower;
1186                         pfree(tempPage);
1187                         do_reap = true;
1188                 }
1189                 else
1190                 {
1191                         /* Just use current available space */
1192                         vacpage->free = ((PageHeader) page)->pd_upper - ((PageHeader) page)->pd_lower;
1193                         /* Need to reap the page if it has ~LP_USED line pointers */
1194                         do_reap = (vacpage->offsets_free > 0);
1195                 }
1196
1197                 free_size += vacpage->free;
1198
1199                 /*
1200                  * Add the page to fraged_pages if it has a useful amount of free
1201                  * space.  "Useful" means enough for a minimal-sized tuple. But we
1202                  * don't know that accurately near the start of the relation, so
1203                  * add pages unconditionally if they have >= BLCKSZ/10 free space.
1204                  */
1205                 do_frag = (vacpage->free >= min_tlen || vacpage->free >= BLCKSZ / 10);
1206
1207                 if (do_reap || do_frag)
1208                 {
1209                         vacpagecopy = copy_vac_page(vacpage);
1210                         if (do_reap)
1211                                 vpage_insert(vacuum_pages, vacpagecopy);
1212                         if (do_frag)
1213                                 vpage_insert(fraged_pages, vacpagecopy);
1214                 }
1215
1216                 if (notup)
1217                         empty_end_pages++;
1218                 else
1219                         empty_end_pages = 0;
1220
1221                 if (pgchanged)
1222                 {
1223                         WriteBuffer(buf);
1224                         changed_pages++;
1225                 }
1226                 else
1227                         ReleaseBuffer(buf);
1228         }
1229
1230         pfree(vacpage);
1231
1232         /* save stats in the rel list for use later */
1233         vacrelstats->rel_tuples = num_tuples;
1234         vacrelstats->rel_pages = nblocks;
1235         if (num_tuples == 0)
1236                 min_tlen = max_tlen = 0;
1237         vacrelstats->min_tlen = min_tlen;
1238         vacrelstats->max_tlen = max_tlen;
1239
1240         vacuum_pages->empty_end_pages = empty_end_pages;
1241         fraged_pages->empty_end_pages = empty_end_pages;
1242
1243         /*
1244          * Clear the fraged_pages list if we found we couldn't shrink. Else,
1245          * remove any "empty" end-pages from the list, and compute usable free
1246          * space = free space in remaining pages.
1247          */
1248         if (do_shrinking)
1249         {
1250                 Assert((BlockNumber) fraged_pages->num_pages >= empty_end_pages);
1251                 fraged_pages->num_pages -= empty_end_pages;
1252                 usable_free_size = 0;
1253                 for (i = 0; i < fraged_pages->num_pages; i++)
1254                         usable_free_size += fraged_pages->pagedesc[i]->free;
1255         }
1256         else
1257         {
1258                 fraged_pages->num_pages = 0;
1259                 usable_free_size = 0;
1260         }
1261
1262         if (usable_free_size > 0 && num_vtlinks > 0)
1263         {
1264                 qsort((char *) vtlinks, num_vtlinks, sizeof(VTupleLinkData),
1265                           vac_cmp_vtlinks);
1266                 vacrelstats->vtlinks = vtlinks;
1267                 vacrelstats->num_vtlinks = num_vtlinks;
1268         }
1269         else
1270         {
1271                 vacrelstats->vtlinks = NULL;
1272                 vacrelstats->num_vtlinks = 0;
1273                 pfree(vtlinks);
1274         }
1275
1276         elog(elevel, "Pages %u: Changed %u, reaped %u, Empty %u, New %u; \
1277 Tup %.0f: Vac %.0f, Keep/VTL %.0f/%u, UnUsed %.0f, MinLen %lu, MaxLen %lu; \
1278 Re-using: Free/Avail. Space %.0f/%.0f; EndEmpty/Avail. Pages %u/%u.\n\t%s",
1279                  nblocks, changed_pages, vacuum_pages->num_pages, empty_pages,
1280                  new_pages, num_tuples, tups_vacuumed,
1281                  nkeep, vacrelstats->num_vtlinks,
1282                  nunused, (unsigned long) min_tlen, (unsigned long) max_tlen,
1283                  free_size, usable_free_size,
1284                  empty_end_pages, fraged_pages->num_pages,
1285                  vac_show_rusage(&ru0));
1286
1287 }
1288
1289
1290 /*
1291  *      repair_frag() -- try to repair relation's fragmentation
1292  *
1293  *              This routine marks dead tuples as unused and tries re-use dead space
1294  *              by moving tuples (and inserting indexes if needed). It constructs
1295  *              Nvacpagelist list of free-ed pages (moved tuples) and clean indexes
1296  *              for them after committing (in hack-manner - without losing locks
1297  *              and freeing memory!) current transaction. It truncates relation
1298  *              if some end-blocks are gone away.
1299  */
1300 static void
1301 repair_frag(VRelStats *vacrelstats, Relation onerel,
1302                         VacPageList vacuum_pages, VacPageList fraged_pages,
1303                         int nindexes, Relation *Irel)
1304 {
1305         TransactionId myXID;
1306         CommandId       myCID;
1307         Buffer          buf,
1308                                 cur_buffer;
1309         BlockNumber nblocks,
1310                                 blkno;
1311         BlockNumber last_move_dest_block = 0,
1312                                 last_vacuum_block;
1313         Page            page,
1314                                 ToPage = NULL;
1315         OffsetNumber offnum,
1316                                 maxoff,
1317                                 newoff,
1318                                 max_offset;
1319         ItemId          itemid,
1320                                 newitemid;
1321         HeapTupleData tuple,
1322                                 newtup;
1323         TupleDesc       tupdesc;
1324         ResultRelInfo *resultRelInfo;
1325         EState     *estate;
1326         TupleTable      tupleTable;
1327         TupleTableSlot *slot;
1328         VacPageListData Nvacpagelist;
1329         VacPage         cur_page = NULL,
1330                                 last_vacuum_page,
1331                                 vacpage,
1332                            *curpage;
1333         int                     cur_item = 0;
1334         int                     i;
1335         Size            tuple_len;
1336         int                     num_moved,
1337                                 num_fraged_pages,
1338                                 vacuumed_pages;
1339         int                     checked_moved,
1340                                 num_tuples,
1341                                 keep_tuples = 0;
1342         bool            isempty,
1343                                 dowrite,
1344                                 chain_tuple_moved;
1345         VacRUsage       ru0;
1346
1347         vac_init_rusage(&ru0);
1348
1349         myXID = GetCurrentTransactionId();
1350         myCID = GetCurrentCommandId();
1351
1352         tupdesc = RelationGetDescr(onerel);
1353
1354         /*
1355          * We need a ResultRelInfo and an EState so we can use the regular
1356          * executor's index-entry-making machinery.
1357          */
1358         resultRelInfo = makeNode(ResultRelInfo);
1359         resultRelInfo->ri_RangeTableIndex = 1;          /* dummy */
1360         resultRelInfo->ri_RelationDesc = onerel;
1361         resultRelInfo->ri_TrigDesc = NULL;      /* we don't fire triggers */
1362
1363         ExecOpenIndices(resultRelInfo);
1364
1365         estate = CreateExecutorState();
1366         estate->es_result_relations = resultRelInfo;
1367         estate->es_num_result_relations = 1;
1368         estate->es_result_relation_info = resultRelInfo;
1369
1370         /* Set up a dummy tuple table too */
1371         tupleTable = ExecCreateTupleTable(1);
1372         slot = ExecAllocTableSlot(tupleTable);
1373         ExecSetSlotDescriptor(slot, tupdesc, false);
1374
1375         Nvacpagelist.num_pages = 0;
1376         num_fraged_pages = fraged_pages->num_pages;
1377         Assert((BlockNumber) vacuum_pages->num_pages >= vacuum_pages->empty_end_pages);
1378         vacuumed_pages = vacuum_pages->num_pages - vacuum_pages->empty_end_pages;
1379         if (vacuumed_pages > 0)
1380         {
1381                 /* get last reaped page from vacuum_pages */
1382                 last_vacuum_page = vacuum_pages->pagedesc[vacuumed_pages - 1];
1383                 last_vacuum_block = last_vacuum_page->blkno;
1384         }
1385         else
1386         {
1387                 last_vacuum_page = NULL;
1388                 last_vacuum_block = InvalidBlockNumber;
1389         }
1390         cur_buffer = InvalidBuffer;
1391         num_moved = 0;
1392
1393         vacpage = (VacPage) palloc(sizeof(VacPageData) + MaxOffsetNumber * sizeof(OffsetNumber));
1394         vacpage->offsets_used = vacpage->offsets_free = 0;
1395
1396         /*
1397          * Scan pages backwards from the last nonempty page, trying to move
1398          * tuples down to lower pages.  Quit when we reach a page that we have
1399          * moved any tuples onto, or the first page if we haven't moved
1400          * anything, or when we find a page we cannot completely empty (this
1401          * last condition is handled by "break" statements within the loop).
1402          *
1403          * NB: this code depends on the vacuum_pages and fraged_pages lists being
1404          * in order by blkno.
1405          */
1406         nblocks = vacrelstats->rel_pages;
1407         for (blkno = nblocks - vacuum_pages->empty_end_pages - 1;
1408                  blkno > last_move_dest_block;
1409                  blkno--)
1410         {
1411                 CHECK_FOR_INTERRUPTS();
1412
1413                 /*
1414                  * Forget fraged_pages pages at or after this one; they're no
1415                  * longer useful as move targets, since we only want to move down.
1416                  * Note that since we stop the outer loop at last_move_dest_block,
1417                  * pages removed here cannot have had anything moved onto them
1418                  * already.
1419                  *
1420                  * Also note that we don't change the stored fraged_pages list, only
1421                  * our local variable num_fraged_pages; so the forgotten pages are
1422                  * still available to be loaded into the free space map later.
1423                  */
1424                 while (num_fraged_pages > 0 &&
1425                         fraged_pages->pagedesc[num_fraged_pages - 1]->blkno >= blkno)
1426                 {
1427                         Assert(fraged_pages->pagedesc[num_fraged_pages - 1]->offsets_used == 0);
1428                         --num_fraged_pages;
1429                 }
1430
1431                 /*
1432                  * Process this page of relation.
1433                  */
1434                 buf = ReadBuffer(onerel, blkno);
1435                 page = BufferGetPage(buf);
1436
1437                 vacpage->offsets_free = 0;
1438
1439                 isempty = PageIsEmpty(page);
1440
1441                 dowrite = false;
1442
1443                 /* Is the page in the vacuum_pages list? */
1444                 if (blkno == last_vacuum_block)
1445                 {
1446                         if (last_vacuum_page->offsets_free > 0)
1447                         {
1448                                 /* there are dead tuples on this page - clean them */
1449                                 Assert(!isempty);
1450                                 LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
1451                                 vacuum_page(onerel, buf, last_vacuum_page);
1452                                 LockBuffer(buf, BUFFER_LOCK_UNLOCK);
1453                                 dowrite = true;
1454                         }
1455                         else
1456                                 Assert(isempty);
1457                         --vacuumed_pages;
1458                         if (vacuumed_pages > 0)
1459                         {
1460                                 /* get prev reaped page from vacuum_pages */
1461                                 last_vacuum_page = vacuum_pages->pagedesc[vacuumed_pages - 1];
1462                                 last_vacuum_block = last_vacuum_page->blkno;
1463                         }
1464                         else
1465                         {
1466                                 last_vacuum_page = NULL;
1467                                 last_vacuum_block = InvalidBlockNumber;
1468                         }
1469                         if (isempty)
1470                         {
1471                                 ReleaseBuffer(buf);
1472                                 continue;
1473                         }
1474                 }
1475                 else
1476                         Assert(!isempty);
1477
1478                 chain_tuple_moved = false;              /* no one chain-tuple was moved
1479                                                                                  * off this page, yet */
1480                 vacpage->blkno = blkno;
1481                 maxoff = PageGetMaxOffsetNumber(page);
1482                 for (offnum = FirstOffsetNumber;
1483                          offnum <= maxoff;
1484                          offnum = OffsetNumberNext(offnum))
1485                 {
1486                         itemid = PageGetItemId(page, offnum);
1487
1488                         if (!ItemIdIsUsed(itemid))
1489                                 continue;
1490
1491                         tuple.t_datamcxt = NULL;
1492                         tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
1493                         tuple_len = tuple.t_len = ItemIdGetLength(itemid);
1494                         ItemPointerSet(&(tuple.t_self), blkno, offnum);
1495
1496                         if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
1497                         {
1498                                 if ((TransactionId) tuple.t_data->t_cmin != myXID)
1499                                         elog(ERROR, "Invalid XID in t_cmin");
1500                                 if (tuple.t_data->t_infomask & HEAP_MOVED_IN)
1501                                         elog(ERROR, "HEAP_MOVED_IN was not expected");
1502
1503                                 /*
1504                                  * If this (chain) tuple is moved by me already then I
1505                                  * have to check is it in vacpage or not - i.e. is it
1506                                  * moved while cleaning this page or some previous one.
1507                                  */
1508                                 if (tuple.t_data->t_infomask & HEAP_MOVED_OFF)
1509                                 {
1510                                         if (keep_tuples == 0)
1511                                                 continue;
1512                                         if (chain_tuple_moved)          /* some chains was moved
1513                                                                                                  * while */
1514                                         {                       /* cleaning this page */
1515                                                 Assert(vacpage->offsets_free > 0);
1516                                                 for (i = 0; i < vacpage->offsets_free; i++)
1517                                                 {
1518                                                         if (vacpage->offsets[i] == offnum)
1519                                                                 break;
1520                                                 }
1521                                                 if (i >= vacpage->offsets_free) /* not found */
1522                                                 {
1523                                                         vacpage->offsets[vacpage->offsets_free++] = offnum;
1524                                                         keep_tuples--;
1525                                                 }
1526                                         }
1527                                         else
1528                                         {
1529                                                 vacpage->offsets[vacpage->offsets_free++] = offnum;
1530                                                 keep_tuples--;
1531                                         }
1532                                         continue;
1533                                 }
1534                                 elog(ERROR, "HEAP_MOVED_OFF was expected");
1535                         }
1536
1537                         /*
1538                          * If this tuple is in the chain of tuples created in updates
1539                          * by "recent" transactions then we have to move all chain of
1540                          * tuples to another places.
1541                          */
1542                         if ((tuple.t_data->t_infomask & HEAP_UPDATED &&
1543                          !TransactionIdPrecedes(tuple.t_data->t_xmin, OldestXmin)) ||
1544                                 (!(tuple.t_data->t_infomask & HEAP_XMAX_INVALID) &&
1545                                  !(ItemPointerEquals(&(tuple.t_self),
1546                                                                          &(tuple.t_data->t_ctid)))))
1547                         {
1548                                 Buffer          Cbuf = buf;
1549                                 Page            Cpage;
1550                                 ItemId          Citemid;
1551                                 ItemPointerData Ctid;
1552                                 HeapTupleData tp = tuple;
1553                                 Size            tlen = tuple_len;
1554                                 VTupleMove      vtmove = (VTupleMove)
1555                                 palloc(100 * sizeof(VTupleMoveData));
1556                                 int                     num_vtmove = 0;
1557                                 int                     free_vtmove = 100;
1558                                 VacPage         to_vacpage = NULL;
1559                                 int                     to_item = 0;
1560                                 bool            freeCbuf = false;
1561                                 int                     ti;
1562
1563                                 if (vacrelstats->vtlinks == NULL)
1564                                         elog(ERROR, "No one parent tuple was found");
1565                                 if (cur_buffer != InvalidBuffer)
1566                                 {
1567                                         WriteBuffer(cur_buffer);
1568                                         cur_buffer = InvalidBuffer;
1569                                 }
1570
1571                                 /*
1572                                  * If this tuple is in the begin/middle of the chain then
1573                                  * we have to move to the end of chain.
1574                                  */
1575                                 while (!(tp.t_data->t_infomask & HEAP_XMAX_INVALID) &&
1576                                            !(ItemPointerEquals(&(tp.t_self),
1577                                                                                    &(tp.t_data->t_ctid))))
1578                                 {
1579                                         Ctid = tp.t_data->t_ctid;
1580                                         if (freeCbuf)
1581                                                 ReleaseBuffer(Cbuf);
1582                                         freeCbuf = true;
1583                                         Cbuf = ReadBuffer(onerel,
1584                                                                           ItemPointerGetBlockNumber(&Ctid));
1585                                         Cpage = BufferGetPage(Cbuf);
1586                                         Citemid = PageGetItemId(Cpage,
1587                                                                           ItemPointerGetOffsetNumber(&Ctid));
1588                                         if (!ItemIdIsUsed(Citemid))
1589                                         {
1590                                                 /*
1591                                                  * This means that in the middle of chain there
1592                                                  * was tuple updated by older (than OldestXmin)
1593                                                  * xaction and this tuple is already deleted by
1594                                                  * me. Actually, upper part of chain should be
1595                                                  * removed and seems that this should be handled
1596                                                  * in scan_heap(), but it's not implemented at the
1597                                                  * moment and so we just stop shrinking here.
1598                                                  */
1599                                                 ReleaseBuffer(Cbuf);
1600                                                 pfree(vtmove);
1601                                                 vtmove = NULL;
1602                                                 elog(WARNING, "Child itemid in update-chain marked as unused - can't continue repair_frag");
1603                                                 break;
1604                                         }
1605                                         tp.t_datamcxt = NULL;
1606                                         tp.t_data = (HeapTupleHeader) PageGetItem(Cpage, Citemid);
1607                                         tp.t_self = Ctid;
1608                                         tlen = tp.t_len = ItemIdGetLength(Citemid);
1609                                 }
1610                                 if (vtmove == NULL)
1611                                         break;
1612                                 /* first, can chain be moved ? */
1613                                 for (;;)
1614                                 {
1615                                         if (to_vacpage == NULL ||
1616                                                 !enough_space(to_vacpage, tlen))
1617                                         {
1618                                                 for (i = 0; i < num_fraged_pages; i++)
1619                                                 {
1620                                                         if (enough_space(fraged_pages->pagedesc[i], tlen))
1621                                                                 break;
1622                                                 }
1623
1624                                                 if (i == num_fraged_pages)
1625                                                 {
1626                                                         /* can't move item anywhere */
1627                                                         for (i = 0; i < num_vtmove; i++)
1628                                                         {
1629                                                                 Assert(vtmove[i].vacpage->offsets_used > 0);
1630                                                                 (vtmove[i].vacpage->offsets_used)--;
1631                                                         }
1632                                                         num_vtmove = 0;
1633                                                         break;
1634                                                 }
1635                                                 to_item = i;
1636                                                 to_vacpage = fraged_pages->pagedesc[to_item];
1637                                         }
1638                                         to_vacpage->free -= MAXALIGN(tlen);
1639                                         if (to_vacpage->offsets_used >= to_vacpage->offsets_free)
1640                                                 to_vacpage->free -= MAXALIGN(sizeof(ItemIdData));
1641                                         (to_vacpage->offsets_used)++;
1642                                         if (free_vtmove == 0)
1643                                         {
1644                                                 free_vtmove = 1000;
1645                                                 vtmove = (VTupleMove) repalloc(vtmove,
1646                                                                                          (free_vtmove + num_vtmove) *
1647                                                                                                  sizeof(VTupleMoveData));
1648                                         }
1649                                         vtmove[num_vtmove].tid = tp.t_self;
1650                                         vtmove[num_vtmove].vacpage = to_vacpage;
1651                                         if (to_vacpage->offsets_used == 1)
1652                                                 vtmove[num_vtmove].cleanVpd = true;
1653                                         else
1654                                                 vtmove[num_vtmove].cleanVpd = false;
1655                                         free_vtmove--;
1656                                         num_vtmove++;
1657
1658                                         /* All done ? */
1659                                         if (!(tp.t_data->t_infomask & HEAP_UPDATED) ||
1660                                         TransactionIdPrecedes(tp.t_data->t_xmin, OldestXmin))
1661                                                 break;
1662
1663                                         /* Well, try to find tuple with old row version */
1664                                         for (;;)
1665                                         {
1666                                                 Buffer          Pbuf;
1667                                                 Page            Ppage;
1668                                                 ItemId          Pitemid;
1669                                                 HeapTupleData Ptp;
1670                                                 VTupleLinkData vtld,
1671                                                                    *vtlp;
1672
1673                                                 vtld.new_tid = tp.t_self;
1674                                                 vtlp = (VTupleLink)
1675                                                         vac_bsearch((void *) &vtld,
1676                                                                                 (void *) (vacrelstats->vtlinks),
1677                                                                                 vacrelstats->num_vtlinks,
1678                                                                                 sizeof(VTupleLinkData),
1679                                                                                 vac_cmp_vtlinks);
1680                                                 if (vtlp == NULL)
1681                                                         elog(ERROR, "Parent tuple was not found");
1682                                                 tp.t_self = vtlp->this_tid;
1683                                                 Pbuf = ReadBuffer(onerel,
1684                                                                 ItemPointerGetBlockNumber(&(tp.t_self)));
1685                                                 Ppage = BufferGetPage(Pbuf);
1686                                                 Pitemid = PageGetItemId(Ppage,
1687                                                            ItemPointerGetOffsetNumber(&(tp.t_self)));
1688                                                 if (!ItemIdIsUsed(Pitemid))
1689                                                         elog(ERROR, "Parent itemid marked as unused");
1690                                                 Ptp.t_datamcxt = NULL;
1691                                                 Ptp.t_data = (HeapTupleHeader) PageGetItem(Ppage, Pitemid);
1692                                                 Assert(ItemPointerEquals(&(vtld.new_tid),
1693                                                                                                  &(Ptp.t_data->t_ctid)));
1694
1695                                                 /*
1696                                                  * Read above about cases when
1697                                                  * !ItemIdIsUsed(Citemid) (child item is
1698                                                  * removed)... Due to the fact that at the moment
1699                                                  * we don't remove unuseful part of update-chain,
1700                                                  * it's possible to get too old parent row here.
1701                                                  * Like as in the case which caused this problem,
1702                                                  * we stop shrinking here. I could try to find
1703                                                  * real parent row but want not to do it because
1704                                                  * of real solution will be implemented anyway,
1705                                                  * latter, and we are too close to 6.5 release. -
1706                                                  * vadim 06/11/99
1707                                                  */
1708                                                 if (!(TransactionIdEquals(Ptp.t_data->t_xmax,
1709                                                                                                   tp.t_data->t_xmin)))
1710                                                 {
1711                                                         if (freeCbuf)
1712                                                                 ReleaseBuffer(Cbuf);
1713                                                         freeCbuf = false;
1714                                                         ReleaseBuffer(Pbuf);
1715                                                         for (i = 0; i < num_vtmove; i++)
1716                                                         {
1717                                                                 Assert(vtmove[i].vacpage->offsets_used > 0);
1718                                                                 (vtmove[i].vacpage->offsets_used)--;
1719                                                         }
1720                                                         num_vtmove = 0;
1721                                                         elog(WARNING, "Too old parent tuple found - can't continue repair_frag");
1722                                                         break;
1723                                                 }
1724 #ifdef NOT_USED                                 /* I'm not sure that this will wotk
1725                                                                  * properly... */
1726
1727                                                 /*
1728                                                  * If this tuple is updated version of row and it
1729                                                  * was created by the same transaction then no one
1730                                                  * is interested in this tuple - mark it as
1731                                                  * removed.
1732                                                  */
1733                                                 if (Ptp.t_data->t_infomask & HEAP_UPDATED &&
1734                                                         TransactionIdEquals(Ptp.t_data->t_xmin,
1735                                                                                                 Ptp.t_data->t_xmax))
1736                                                 {
1737                                                         TransactionIdStore(myXID,
1738                                                                 (TransactionId *) &(Ptp.t_data->t_cmin));
1739                                                         Ptp.t_data->t_infomask &=
1740                                                                 ~(HEAP_XMIN_COMMITTED | HEAP_XMIN_INVALID | HEAP_MOVED_IN);
1741                                                         Ptp.t_data->t_infomask |= HEAP_MOVED_OFF;
1742                                                         WriteBuffer(Pbuf);
1743                                                         continue;
1744                                                 }
1745 #endif
1746                                                 tp.t_datamcxt = Ptp.t_datamcxt;
1747                                                 tp.t_data = Ptp.t_data;
1748                                                 tlen = tp.t_len = ItemIdGetLength(Pitemid);
1749                                                 if (freeCbuf)
1750                                                         ReleaseBuffer(Cbuf);
1751                                                 Cbuf = Pbuf;
1752                                                 freeCbuf = true;
1753                                                 break;
1754                                         }
1755                                         if (num_vtmove == 0)
1756                                                 break;
1757                                 }
1758                                 if (freeCbuf)
1759                                         ReleaseBuffer(Cbuf);
1760                                 if (num_vtmove == 0)    /* chain can't be moved */
1761                                 {
1762                                         pfree(vtmove);
1763                                         break;
1764                                 }
1765                                 ItemPointerSetInvalid(&Ctid);
1766                                 for (ti = 0; ti < num_vtmove; ti++)
1767                                 {
1768                                         VacPage         destvacpage = vtmove[ti].vacpage;
1769
1770                                         /* Get page to move from */
1771                                         tuple.t_self = vtmove[ti].tid;
1772                                         Cbuf = ReadBuffer(onerel,
1773                                                          ItemPointerGetBlockNumber(&(tuple.t_self)));
1774
1775                                         /* Get page to move to */
1776                                         cur_buffer = ReadBuffer(onerel, destvacpage->blkno);
1777
1778                                         LockBuffer(cur_buffer, BUFFER_LOCK_EXCLUSIVE);
1779                                         if (cur_buffer != Cbuf)
1780                                                 LockBuffer(Cbuf, BUFFER_LOCK_EXCLUSIVE);
1781
1782                                         ToPage = BufferGetPage(cur_buffer);
1783                                         Cpage = BufferGetPage(Cbuf);
1784
1785                                         Citemid = PageGetItemId(Cpage,
1786                                                         ItemPointerGetOffsetNumber(&(tuple.t_self)));
1787                                         tuple.t_datamcxt = NULL;
1788                                         tuple.t_data = (HeapTupleHeader) PageGetItem(Cpage, Citemid);
1789                                         tuple_len = tuple.t_len = ItemIdGetLength(Citemid);
1790
1791                                         /*
1792                                          * make a copy of the source tuple, and then mark the
1793                                          * source tuple MOVED_OFF.
1794                                          */
1795                                         heap_copytuple_with_tuple(&tuple, &newtup);
1796
1797                                         /*
1798                                          * register invalidation of source tuple in catcaches.
1799                                          */
1800                                         CacheInvalidateHeapTuple(onerel, &tuple);
1801
1802                                         /* NO ELOG(ERROR) TILL CHANGES ARE LOGGED */
1803                                         START_CRIT_SECTION();
1804
1805                                         TransactionIdStore(myXID, (TransactionId *) &(tuple.t_data->t_cmin));
1806                                         tuple.t_data->t_infomask &=
1807                                                 ~(HEAP_XMIN_COMMITTED | HEAP_XMIN_INVALID | HEAP_MOVED_IN);
1808                                         tuple.t_data->t_infomask |= HEAP_MOVED_OFF;
1809
1810                                         /*
1811                                          * If this page was not used before - clean it.
1812                                          *
1813                                          * NOTE: a nasty bug used to lurk here.  It is possible
1814                                          * for the source and destination pages to be the same
1815                                          * (since this tuple-chain member can be on a page
1816                                          * lower than the one we're currently processing in
1817                                          * the outer loop).  If that's true, then after
1818                                          * vacuum_page() the source tuple will have been
1819                                          * moved, and tuple.t_data will be pointing at
1820                                          * garbage.  Therefore we must do everything that uses
1821                                          * tuple.t_data BEFORE this step!!
1822                                          *
1823                                          * This path is different from the other callers of
1824                                          * vacuum_page, because we have already incremented
1825                                          * the vacpage's offsets_used field to account for the
1826                                          * tuple(s) we expect to move onto the page. Therefore
1827                                          * vacuum_page's check for offsets_used == 0 is wrong.
1828                                          * But since that's a good debugging check for all
1829                                          * other callers, we work around it here rather than
1830                                          * remove it.
1831                                          */
1832                                         if (!PageIsEmpty(ToPage) && vtmove[ti].cleanVpd)
1833                                         {
1834                                                 int                     sv_offsets_used = destvacpage->offsets_used;
1835
1836                                                 destvacpage->offsets_used = 0;
1837                                                 vacuum_page(onerel, cur_buffer, destvacpage);
1838                                                 destvacpage->offsets_used = sv_offsets_used;
1839                                         }
1840
1841                                         /*
1842                                          * Update the state of the copied tuple, and store it
1843                                          * on the destination page.
1844                                          */
1845                                         TransactionIdStore(myXID, (TransactionId *) &(newtup.t_data->t_cmin));
1846                                         newtup.t_data->t_infomask &=
1847                                                 ~(HEAP_XMIN_COMMITTED | HEAP_XMIN_INVALID | HEAP_MOVED_OFF);
1848                                         newtup.t_data->t_infomask |= HEAP_MOVED_IN;
1849                                         newoff = PageAddItem(ToPage, (Item) newtup.t_data, tuple_len,
1850                                                                                  InvalidOffsetNumber, LP_USED);
1851                                         if (newoff == InvalidOffsetNumber)
1852                                         {
1853                                                 elog(PANIC, "moving chain: failed to add item with len = %lu to page %u",
1854                                                   (unsigned long) tuple_len, destvacpage->blkno);
1855                                         }
1856                                         newitemid = PageGetItemId(ToPage, newoff);
1857                                         pfree(newtup.t_data);
1858                                         newtup.t_datamcxt = NULL;
1859                                         newtup.t_data = (HeapTupleHeader) PageGetItem(ToPage, newitemid);
1860                                         ItemPointerSet(&(newtup.t_self), destvacpage->blkno, newoff);
1861
1862                                         {
1863                                                 XLogRecPtr      recptr =
1864                                                 log_heap_move(onerel, Cbuf, tuple.t_self,
1865                                                                           cur_buffer, &newtup);
1866
1867                                                 if (Cbuf != cur_buffer)
1868                                                 {
1869                                                         PageSetLSN(Cpage, recptr);
1870                                                         PageSetSUI(Cpage, ThisStartUpID);
1871                                                 }
1872                                                 PageSetLSN(ToPage, recptr);
1873                                                 PageSetSUI(ToPage, ThisStartUpID);
1874                                         }
1875                                         END_CRIT_SECTION();
1876
1877                                         if (destvacpage->blkno > last_move_dest_block)
1878                                                 last_move_dest_block = destvacpage->blkno;
1879
1880                                         /*
1881                                          * Set new tuple's t_ctid pointing to itself for last
1882                                          * tuple in chain, and to next tuple in chain
1883                                          * otherwise.
1884                                          */
1885                                         if (!ItemPointerIsValid(&Ctid))
1886                                                 newtup.t_data->t_ctid = newtup.t_self;
1887                                         else
1888                                                 newtup.t_data->t_ctid = Ctid;
1889                                         Ctid = newtup.t_self;
1890
1891                                         num_moved++;
1892
1893                                         /*
1894                                          * Remember that we moved tuple from the current page
1895                                          * (corresponding index tuple will be cleaned).
1896                                          */
1897                                         if (Cbuf == buf)
1898                                                 vacpage->offsets[vacpage->offsets_free++] =
1899                                                         ItemPointerGetOffsetNumber(&(tuple.t_self));
1900                                         else
1901                                                 keep_tuples++;
1902
1903                                         LockBuffer(cur_buffer, BUFFER_LOCK_UNLOCK);
1904                                         if (cur_buffer != Cbuf)
1905                                                 LockBuffer(Cbuf, BUFFER_LOCK_UNLOCK);
1906
1907                                         /* Create index entries for the moved tuple */
1908                                         if (resultRelInfo->ri_NumIndices > 0)
1909                                         {
1910                                                 ExecStoreTuple(&newtup, slot, InvalidBuffer, false);
1911                                                 ExecInsertIndexTuples(slot, &(newtup.t_self),
1912                                                                                           estate, true);
1913                                         }
1914
1915                                         WriteBuffer(cur_buffer);
1916                                         WriteBuffer(Cbuf);
1917                                 }
1918                                 cur_buffer = InvalidBuffer;
1919                                 pfree(vtmove);
1920                                 chain_tuple_moved = true;
1921                                 continue;
1922                         }
1923
1924                         /* try to find new page for this tuple */
1925                         if (cur_buffer == InvalidBuffer ||
1926                                 !enough_space(cur_page, tuple_len))
1927                         {
1928                                 if (cur_buffer != InvalidBuffer)
1929                                 {
1930                                         WriteBuffer(cur_buffer);
1931                                         cur_buffer = InvalidBuffer;
1932                                 }
1933                                 for (i = 0; i < num_fraged_pages; i++)
1934                                 {
1935                                         if (enough_space(fraged_pages->pagedesc[i], tuple_len))
1936                                                 break;
1937                                 }
1938                                 if (i == num_fraged_pages)
1939                                         break;          /* can't move item anywhere */
1940                                 cur_item = i;
1941                                 cur_page = fraged_pages->pagedesc[cur_item];
1942                                 cur_buffer = ReadBuffer(onerel, cur_page->blkno);
1943                                 LockBuffer(cur_buffer, BUFFER_LOCK_EXCLUSIVE);
1944                                 ToPage = BufferGetPage(cur_buffer);
1945                                 /* if this page was not used before - clean it */
1946                                 if (!PageIsEmpty(ToPage) && cur_page->offsets_used == 0)
1947                                         vacuum_page(onerel, cur_buffer, cur_page);
1948                         }
1949                         else
1950                                 LockBuffer(cur_buffer, BUFFER_LOCK_EXCLUSIVE);
1951
1952                         LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
1953
1954                         /* copy tuple */
1955                         heap_copytuple_with_tuple(&tuple, &newtup);
1956
1957                         /*
1958                          * register invalidation of source tuple in catcaches.
1959                          *
1960                          * (Note: we do not need to register the copied tuple,
1961                          * because we are not changing the tuple contents and
1962                          * so there cannot be any need to flush negative
1963                          * catcache entries.)
1964                          */
1965                         CacheInvalidateHeapTuple(onerel, &tuple);
1966
1967                         /* NO ELOG(ERROR) TILL CHANGES ARE LOGGED */
1968                         START_CRIT_SECTION();
1969
1970                         /*
1971                          * Mark new tuple as moved_in by vacuum and store vacuum XID
1972                          * in t_cmin !!!
1973                          */
1974                         TransactionIdStore(myXID, (TransactionId *) &(newtup.t_data->t_cmin));
1975                         newtup.t_data->t_infomask &=
1976                                 ~(HEAP_XMIN_COMMITTED | HEAP_XMIN_INVALID | HEAP_MOVED_OFF);
1977                         newtup.t_data->t_infomask |= HEAP_MOVED_IN;
1978
1979                         /* add tuple to the page */
1980                         newoff = PageAddItem(ToPage, (Item) newtup.t_data, tuple_len,
1981                                                                  InvalidOffsetNumber, LP_USED);
1982                         if (newoff == InvalidOffsetNumber)
1983                         {
1984                                 elog(PANIC, "failed to add item with len = %lu to page %u (free space %lu, nusd %u, noff %u)",
1985                                          (unsigned long) tuple_len,
1986                                          cur_page->blkno, (unsigned long) cur_page->free,
1987                                          cur_page->offsets_used, cur_page->offsets_free);
1988                         }
1989                         newitemid = PageGetItemId(ToPage, newoff);
1990                         pfree(newtup.t_data);
1991                         newtup.t_datamcxt = NULL;
1992                         newtup.t_data = (HeapTupleHeader) PageGetItem(ToPage, newitemid);
1993                         ItemPointerSet(&(newtup.t_data->t_ctid), cur_page->blkno, newoff);
1994                         newtup.t_self = newtup.t_data->t_ctid;
1995
1996                         /*
1997                          * Mark old tuple as moved_off by vacuum and store vacuum XID
1998                          * in t_cmin !!!
1999                          */
2000                         TransactionIdStore(myXID, (TransactionId *) &(tuple.t_data->t_cmin));
2001                         tuple.t_data->t_infomask &=
2002                                 ~(HEAP_XMIN_COMMITTED | HEAP_XMIN_INVALID | HEAP_MOVED_IN);
2003                         tuple.t_data->t_infomask |= HEAP_MOVED_OFF;
2004
2005                         {
2006                                 XLogRecPtr      recptr =
2007                                 log_heap_move(onerel, buf, tuple.t_self,
2008                                                           cur_buffer, &newtup);
2009
2010                                 PageSetLSN(page, recptr);
2011                                 PageSetSUI(page, ThisStartUpID);
2012                                 PageSetLSN(ToPage, recptr);
2013                                 PageSetSUI(ToPage, ThisStartUpID);
2014                         }
2015                         END_CRIT_SECTION();
2016
2017                         cur_page->offsets_used++;
2018                         num_moved++;
2019                         cur_page->free = ((PageHeader) ToPage)->pd_upper - ((PageHeader) ToPage)->pd_lower;
2020                         if (cur_page->blkno > last_move_dest_block)
2021                                 last_move_dest_block = cur_page->blkno;
2022
2023                         vacpage->offsets[vacpage->offsets_free++] = offnum;
2024
2025                         LockBuffer(cur_buffer, BUFFER_LOCK_UNLOCK);
2026                         LockBuffer(buf, BUFFER_LOCK_UNLOCK);
2027
2028                         /* insert index' tuples if needed */
2029                         if (resultRelInfo->ri_NumIndices > 0)
2030                         {
2031                                 ExecStoreTuple(&newtup, slot, InvalidBuffer, false);
2032                                 ExecInsertIndexTuples(slot, &(newtup.t_self), estate, true);
2033                         }
2034                 }                                               /* walk along page */
2035
2036                 if (offnum < maxoff && keep_tuples > 0)
2037                 {
2038                         OffsetNumber off;
2039
2040                         for (off = OffsetNumberNext(offnum);
2041                                  off <= maxoff;
2042                                  off = OffsetNumberNext(off))
2043                         {
2044                                 itemid = PageGetItemId(page, off);
2045                                 if (!ItemIdIsUsed(itemid))
2046                                         continue;
2047                                 tuple.t_datamcxt = NULL;
2048                                 tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
2049                                 if (tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED)
2050                                         continue;
2051                                 if ((TransactionId) tuple.t_data->t_cmin != myXID)
2052                                         elog(ERROR, "Invalid XID in t_cmin (4)");
2053                                 if (tuple.t_data->t_infomask & HEAP_MOVED_IN)
2054                                         elog(ERROR, "HEAP_MOVED_IN was not expected (2)");
2055                                 if (tuple.t_data->t_infomask & HEAP_MOVED_OFF)
2056                                 {
2057                                         /* some chains was moved while */
2058                                         if (chain_tuple_moved)
2059                                         {                       /* cleaning this page */
2060                                                 Assert(vacpage->offsets_free > 0);
2061                                                 for (i = 0; i < vacpage->offsets_free; i++)
2062                                                 {
2063                                                         if (vacpage->offsets[i] == off)
2064                                                                 break;
2065                                                 }
2066                                                 if (i >= vacpage->offsets_free) /* not found */
2067                                                 {
2068                                                         vacpage->offsets[vacpage->offsets_free++] = off;
2069                                                         Assert(keep_tuples > 0);
2070                                                         keep_tuples--;
2071                                                 }
2072                                         }
2073                                         else
2074                                         {
2075                                                 vacpage->offsets[vacpage->offsets_free++] = off;
2076                                                 Assert(keep_tuples > 0);
2077                                                 keep_tuples--;
2078                                         }
2079                                 }
2080                         }
2081                 }
2082
2083                 if (vacpage->offsets_free > 0)  /* some tuples were moved */
2084                 {
2085                         if (chain_tuple_moved)          /* else - they are ordered */
2086                         {
2087                                 qsort((char *) (vacpage->offsets), vacpage->offsets_free,
2088                                           sizeof(OffsetNumber), vac_cmp_offno);
2089                         }
2090                         vpage_insert(&Nvacpagelist, copy_vac_page(vacpage));
2091                         WriteBuffer(buf);
2092                 }
2093                 else if (dowrite)
2094                         WriteBuffer(buf);
2095                 else
2096                         ReleaseBuffer(buf);
2097
2098                 if (offnum <= maxoff)
2099                         break;                          /* some item(s) left */
2100
2101         }                                                       /* walk along relation */
2102
2103         blkno++;                                        /* new number of blocks */
2104
2105         if (cur_buffer != InvalidBuffer)
2106         {
2107                 Assert(num_moved > 0);
2108                 WriteBuffer(cur_buffer);
2109         }
2110
2111         if (num_moved > 0)
2112         {
2113                 /*
2114                  * We have to commit our tuple movings before we truncate the
2115                  * relation.  Ideally we should do Commit/StartTransactionCommand
2116                  * here, relying on the session-level table lock to protect our
2117                  * exclusive access to the relation.  However, that would require
2118                  * a lot of extra code to close and re-open the relation, indexes,
2119                  * etc.  For now, a quick hack: record status of current
2120                  * transaction as committed, and continue.
2121                  */
2122                 RecordTransactionCommit();
2123         }
2124
2125         /*
2126          * We are not going to move any more tuples across pages, but we still
2127          * need to apply vacuum_page to compact free space in the remaining
2128          * pages in vacuum_pages list.  Note that some of these pages may also
2129          * be in the fraged_pages list, and may have had tuples moved onto
2130          * them; if so, we already did vacuum_page and needn't do it again.
2131          */
2132         for (i = 0, curpage = vacuum_pages->pagedesc;
2133                  i < vacuumed_pages;
2134                  i++, curpage++)
2135         {
2136                 CHECK_FOR_INTERRUPTS();
2137                 Assert((*curpage)->blkno < blkno);
2138                 if ((*curpage)->offsets_used == 0)
2139                 {
2140                         /* this page was not used as a move target, so must clean it */
2141                         buf = ReadBuffer(onerel, (*curpage)->blkno);
2142                         LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
2143                         page = BufferGetPage(buf);
2144                         if (!PageIsEmpty(page))
2145                                 vacuum_page(onerel, buf, *curpage);
2146                         LockBuffer(buf, BUFFER_LOCK_UNLOCK);
2147                         WriteBuffer(buf);
2148                 }
2149         }
2150
2151         /*
2152          * Now scan all the pages that we moved tuples onto and update tuple
2153          * status bits.  This is not really necessary, but will save time for
2154          * future transactions examining these tuples.
2155          *
2156          * XXX WARNING that this code fails to clear HEAP_MOVED_OFF tuples from
2157          * pages that were move source pages but not move dest pages.  One
2158          * also wonders whether it wouldn't be better to skip this step and
2159          * let the tuple status updates happen someplace that's not holding an
2160          * exclusive lock on the relation.
2161          */
2162         checked_moved = 0;
2163         for (i = 0, curpage = fraged_pages->pagedesc;
2164                  i < num_fraged_pages;
2165                  i++, curpage++)
2166         {
2167                 CHECK_FOR_INTERRUPTS();
2168                 Assert((*curpage)->blkno < blkno);
2169                 if ((*curpage)->blkno > last_move_dest_block)
2170                         break;                          /* no need to scan any further */
2171                 if ((*curpage)->offsets_used == 0)
2172                         continue;                       /* this page was never used as a move dest */
2173                 buf = ReadBuffer(onerel, (*curpage)->blkno);
2174                 LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
2175                 page = BufferGetPage(buf);
2176                 num_tuples = 0;
2177                 max_offset = PageGetMaxOffsetNumber(page);
2178                 for (newoff = FirstOffsetNumber;
2179                          newoff <= max_offset;
2180                          newoff = OffsetNumberNext(newoff))
2181                 {
2182                         itemid = PageGetItemId(page, newoff);
2183                         if (!ItemIdIsUsed(itemid))
2184                                 continue;
2185                         tuple.t_datamcxt = NULL;
2186                         tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
2187                         if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
2188                         {
2189                                 if ((TransactionId) tuple.t_data->t_cmin != myXID)
2190                                         elog(ERROR, "Invalid XID in t_cmin (2)");
2191                                 if (tuple.t_data->t_infomask & HEAP_MOVED_IN)
2192                                 {
2193                                         tuple.t_data->t_infomask |= HEAP_XMIN_COMMITTED;
2194                                         num_tuples++;
2195                                 }
2196                                 else if (tuple.t_data->t_infomask & HEAP_MOVED_OFF)
2197                                         tuple.t_data->t_infomask |= HEAP_XMIN_INVALID;
2198                                 else
2199                                         elog(ERROR, "HEAP_MOVED_OFF/HEAP_MOVED_IN was expected");
2200                         }
2201                 }
2202                 LockBuffer(buf, BUFFER_LOCK_UNLOCK);
2203                 WriteBuffer(buf);
2204                 Assert((*curpage)->offsets_used == num_tuples);
2205                 checked_moved += num_tuples;
2206         }
2207         Assert(num_moved == checked_moved);
2208
2209         elog(elevel, "Rel %s: Pages: %u --> %u; Tuple(s) moved: %u.\n\t%s",
2210                  RelationGetRelationName(onerel),
2211                  nblocks, blkno, num_moved,
2212                  vac_show_rusage(&ru0));
2213
2214         /*
2215          * Reflect the motion of system tuples to catalog cache here.
2216          */
2217         CommandCounterIncrement();
2218
2219         if (Nvacpagelist.num_pages > 0)
2220         {
2221                 /* vacuum indexes again if needed */
2222                 if (Irel != (Relation *) NULL)
2223                 {
2224                         VacPage    *vpleft,
2225                                            *vpright,
2226                                                 vpsave;
2227
2228                         /* re-sort Nvacpagelist.pagedesc */
2229                         for (vpleft = Nvacpagelist.pagedesc,
2230                         vpright = Nvacpagelist.pagedesc + Nvacpagelist.num_pages - 1;
2231                                  vpleft < vpright; vpleft++, vpright--)
2232                         {
2233                                 vpsave = *vpleft;
2234                                 *vpleft = *vpright;
2235                                 *vpright = vpsave;
2236                         }
2237                         Assert(keep_tuples >= 0);
2238                         for (i = 0; i < nindexes; i++)
2239                                 vacuum_index(&Nvacpagelist, Irel[i],
2240                                                          vacrelstats->rel_tuples, keep_tuples);
2241                 }
2242
2243                 /* clean moved tuples from last page in Nvacpagelist list */
2244                 if (vacpage->blkno == (blkno - 1) &&
2245                         vacpage->offsets_free > 0)
2246                 {
2247                         OffsetNumber unbuf[BLCKSZ / sizeof(OffsetNumber)];
2248                         OffsetNumber *unused = unbuf;
2249                         int                     uncnt;
2250
2251                         buf = ReadBuffer(onerel, vacpage->blkno);
2252                         LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
2253                         page = BufferGetPage(buf);
2254                         num_tuples = 0;
2255                         maxoff = PageGetMaxOffsetNumber(page);
2256                         for (offnum = FirstOffsetNumber;
2257                                  offnum <= maxoff;
2258                                  offnum = OffsetNumberNext(offnum))
2259                         {
2260                                 itemid = PageGetItemId(page, offnum);
2261                                 if (!ItemIdIsUsed(itemid))
2262                                         continue;
2263                                 tuple.t_datamcxt = NULL;
2264                                 tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
2265
2266                                 if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
2267                                 {
2268                                         if ((TransactionId) tuple.t_data->t_cmin != myXID)
2269                                                 elog(ERROR, "Invalid XID in t_cmin (3)");
2270                                         if (tuple.t_data->t_infomask & HEAP_MOVED_OFF)
2271                                         {
2272                                                 itemid->lp_flags &= ~LP_USED;
2273                                                 num_tuples++;
2274                                         }
2275                                         else
2276                                                 elog(ERROR, "HEAP_MOVED_OFF was expected (2)");
2277                                 }
2278
2279                         }
2280                         Assert(vacpage->offsets_free == num_tuples);
2281                         START_CRIT_SECTION();
2282                         uncnt = PageRepairFragmentation(page, unused);
2283                         {
2284                                 XLogRecPtr      recptr;
2285
2286                                 recptr = log_heap_clean(onerel, buf, (char *) unused,
2287                                                   (char *) (&(unused[uncnt])) - (char *) unused);
2288                                 PageSetLSN(page, recptr);
2289                                 PageSetSUI(page, ThisStartUpID);
2290                         }
2291                         END_CRIT_SECTION();
2292                         LockBuffer(buf, BUFFER_LOCK_UNLOCK);
2293                         WriteBuffer(buf);
2294                 }
2295
2296                 /* now - free new list of reaped pages */
2297                 curpage = Nvacpagelist.pagedesc;
2298                 for (i = 0; i < Nvacpagelist.num_pages; i++, curpage++)
2299                         pfree(*curpage);
2300                 pfree(Nvacpagelist.pagedesc);
2301         }
2302
2303         /*
2304          * Flush dirty pages out to disk.  We do this unconditionally, even if
2305          * we don't need to truncate, because we want to ensure that all
2306          * tuples have correct on-row commit status on disk (see bufmgr.c's
2307          * comments for FlushRelationBuffers()).
2308          */
2309         i = FlushRelationBuffers(onerel, blkno);
2310         if (i < 0)
2311                 elog(ERROR, "VACUUM (repair_frag): FlushRelationBuffers returned %d",
2312                          i);
2313
2314         /* truncate relation, if needed */
2315         if (blkno < nblocks)
2316         {
2317                 blkno = smgrtruncate(DEFAULT_SMGR, onerel, blkno);
2318                 onerel->rd_nblocks = blkno;             /* update relcache immediately */
2319                 onerel->rd_targblock = InvalidBlockNumber;
2320                 vacrelstats->rel_pages = blkno; /* set new number of blocks */
2321         }
2322
2323         /* clean up */
2324         pfree(vacpage);
2325         if (vacrelstats->vtlinks != NULL)
2326                 pfree(vacrelstats->vtlinks);
2327
2328         ExecDropTupleTable(tupleTable, true);
2329
2330         ExecCloseIndices(resultRelInfo);
2331 }
2332
2333 /*
2334  *      vacuum_heap() -- free dead tuples
2335  *
2336  *              This routine marks dead tuples as unused and truncates relation
2337  *              if there are "empty" end-blocks.
2338  */
2339 static void
2340 vacuum_heap(VRelStats *vacrelstats, Relation onerel, VacPageList vacuum_pages)
2341 {
2342         Buffer          buf;
2343         VacPage    *vacpage;
2344         BlockNumber relblocks;
2345         int                     nblocks;
2346         int                     i;
2347
2348         nblocks = vacuum_pages->num_pages;
2349         nblocks -= vacuum_pages->empty_end_pages;       /* nothing to do with them */
2350
2351         for (i = 0, vacpage = vacuum_pages->pagedesc; i < nblocks; i++, vacpage++)
2352         {
2353                 CHECK_FOR_INTERRUPTS();
2354                 if ((*vacpage)->offsets_free > 0)
2355                 {
2356                         buf = ReadBuffer(onerel, (*vacpage)->blkno);
2357                         LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
2358                         vacuum_page(onerel, buf, *vacpage);
2359                         LockBuffer(buf, BUFFER_LOCK_UNLOCK);
2360                         WriteBuffer(buf);
2361                 }
2362         }
2363
2364         /*
2365          * Flush dirty pages out to disk.  We do this unconditionally, even if
2366          * we don't need to truncate, because we want to ensure that all
2367          * tuples have correct on-row commit status on disk (see bufmgr.c's
2368          * comments for FlushRelationBuffers()).
2369          */
2370         Assert(vacrelstats->rel_pages >= vacuum_pages->empty_end_pages);
2371         relblocks = vacrelstats->rel_pages - vacuum_pages->empty_end_pages;
2372
2373         i = FlushRelationBuffers(onerel, relblocks);
2374         if (i < 0)
2375                 elog(ERROR, "VACUUM (vacuum_heap): FlushRelationBuffers returned %d",
2376                          i);
2377
2378         /* truncate relation if there are some empty end-pages */
2379         if (vacuum_pages->empty_end_pages > 0)
2380         {
2381                 elog(elevel, "Rel %s: Pages: %u --> %u.",
2382                          RelationGetRelationName(onerel),
2383                          vacrelstats->rel_pages, relblocks);
2384                 relblocks = smgrtruncate(DEFAULT_SMGR, onerel, relblocks);
2385                 onerel->rd_nblocks = relblocks; /* update relcache immediately */
2386                 onerel->rd_targblock = InvalidBlockNumber;
2387                 vacrelstats->rel_pages = relblocks;             /* set new number of
2388                                                                                                  * blocks */
2389         }
2390 }
2391
2392 /*
2393  *      vacuum_page() -- free dead tuples on a page
2394  *                                       and repair its fragmentation.
2395  */
2396 static void
2397 vacuum_page(Relation onerel, Buffer buffer, VacPage vacpage)
2398 {
2399         OffsetNumber unbuf[BLCKSZ / sizeof(OffsetNumber)];
2400         OffsetNumber *unused = unbuf;
2401         int                     uncnt;
2402         Page            page = BufferGetPage(buffer);
2403         ItemId          itemid;
2404         int                     i;
2405
2406         /* There shouldn't be any tuples moved onto the page yet! */
2407         Assert(vacpage->offsets_used == 0);
2408
2409         START_CRIT_SECTION();
2410         for (i = 0; i < vacpage->offsets_free; i++)
2411         {
2412                 itemid = PageGetItemId(page, vacpage->offsets[i]);
2413                 itemid->lp_flags &= ~LP_USED;
2414         }
2415         uncnt = PageRepairFragmentation(page, unused);
2416         {
2417                 XLogRecPtr      recptr;
2418
2419                 recptr = log_heap_clean(onerel, buffer, (char *) unused,
2420                                                   (char *) (&(unused[uncnt])) - (char *) unused);
2421                 PageSetLSN(page, recptr);
2422                 PageSetSUI(page, ThisStartUpID);
2423         }
2424         END_CRIT_SECTION();
2425 }
2426
2427 /*
2428  *      scan_index() -- scan one index relation to update statistic.
2429  *
2430  * We use this when we have no deletions to do.
2431  */
2432 static void
2433 scan_index(Relation indrel, double num_tuples)
2434 {
2435         IndexBulkDeleteResult *stats;
2436         VacRUsage       ru0;
2437
2438         vac_init_rusage(&ru0);
2439
2440         /*
2441          * Even though we're not planning to delete anything, use the
2442          * ambulkdelete call, so that the scan happens within the index AM for
2443          * more speed.
2444          */
2445         stats = index_bulk_delete(indrel, dummy_tid_reaped, NULL);
2446
2447         if (!stats)
2448                 return;
2449
2450         /* now update statistics in pg_class */
2451         vac_update_relstats(RelationGetRelid(indrel),
2452                                                 stats->num_pages, stats->num_index_tuples,
2453                                                 false);
2454
2455         elog(elevel, "Index %s: Pages %u; Tuples %.0f.\n\t%s",
2456                  RelationGetRelationName(indrel),
2457                  stats->num_pages, stats->num_index_tuples,
2458                  vac_show_rusage(&ru0));
2459
2460         /*
2461          * Check for tuple count mismatch.      If the index is partial, then it's
2462          * OK for it to have fewer tuples than the heap; else we got trouble.
2463          */
2464         if (stats->num_index_tuples != num_tuples)
2465         {
2466                 if (stats->num_index_tuples > num_tuples ||
2467                         !vac_is_partial_index(indrel))
2468                         elog(WARNING, "Index %s: NUMBER OF INDEX' TUPLES (%.0f) IS NOT THE SAME AS HEAP' (%.0f).\
2469 \n\tRecreate the index.",
2470                                  RelationGetRelationName(indrel),
2471                                  stats->num_index_tuples, num_tuples);
2472         }
2473
2474         pfree(stats);
2475 }
2476
2477 /*
2478  *      vacuum_index() -- vacuum one index relation.
2479  *
2480  *              Vpl is the VacPageList of the heap we're currently vacuuming.
2481  *              It's locked. Indrel is an index relation on the vacuumed heap.
2482  *
2483  *              We don't bother to set locks on the index relation here, since
2484  *              the parent table is exclusive-locked already.
2485  *
2486  *              Finally, we arrange to update the index relation's statistics in
2487  *              pg_class.
2488  */
2489 static void
2490 vacuum_index(VacPageList vacpagelist, Relation indrel,
2491                          double num_tuples, int keep_tuples)
2492 {
2493         IndexBulkDeleteResult *stats;
2494         VacRUsage       ru0;
2495
2496         vac_init_rusage(&ru0);
2497
2498         /* Do bulk deletion */
2499         stats = index_bulk_delete(indrel, tid_reaped, (void *) vacpagelist);
2500
2501         if (!stats)
2502                 return;
2503
2504         /* now update statistics in pg_class */
2505         vac_update_relstats(RelationGetRelid(indrel),
2506                                                 stats->num_pages, stats->num_index_tuples,
2507                                                 false);
2508
2509         elog(elevel, "Index %s: Pages %u; Tuples %.0f: Deleted %.0f.\n\t%s",
2510                  RelationGetRelationName(indrel), stats->num_pages,
2511                  stats->num_index_tuples - keep_tuples, stats->tuples_removed,
2512                  vac_show_rusage(&ru0));
2513
2514         /*
2515          * Check for tuple count mismatch.      If the index is partial, then it's
2516          * OK for it to have fewer tuples than the heap; else we got trouble.
2517          */
2518         if (stats->num_index_tuples != num_tuples + keep_tuples)
2519         {
2520                 if (stats->num_index_tuples > num_tuples + keep_tuples ||
2521                         !vac_is_partial_index(indrel))
2522                         elog(WARNING, "Index %s: NUMBER OF INDEX' TUPLES (%.0f) IS NOT THE SAME AS HEAP' (%.0f).\
2523 \n\tRecreate the index.",
2524                                  RelationGetRelationName(indrel),
2525                                  stats->num_index_tuples, num_tuples);
2526         }
2527
2528         pfree(stats);
2529 }
2530
2531 /*
2532  *      tid_reaped() -- is a particular tid reaped?
2533  *
2534  *              This has the right signature to be an IndexBulkDeleteCallback.
2535  *
2536  *              vacpagelist->VacPage_array is sorted in right order.
2537  */
2538 static bool
2539 tid_reaped(ItemPointer itemptr, void *state)
2540 {
2541         VacPageList vacpagelist = (VacPageList) state;
2542         OffsetNumber ioffno;
2543         OffsetNumber *voff;
2544         VacPage         vp,
2545                            *vpp;
2546         VacPageData vacpage;
2547
2548         vacpage.blkno = ItemPointerGetBlockNumber(itemptr);
2549         ioffno = ItemPointerGetOffsetNumber(itemptr);
2550
2551         vp = &vacpage;
2552         vpp = (VacPage *) vac_bsearch((void *) &vp,
2553                                                                   (void *) (vacpagelist->pagedesc),
2554                                                                   vacpagelist->num_pages,
2555                                                                   sizeof(VacPage),
2556                                                                   vac_cmp_blk);
2557
2558         if (vpp == NULL)
2559                 return false;
2560
2561         /* ok - we are on a partially or fully reaped page */
2562         vp = *vpp;
2563
2564         if (vp->offsets_free == 0)
2565         {
2566                 /* this is EmptyPage, so claim all tuples on it are reaped!!! */
2567                 return true;
2568         }
2569
2570         voff = (OffsetNumber *) vac_bsearch((void *) &ioffno,
2571                                                                                 (void *) (vp->offsets),
2572                                                                                 vp->offsets_free,
2573                                                                                 sizeof(OffsetNumber),
2574                                                                                 vac_cmp_offno);
2575
2576         if (voff == NULL)
2577                 return false;
2578
2579         /* tid is reaped */
2580         return true;
2581 }
2582
2583 /*
2584  * Dummy version for scan_index.
2585  */
2586 static bool
2587 dummy_tid_reaped(ItemPointer itemptr, void *state)
2588 {
2589         return false;
2590 }
2591
2592 /*
2593  * Update the shared Free Space Map with the info we now have about
2594  * free space in the relation, discarding any old info the map may have.
2595  */
2596 static void
2597 vac_update_fsm(Relation onerel, VacPageList fraged_pages,
2598                            BlockNumber rel_pages)
2599 {
2600         int                     nPages = fraged_pages->num_pages;
2601         int                     i;
2602         BlockNumber *pages;
2603         Size       *spaceAvail;
2604
2605         /* +1 to avoid palloc(0) */
2606         pages = (BlockNumber *) palloc((nPages + 1) * sizeof(BlockNumber));
2607         spaceAvail = (Size *) palloc((nPages + 1) * sizeof(Size));
2608
2609         for (i = 0; i < nPages; i++)
2610         {
2611                 pages[i] = fraged_pages->pagedesc[i]->blkno;
2612                 spaceAvail[i] = fraged_pages->pagedesc[i]->free;
2613
2614                 /*
2615                  * fraged_pages may contain entries for pages that we later
2616                  * decided to truncate from the relation; don't enter them into
2617                  * the map!
2618                  */
2619                 if (pages[i] >= rel_pages)
2620                 {
2621                         nPages = i;
2622                         break;
2623                 }
2624         }
2625
2626         MultiRecordFreeSpace(&onerel->rd_node,
2627                                                  0, MaxBlockNumber,
2628                                                  nPages, pages, spaceAvail);
2629         pfree(pages);
2630         pfree(spaceAvail);
2631 }
2632
2633 /* Copy a VacPage structure */
2634 static VacPage
2635 copy_vac_page(VacPage vacpage)
2636 {
2637         VacPage         newvacpage;
2638
2639         /* allocate a VacPageData entry */
2640         newvacpage = (VacPage) palloc(sizeof(VacPageData) +
2641                                                    vacpage->offsets_free * sizeof(OffsetNumber));
2642
2643         /* fill it in */
2644         if (vacpage->offsets_free > 0)
2645                 memcpy(newvacpage->offsets, vacpage->offsets,
2646                            vacpage->offsets_free * sizeof(OffsetNumber));
2647         newvacpage->blkno = vacpage->blkno;
2648         newvacpage->free = vacpage->free;
2649         newvacpage->offsets_used = vacpage->offsets_used;
2650         newvacpage->offsets_free = vacpage->offsets_free;
2651
2652         return newvacpage;
2653 }
2654
2655 /*
2656  * Add a VacPage pointer to a VacPageList.
2657  *
2658  *              As a side effect of the way that scan_heap works,
2659  *              higher pages come after lower pages in the array
2660  *              (and highest tid on a page is last).
2661  */
2662 static void
2663 vpage_insert(VacPageList vacpagelist, VacPage vpnew)
2664 {
2665 #define PG_NPAGEDESC 1024
2666
2667         /* allocate a VacPage entry if needed */
2668         if (vacpagelist->num_pages == 0)
2669         {
2670                 vacpagelist->pagedesc = (VacPage *) palloc(PG_NPAGEDESC * sizeof(VacPage));
2671                 vacpagelist->num_allocated_pages = PG_NPAGEDESC;
2672         }
2673         else if (vacpagelist->num_pages >= vacpagelist->num_allocated_pages)
2674         {
2675                 vacpagelist->num_allocated_pages *= 2;
2676                 vacpagelist->pagedesc = (VacPage *) repalloc(vacpagelist->pagedesc, vacpagelist->num_allocated_pages * sizeof(VacPage));
2677         }
2678         vacpagelist->pagedesc[vacpagelist->num_pages] = vpnew;
2679         (vacpagelist->num_pages)++;
2680 }
2681
2682 /*
2683  * vac_bsearch: just like standard C library routine bsearch(),
2684  * except that we first test to see whether the target key is outside
2685  * the range of the table entries.      This case is handled relatively slowly
2686  * by the normal binary search algorithm (ie, no faster than any other key)
2687  * but it occurs often enough in VACUUM to be worth optimizing.
2688  */
2689 static void *
2690 vac_bsearch(const void *key, const void *base,
2691                         size_t nelem, size_t size,
2692                         int (*compar) (const void *, const void *))
2693 {
2694         int                     res;
2695         const void *last;
2696
2697         if (nelem == 0)
2698                 return NULL;
2699         res = compar(key, base);
2700         if (res < 0)
2701                 return NULL;
2702         if (res == 0)
2703                 return (void *) base;
2704         if (nelem > 1)
2705         {
2706                 last = (const void *) ((const char *) base + (nelem - 1) * size);
2707                 res = compar(key, last);
2708                 if (res > 0)
2709                         return NULL;
2710                 if (res == 0)
2711                         return (void *) last;
2712         }
2713         if (nelem <= 2)
2714                 return NULL;                    /* already checked 'em all */
2715         return bsearch(key, base, nelem, size, compar);
2716 }
2717
2718 /*
2719  * Comparator routines for use with qsort() and bsearch().
2720  */
2721 static int
2722 vac_cmp_blk(const void *left, const void *right)
2723 {
2724         BlockNumber lblk,
2725                                 rblk;
2726
2727         lblk = (*((VacPage *) left))->blkno;
2728         rblk = (*((VacPage *) right))->blkno;
2729
2730         if (lblk < rblk)
2731                 return -1;
2732         if (lblk == rblk)
2733                 return 0;
2734         return 1;
2735 }
2736
2737 static int
2738 vac_cmp_offno(const void *left, const void *right)
2739 {
2740         if (*(OffsetNumber *) left < *(OffsetNumber *) right)
2741                 return -1;
2742         if (*(OffsetNumber *) left == *(OffsetNumber *) right)
2743                 return 0;
2744         return 1;
2745 }
2746
2747 static int
2748 vac_cmp_vtlinks(const void *left, const void *right)
2749 {
2750         if (((VTupleLink) left)->new_tid.ip_blkid.bi_hi <
2751                 ((VTupleLink) right)->new_tid.ip_blkid.bi_hi)
2752                 return -1;
2753         if (((VTupleLink) left)->new_tid.ip_blkid.bi_hi >
2754                 ((VTupleLink) right)->new_tid.ip_blkid.bi_hi)
2755                 return 1;
2756         /* bi_hi-es are equal */
2757         if (((VTupleLink) left)->new_tid.ip_blkid.bi_lo <
2758                 ((VTupleLink) right)->new_tid.ip_blkid.bi_lo)
2759                 return -1;
2760         if (((VTupleLink) left)->new_tid.ip_blkid.bi_lo >
2761                 ((VTupleLink) right)->new_tid.ip_blkid.bi_lo)
2762                 return 1;
2763         /* bi_lo-es are equal */
2764         if (((VTupleLink) left)->new_tid.ip_posid <
2765                 ((VTupleLink) right)->new_tid.ip_posid)
2766                 return -1;
2767         if (((VTupleLink) left)->new_tid.ip_posid >
2768                 ((VTupleLink) right)->new_tid.ip_posid)
2769                 return 1;
2770         return 0;
2771 }
2772
2773
2774 void
2775 vac_open_indexes(Relation relation, int *nindexes, Relation **Irel)
2776 {
2777         List       *indexoidlist,
2778                            *indexoidscan;
2779         int                     i;
2780
2781         indexoidlist = RelationGetIndexList(relation);
2782
2783         *nindexes = length(indexoidlist);
2784
2785         if (*nindexes > 0)
2786                 *Irel = (Relation *) palloc(*nindexes * sizeof(Relation));
2787         else
2788                 *Irel = NULL;
2789
2790         i = 0;
2791         foreach(indexoidscan, indexoidlist)
2792         {
2793                 Oid                     indexoid = lfirsti(indexoidscan);
2794
2795                 (*Irel)[i] = index_open(indexoid);
2796                 i++;
2797         }
2798
2799         freeList(indexoidlist);
2800 }
2801
2802
2803 void
2804 vac_close_indexes(int nindexes, Relation *Irel)
2805 {
2806         if (Irel == (Relation *) NULL)
2807                 return;
2808
2809         while (nindexes--)
2810                 index_close(Irel[nindexes]);
2811         pfree(Irel);
2812 }
2813
2814
2815 /*
2816  * Is an index partial (ie, could it contain fewer tuples than the heap?)
2817  */
2818 bool
2819 vac_is_partial_index(Relation indrel)
2820 {
2821         /*
2822          * If the index's AM doesn't support nulls, it's partial for our
2823          * purposes
2824          */
2825         if (!indrel->rd_am->amindexnulls)
2826                 return true;
2827
2828         /* Otherwise, look to see if there's a partial-index predicate */
2829         return (VARSIZE(&indrel->rd_index->indpred) > VARHDRSZ);
2830 }
2831
2832
2833 static bool
2834 enough_space(VacPage vacpage, Size len)
2835 {
2836         len = MAXALIGN(len);
2837
2838         if (len > vacpage->free)
2839                 return false;
2840
2841         /* if there are free itemid(s) and len <= free_space... */
2842         if (vacpage->offsets_used < vacpage->offsets_free)
2843                 return true;
2844
2845         /* noff_used >= noff_free and so we'll have to allocate new itemid */
2846         if (len + sizeof(ItemIdData) <= vacpage->free)
2847                 return true;
2848
2849         return false;
2850 }
2851
2852
2853 /*
2854  * Initialize usage snapshot.
2855  */
2856 void
2857 vac_init_rusage(VacRUsage *ru0)
2858 {
2859         struct timezone tz;
2860
2861         getrusage(RUSAGE_SELF, &ru0->ru);
2862         gettimeofday(&ru0->tv, &tz);
2863 }
2864
2865 /*
2866  * Compute elapsed time since ru0 usage snapshot, and format into
2867  * a displayable string.  Result is in a static string, which is
2868  * tacky, but no one ever claimed that the Postgres backend is
2869  * threadable...
2870  */
2871 const char *
2872 vac_show_rusage(VacRUsage *ru0)
2873 {
2874         static char result[100];
2875         VacRUsage       ru1;
2876
2877         vac_init_rusage(&ru1);
2878
2879         if (ru1.tv.tv_usec < ru0->tv.tv_usec)
2880         {
2881                 ru1.tv.tv_sec--;
2882                 ru1.tv.tv_usec += 1000000;
2883         }
2884         if (ru1.ru.ru_stime.tv_usec < ru0->ru.ru_stime.tv_usec)
2885         {
2886                 ru1.ru.ru_stime.tv_sec--;
2887                 ru1.ru.ru_stime.tv_usec += 1000000;
2888         }
2889         if (ru1.ru.ru_utime.tv_usec < ru0->ru.ru_utime.tv_usec)
2890         {
2891                 ru1.ru.ru_utime.tv_sec--;
2892                 ru1.ru.ru_utime.tv_usec += 1000000;
2893         }
2894
2895         snprintf(result, sizeof(result),
2896                          "CPU %d.%02ds/%d.%02du sec elapsed %d.%02d sec.",
2897                          (int) (ru1.ru.ru_stime.tv_sec - ru0->ru.ru_stime.tv_sec),
2898           (int) (ru1.ru.ru_stime.tv_usec - ru0->ru.ru_stime.tv_usec) / 10000,
2899                          (int) (ru1.ru.ru_utime.tv_sec - ru0->ru.ru_utime.tv_sec),
2900           (int) (ru1.ru.ru_utime.tv_usec - ru0->ru.ru_utime.tv_usec) / 10000,
2901                          (int) (ru1.tv.tv_sec - ru0->tv.tv_sec),
2902                          (int) (ru1.tv.tv_usec - ru0->tv.tv_usec) / 10000);
2903
2904         return result;
2905 }