]> granicus.if.org Git - postgresql/blob - src/backend/commands/vacuum.c
Make VACUUM handle schema-qualified relation names properly.
[postgresql] / src / backend / commands / vacuum.c
1 /*-------------------------------------------------------------------------
2  *
3  * vacuum.c
4  *        The postgres vacuum cleaner.
5  *
6  * This file includes the "full" version of VACUUM, as well as control code
7  * used by all three of full VACUUM, lazy VACUUM, and ANALYZE.  See
8  * vacuumlazy.c and analyze.c for the rest of the code for the latter two.
9  *
10  *
11  * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
12  * Portions Copyright (c) 1994, Regents of the University of California
13  *
14  *
15  * IDENTIFICATION
16  *        $Header: /cvsroot/pgsql/src/backend/commands/vacuum.c,v 1.221 2002/04/02 01:03:05 tgl Exp $
17  *
18  *-------------------------------------------------------------------------
19  */
20 #include "postgres.h"
21
22 #include <unistd.h>
23
24 #include "access/clog.h"
25 #include "access/genam.h"
26 #include "access/heapam.h"
27 #include "access/xlog.h"
28 #include "catalog/catalog.h"
29 #include "catalog/catname.h"
30 #include "catalog/namespace.h"
31 #include "catalog/pg_database.h"
32 #include "catalog/pg_index.h"
33 #include "commands/vacuum.h"
34 #include "executor/executor.h"
35 #include "miscadmin.h"
36 #include "storage/freespace.h"
37 #include "storage/sinval.h"
38 #include "storage/smgr.h"
39 #include "tcop/pquery.h"
40 #include "utils/acl.h"
41 #include "utils/builtins.h"
42 #include "utils/fmgroids.h"
43 #include "utils/inval.h"
44 #include "utils/lsyscache.h"
45 #include "utils/relcache.h"
46 #include "utils/syscache.h"
47 #include "pgstat.h"
48
49
/*
 * VacPageData -- bookkeeping record describing one heap page for VACUUM.
 *
 * NOTE(review): offsets[] is declared with a single element but is
 * presumably over-allocated to hold a variable number of entries; confirm
 * against the allocation sites before relying on sizeof(VacPageData).
 */
typedef struct VacPageData
{
        BlockNumber blkno;                      /* BlockNumber of this Page */
        Size            free;                   /* FreeSpace on this Page */
        uint16          offsets_used;   /* Number of OffNums used by vacuum */
        uint16          offsets_free;   /* Number of OffNums free or to be free */
        OffsetNumber offsets[1];        /* Array of free OffNums */
} VacPageData;

/* Pointer form of VacPageData; this is what the helper routines take */
typedef VacPageData *VacPage;
60
/*
 * VacPageListData -- a collection of page descriptors (pagedesc) together
 * with the counts that describe it.
 */
typedef struct VacPageListData
{
        BlockNumber empty_end_pages;    /* Number of "empty" end-pages */
        int                     num_pages;              /* Number of pages in pagedesc */
        int                     num_allocated_pages;    /* Number of allocated pages in
                                                                                 * pagedesc */
        VacPage    *pagedesc;           /* Descriptions of pages */
} VacPageListData;

/* Pointer form of VacPageListData */
typedef VacPageListData *VacPageList;
71
/*
 * VTupleLinkData -- a pair of tuple identifiers (TIDs).
 *
 * NOTE(review): presumably records, for the tuple at this_tid, the TID of
 * its newer version (new_tid); confirm against the code that fills
 * VRelStats.vtlinks.
 */
typedef struct VTupleLinkData
{
        ItemPointerData new_tid;
        ItemPointerData this_tid;
} VTupleLinkData;

/* Pointer form of VTupleLinkData */
typedef VTupleLinkData *VTupleLink;
79
/*
 * VTupleMoveData -- describes one tuple relocation: which tuple, the page
 * record it is to be moved to, and whether that page record must be
 * cleaned before it is used.
 */
typedef struct VTupleMoveData
{
        ItemPointerData tid;            /* tuple ID */
        VacPage         vacpage;                /* where to move */
        bool            cleanVpd;               /* clean vacpage before using */
} VTupleMoveData;

/* Pointer form of VTupleMoveData */
typedef VTupleMoveData *VTupleMove;
88
/*
 * VRelStats -- working statistics collected for one relation.
 *
 * NOTE(review): the per-field meanings below are inferred from the field
 * names and types only; confirm against the code that fills this struct.
 */
typedef struct VRelStats
{
        BlockNumber rel_pages;          /* presumably: pages in the relation */
        double          rel_tuples;             /* presumably: tuples in the relation */
        Size            min_tlen;               /* presumably: shortest tuple length seen */
        Size            max_tlen;               /* presumably: longest tuple length seen */
        bool            hasindex;               /* presumably: relation has indexes */
        int                     num_vtlinks;    /* presumably: entries in vtlinks */
        VTupleLink      vtlinks;                /* VTupleLinkData record(s) */
} VRelStats;
99
100
/*
 * Memory context for data that must survive across the per-relation
 * transactions.  Created in vacuum(), deleted in vacuum_shutdown().
 */
static MemoryContext vac_context = NULL;

/*
 * Message level selected in vacuum(): INFO when VERBOSE was requested,
 * DEBUG1 otherwise.
 */
static int elevel = -1;

/*
 * XID cutoffs for the relation currently being processed; set by
 * full_vacuum_rel() through vacuum_set_xid_limits().
 */
static TransactionId OldestXmin;
static TransactionId FreezeLimit;

/*
 * XID cutoffs computed by vacuum_init() at the start of a database-wide
 * VACUUM; vacuum_shutdown() passes them to vac_update_dbstats() and
 * vac_truncate_clog().
 */
static TransactionId initialOldestXmin;
static TransactionId initialFreezeLimit;
110
111
/* non-export function prototypes */

/* command-level control code and per-relation driver */
static void vacuum_init(VacuumStmt *vacstmt);
static void vacuum_shutdown(VacuumStmt *vacstmt);
static List *getrels(const RangeVar *vacrel, const char *stmttype);
static void vac_update_dbstats(Oid dbid,
                                   TransactionId vacuumXID,
                                   TransactionId frozenXID);
static void vac_truncate_clog(TransactionId vacuumXID,
                                  TransactionId frozenXID);
static void vacuum_rel(Oid relid, VacuumStmt *vacstmt, char expected_relkind);

/*
 * VACUUM FULL entry point, followed by further helpers.
 * NOTE(review): the helpers below are presumably used only by the
 * VACUUM FULL code path -- confirm against their definitions.
 */
static void full_vacuum_rel(Relation onerel, VacuumStmt *vacstmt);
static void scan_heap(VRelStats *vacrelstats, Relation onerel,
                  VacPageList vacuum_pages, VacPageList fraged_pages);
static void repair_frag(VRelStats *vacrelstats, Relation onerel,
                        VacPageList vacuum_pages, VacPageList fraged_pages,
                        int nindexes, Relation *Irel);
static void vacuum_heap(VRelStats *vacrelstats, Relation onerel,
                        VacPageList vacpagelist);
static void vacuum_page(Relation onerel, Buffer buffer, VacPage vacpage);
static void vacuum_index(VacPageList vacpagelist, Relation indrel,
                         double num_tuples, int keep_tuples);
static void scan_index(Relation indrel, double num_tuples);
static bool tid_reaped(ItemPointer itemptr, void *state);
static bool dummy_tid_reaped(ItemPointer itemptr, void *state);
static void vac_update_fsm(Relation onerel, VacPageList fraged_pages,
                           BlockNumber rel_pages);
static VacPage copy_vac_page(VacPage vacpage);
static void vpage_insert(VacPageList vacpagelist, VacPage vpnew);

/* bsearch()-style search routine and its comparison callbacks */
static void *vac_bsearch(const void *key, const void *base,
                        size_t nelem, size_t size,
                        int (*compar) (const void *, const void *));
static int      vac_cmp_blk(const void *left, const void *right);
static int      vac_cmp_offno(const void *left, const void *right);
static int      vac_cmp_vtlinks(const void *left, const void *right);
static bool enough_space(VacPage vacpage, Size len);
147
148
149 /****************************************************************************
150  *                                                                                                                                                      *
151  *                      Code common to all flavors of VACUUM and ANALYZE                                *
152  *                                                                                                                                                      *
153  ****************************************************************************
154  */
155
156
157 /*
158  * Primary entry point for VACUUM and ANALYZE commands.
159  */
160 void
161 vacuum(VacuumStmt *vacstmt)
162 {
163         const char *stmttype = vacstmt->vacuum ? "VACUUM" : "ANALYZE";
164         List       *vrl,
165                            *cur;
166
167         if (vacstmt->verbose)
168                 elevel = INFO;
169         else
170                 elevel = DEBUG1;
171
172         /*
173          * We cannot run VACUUM inside a user transaction block; if we were
174          * inside a transaction, then our commit- and
175          * start-transaction-command calls would not have the intended effect!
176          * Furthermore, the forced commit that occurs before truncating the
177          * relation's file would have the effect of committing the rest of the
178          * user's transaction too, which would certainly not be the desired
179          * behavior.
180          */
181         if (IsTransactionBlock())
182                 elog(ERROR, "%s cannot run inside a BEGIN/END block", stmttype);
183
184         /*
185          * Send info about dead objects to the statistics collector
186          */
187         pgstat_vacuum_tabstat();
188
189         /*
190          * Create special memory context for cross-transaction storage.
191          *
192          * Since it is a child of QueryContext, it will go away eventually even
193          * if we suffer an error; there's no need for special abort cleanup
194          * logic.
195          */
196         vac_context = AllocSetContextCreate(QueryContext,
197                                                                                 "Vacuum",
198                                                                                 ALLOCSET_DEFAULT_MINSIZE,
199                                                                                 ALLOCSET_DEFAULT_INITSIZE,
200                                                                                 ALLOCSET_DEFAULT_MAXSIZE);
201
202         /* Build list of relations to process (note this lives in vac_context) */
203         vrl = getrels(vacstmt->relation, stmttype);
204
205         /*
206          * Start up the vacuum cleaner.
207          */
208         vacuum_init(vacstmt);
209
210         /*
211          * Process each selected relation.      We are careful to process each
212          * relation in a separate transaction in order to avoid holding too
213          * many locks at one time.      Also, if we are doing VACUUM ANALYZE, the
214          * ANALYZE part runs as a separate transaction from the VACUUM to
215          * further reduce locking.
216          */
217         foreach(cur, vrl)
218         {
219                 Oid             relid = (Oid) lfirsti(cur);
220
221                 if (vacstmt->vacuum)
222                         vacuum_rel(relid, vacstmt, RELKIND_RELATION);
223                 if (vacstmt->analyze)
224                         analyze_rel(relid, vacstmt);
225         }
226
227         /* clean up */
228         vacuum_shutdown(vacstmt);
229 }
230
231 /*
232  *      vacuum_init(), vacuum_shutdown() -- start up and shut down the vacuum cleaner.
233  *
234  *              Formerly, there was code here to prevent more than one VACUUM from
235  *              executing concurrently in the same database.  However, there's no
236  *              good reason to prevent that, and manually removing lockfiles after
237  *              a vacuum crash was a pain for dbadmins.  So, forget about lockfiles,
238  *              and just rely on the locks we grab on each target table
239  *              to ensure that there aren't two VACUUMs running on the same table
240  *              at the same time.
241  *
242  *              The strangeness with committing and starting transactions in the
243  *              init and shutdown routines is due to the fact that the vacuum cleaner
244  *              is invoked via an SQL command, and so is already executing inside
245  *              a transaction.  We need to leave ourselves in a predictable state
246  *              on entry and exit to the vacuum cleaner.  We commit the transaction
247  *              started in PostgresMain() inside vacuum_init(), and start one in
248  *              vacuum_shutdown() to match the commit waiting for us back in
249  *              PostgresMain().
250  */
251 static void
252 vacuum_init(VacuumStmt *vacstmt)
253 {
254         if (vacstmt->vacuum && vacstmt->relation == NULL)
255         {
256                 /*
257                  * Compute the initially applicable OldestXmin and FreezeLimit
258                  * XIDs, so that we can record these values at the end of the
259                  * VACUUM. Note that individual tables may well be processed with
260                  * newer values, but we can guarantee that no (non-shared)
261                  * relations are processed with older ones.
262                  *
263                  * It is okay to record non-shared values in pg_database, even though
264                  * we may vacuum shared relations with older cutoffs, because only
265                  * the minimum of the values present in pg_database matters.  We
266                  * can be sure that shared relations have at some time been
267                  * vacuumed with cutoffs no worse than the global minimum; for, if
268                  * there is a backend in some other DB with xmin = OLDXMIN that's
269                  * determining the cutoff with which we vacuum shared relations,
270                  * it is not possible for that database to have a cutoff newer
271                  * than OLDXMIN recorded in pg_database.
272                  */
273                 vacuum_set_xid_limits(vacstmt, false,
274                                                           &initialOldestXmin, &initialFreezeLimit);
275         }
276
277         /* matches the StartTransaction in PostgresMain() */
278         CommitTransactionCommand();
279 }
280
281 static void
282 vacuum_shutdown(VacuumStmt *vacstmt)
283 {
284         /* on entry, we are not in a transaction */
285
286         /* matches the CommitTransaction in PostgresMain() */
287         StartTransactionCommand();
288
289         /*
290          * If we did a database-wide VACUUM, update the database's pg_database
291          * row with info about the transaction IDs used, and try to truncate
292          * pg_clog.
293          */
294         if (vacstmt->vacuum && vacstmt->relation == NULL)
295         {
296                 vac_update_dbstats(MyDatabaseId,
297                                                    initialOldestXmin, initialFreezeLimit);
298                 vac_truncate_clog(initialOldestXmin, initialFreezeLimit);
299         }
300
301         /*
302          * Clean up working storage --- note we must do this after
303          * StartTransactionCommand, else we might be trying to delete the
304          * active context!
305          */
306         MemoryContextDelete(vac_context);
307         vac_context = NULL;
308 }
309
310 /*
311  * Build a list of Oids for each relation to be processed
312  *
313  * The list is built in vac_context so that it will survive across our
314  * per-relation transactions.
315  */
316 static List *
317 getrels(const RangeVar *vacrel, const char *stmttype)
318 {
319         List       *vrl = NIL;
320         MemoryContext oldcontext;
321
322         if (vacrel)
323         {
324                 /* Process specific relation */
325                 Oid             relid;
326
327                 relid = RangeVarGetRelid(vacrel, false);
328
329                 /* Make a relation list entry for this guy */
330                 oldcontext = MemoryContextSwitchTo(vac_context);
331                 vrl = lappendi(vrl, relid);
332                 MemoryContextSwitchTo(oldcontext);
333         }
334         else
335         {
336                 /* Process all plain relations listed in pg_class */
337                 Relation        pgclass;
338                 HeapScanDesc scan;
339                 HeapTuple       tuple;
340                 ScanKeyData key;
341
342                 ScanKeyEntryInitialize(&key, 0x0,
343                                                            Anum_pg_class_relkind,
344                                                            F_CHAREQ,
345                                                            CharGetDatum(RELKIND_RELATION));
346
347                 pgclass = heap_openr(RelationRelationName, AccessShareLock);
348
349                 scan = heap_beginscan(pgclass, false, SnapshotNow, 1, &key);
350
351                 while (HeapTupleIsValid(tuple = heap_getnext(scan, 0)))
352                 {
353                         /* Make a relation list entry for this guy */
354                         oldcontext = MemoryContextSwitchTo(vac_context);
355                         vrl = lappendi(vrl, tuple->t_data->t_oid);
356                         MemoryContextSwitchTo(oldcontext);
357                 }
358
359                 heap_endscan(scan);
360                 heap_close(pgclass, AccessShareLock);
361         }
362
363         return vrl;
364 }
365
366 /*
367  * vacuum_set_xid_limits() -- compute oldest-Xmin and freeze cutoff points
368  */
369 void
370 vacuum_set_xid_limits(VacuumStmt *vacstmt, bool sharedRel,
371                                           TransactionId *oldestXmin,
372                                           TransactionId *freezeLimit)
373 {
374         TransactionId limit;
375
376         *oldestXmin = GetOldestXmin(sharedRel);
377
378         Assert(TransactionIdIsNormal(*oldestXmin));
379
380         if (vacstmt->freeze)
381         {
382                 /* FREEZE option: use oldest Xmin as freeze cutoff too */
383                 limit = *oldestXmin;
384         }
385         else
386         {
387                 /*
388                  * Normal case: freeze cutoff is well in the past, to wit, about
389                  * halfway to the wrap horizon
390                  */
391                 limit = GetCurrentTransactionId() - (MaxTransactionId >> 2);
392         }
393
394         /*
395          * Be careful not to generate a "permanent" XID
396          */
397         if (!TransactionIdIsNormal(limit))
398                 limit = FirstNormalTransactionId;
399
400         /*
401          * Ensure sane relationship of limits
402          */
403         if (TransactionIdFollows(limit, *oldestXmin))
404         {
405                 elog(WARNING, "oldest Xmin is far in the past --- close open transactions soon to avoid wraparound problems");
406                 limit = *oldestXmin;
407         }
408
409         *freezeLimit = limit;
410 }
411
412
413 /*
414  *      vac_update_relstats() -- update statistics for one relation
415  *
416  *              Update the whole-relation statistics that are kept in its pg_class
417  *              row.  There are additional stats that will be updated if we are
418  *              doing ANALYZE, but we always update these stats.  This routine works
419  *              for both index and heap relation entries in pg_class.
420  *
421  *              We violate no-overwrite semantics here by storing new values for the
422  *              statistics columns directly into the pg_class tuple that's already on
423  *              the page.  The reason for this is that if we updated these tuples in
424  *              the usual way, vacuuming pg_class itself wouldn't work very well ---
425  *              by the time we got done with a vacuum cycle, most of the tuples in
426  *              pg_class would've been obsoleted.  Of course, this only works for
427  *              fixed-size never-null columns, but these are.
428  *
429  *              This routine is shared by full VACUUM, lazy VACUUM, and stand-alone
430  *              ANALYZE.
431  */
432 void
433 vac_update_relstats(Oid relid, BlockNumber num_pages, double num_tuples,
434                                         bool hasindex)
435 {
436         Relation        rd;
437         HeapTupleData rtup;
438         HeapTuple       ctup;
439         Form_pg_class pgcform;
440         Buffer          buffer;
441
442         /*
443          * update number of tuples and number of pages in pg_class
444          */
445         rd = heap_openr(RelationRelationName, RowExclusiveLock);
446
447         ctup = SearchSysCache(RELOID,
448                                                   ObjectIdGetDatum(relid),
449                                                   0, 0, 0);
450         if (!HeapTupleIsValid(ctup))
451                 elog(ERROR, "pg_class entry for relid %u vanished during vacuuming",
452                          relid);
453
454         /* get the buffer cache tuple */
455         rtup.t_self = ctup->t_self;
456         ReleaseSysCache(ctup);
457         heap_fetch(rd, SnapshotNow, &rtup, &buffer, NULL);
458
459         /* overwrite the existing statistics in the tuple */
460         pgcform = (Form_pg_class) GETSTRUCT(&rtup);
461         pgcform->relpages = (int32) num_pages;
462         pgcform->reltuples = num_tuples;
463         pgcform->relhasindex = hasindex;
464
465         /*
466          * If we have discovered that there are no indexes, then there's no
467          * primary key either.  This could be done more thoroughly...
468          */
469         if (!hasindex)
470                 pgcform->relhaspkey = false;
471
472         /*
473          * Invalidate the tuple in the catcaches; this also arranges to flush
474          * the relation's relcache entry.  (If we fail to commit for some reason,
475          * no flush will occur, but no great harm is done since there are no
476          * noncritical state updates here.)
477          */
478         CacheInvalidateHeapTuple(rd, &rtup);
479
480         /* Write the buffer */
481         WriteBuffer(buffer);
482
483         heap_close(rd, RowExclusiveLock);
484 }
485
486
487 /*
488  *      vac_update_dbstats() -- update statistics for one database
489  *
490  *              Update the whole-database statistics that are kept in its pg_database
491  *              row.
492  *
493  *              We violate no-overwrite semantics here by storing new values for the
494  *              statistics columns directly into the tuple that's already on the page.
495  *              As with vac_update_relstats, this avoids leaving dead tuples behind
496  *              after a VACUUM; which is good since GetRawDatabaseInfo
497  *              can get confused by finding dead tuples in pg_database.
498  *
499  *              This routine is shared by full and lazy VACUUM.  Note that it is only
500  *              applied after a database-wide VACUUM operation.
501  */
502 static void
503 vac_update_dbstats(Oid dbid,
504                                    TransactionId vacuumXID,
505                                    TransactionId frozenXID)
506 {
507         Relation        relation;
508         ScanKeyData entry[1];
509         HeapScanDesc scan;
510         HeapTuple       tuple;
511         Form_pg_database dbform;
512
513         relation = heap_openr(DatabaseRelationName, RowExclusiveLock);
514
515         /* Must use a heap scan, since there's no syscache for pg_database */
516         ScanKeyEntryInitialize(&entry[0], 0x0,
517                                                    ObjectIdAttributeNumber, F_OIDEQ,
518                                                    ObjectIdGetDatum(dbid));
519
520         scan = heap_beginscan(relation, 0, SnapshotNow, 1, entry);
521
522         tuple = heap_getnext(scan, 0);
523
524         if (!HeapTupleIsValid(tuple))
525                 elog(ERROR, "database %u does not exist", dbid);
526
527         dbform = (Form_pg_database) GETSTRUCT(tuple);
528
529         /* overwrite the existing statistics in the tuple */
530         dbform->datvacuumxid = vacuumXID;
531         dbform->datfrozenxid = frozenXID;
532
533         /* invalidate the tuple in the cache and write the buffer */
534         CacheInvalidateHeapTuple(relation, tuple);
535         WriteNoReleaseBuffer(scan->rs_cbuf);
536
537         heap_endscan(scan);
538
539         heap_close(relation, RowExclusiveLock);
540 }
541
542
543 /*
544  *      vac_truncate_clog() -- attempt to truncate the commit log
545  *
546  *              Scan pg_database to determine the system-wide oldest datvacuumxid,
547  *              and use it to truncate the transaction commit log (pg_clog).
548  *              Also generate a warning if the system-wide oldest datfrozenxid
549  *              seems to be in danger of wrapping around.
550  *
551  *              The passed XIDs are simply the ones I just wrote into my pg_database
552  *              entry.  They're used to initialize the "min" calculations.
553  *
554  *              This routine is shared by full and lazy VACUUM.  Note that it is only
555  *              applied after a database-wide VACUUM operation.
556  */
557 static void
558 vac_truncate_clog(TransactionId vacuumXID, TransactionId frozenXID)
559 {
560         Relation        relation;
561         HeapScanDesc scan;
562         HeapTuple       tuple;
563         int32           age;
564
565         relation = heap_openr(DatabaseRelationName, AccessShareLock);
566
567         scan = heap_beginscan(relation, 0, SnapshotNow, 0, NULL);
568
569         while (HeapTupleIsValid(tuple = heap_getnext(scan, 0)))
570         {
571                 Form_pg_database dbform = (Form_pg_database) GETSTRUCT(tuple);
572
573                 /* Ignore non-connectable databases (eg, template0) */
574                 /* It's assumed that these have been frozen correctly */
575                 if (!dbform->datallowconn)
576                         continue;
577
578                 if (TransactionIdIsNormal(dbform->datvacuumxid) &&
579                         TransactionIdPrecedes(dbform->datvacuumxid, vacuumXID))
580                         vacuumXID = dbform->datvacuumxid;
581                 if (TransactionIdIsNormal(dbform->datfrozenxid) &&
582                         TransactionIdPrecedes(dbform->datfrozenxid, frozenXID))
583                         frozenXID = dbform->datfrozenxid;
584         }
585
586         heap_endscan(scan);
587
588         heap_close(relation, AccessShareLock);
589
590         /* Truncate CLOG to the oldest vacuumxid */
591         TruncateCLOG(vacuumXID);
592
593         /* Give warning about impending wraparound problems */
594         age = (int32) (GetCurrentTransactionId() - frozenXID);
595         if (age > (int32) ((MaxTransactionId >> 3) * 3))
596                 elog(WARNING, "Some databases have not been vacuumed in %d transactions."
597                          "\n\tBetter vacuum them within %d transactions,"
598                          "\n\tor you may have a wraparound failure.",
599                          age, (int32) (MaxTransactionId >> 1) - age);
600 }
601
602
603 /****************************************************************************
604  *                                                                                                                                                      *
605  *                      Code common to both flavors of VACUUM                                                   *
606  *                                                                                                                                                      *
607  ****************************************************************************
608  */
609
610
611 /*
612  *      vacuum_rel() -- vacuum one heap relation
613  *
614  *              Doing one heap at a time incurs extra overhead, since we need to
615  *              check that the heap exists again just before we vacuum it.      The
616  *              reason that we do this is so that vacuuming can be spread across
617  *              many small transactions.  Otherwise, two-phase locking would require
618  *              us to lock the entire database during one pass of the vacuum cleaner.
619  *
620  *              At entry and exit, we are not inside a transaction.
621  */
622 static void
623 vacuum_rel(Oid relid, VacuumStmt *vacstmt, char expected_relkind)
624 {
625         LOCKMODE        lmode;
626         Relation        onerel;
627         LockRelId       onerelid;
628         Oid                     toast_relid;
629
630         /* Begin a transaction for vacuuming this relation */
631         StartTransactionCommand();
632
633         /*
634          * Check for user-requested abort.      Note we want this to be inside a
635          * transaction, so xact.c doesn't issue useless WARNING.
636          */
637         CHECK_FOR_INTERRUPTS();
638
639         /*
640          * Race condition -- if the pg_class tuple has gone away since the
641          * last time we saw it, we don't need to vacuum it.
642          */
643         if (!SearchSysCacheExists(RELOID,
644                                                           ObjectIdGetDatum(relid),
645                                                           0, 0, 0))
646         {
647                 CommitTransactionCommand();
648                 return;
649         }
650
651         /*
652          * Determine the type of lock we want --- hard exclusive lock for a
653          * FULL vacuum, but just ShareUpdateExclusiveLock for concurrent
654          * vacuum.      Either way, we can be sure that no other backend is
655          * vacuuming the same table.
656          */
657         lmode = vacstmt->full ? AccessExclusiveLock : ShareUpdateExclusiveLock;
658
659         /*
660          * Open the class, get an appropriate lock on it, and check
661          * permissions.
662          *
663          * We allow the user to vacuum a table if he is superuser, the table
664          * owner, or the database owner (but in the latter case, only if it's
665          * not a shared relation).      pg_class_ownercheck includes the superuser case.
666          *
667          * Note we choose to treat permissions failure as a WARNING and keep
668          * trying to vacuum the rest of the DB --- is this appropriate?
669          */
670         onerel = relation_open(relid, lmode);
671
672         if (!(pg_class_ownercheck(RelationGetRelid(onerel), GetUserId()) ||
673                   (is_dbadmin(MyDatabaseId) && !onerel->rd_rel->relisshared)))
674         {
675                 elog(WARNING, "Skipping \"%s\" --- only table or database owner can VACUUM it",
676                          RelationGetRelationName(onerel));
677                 relation_close(onerel, lmode);
678                 CommitTransactionCommand();
679                 return;
680         }
681
682         /*
683          * Check that it's a plain table; we used to do this in getrels() but
684          * seems safer to check after we've locked the relation.
685          */
686         if (onerel->rd_rel->relkind != expected_relkind)
687         {
688                 elog(WARNING, "Skipping \"%s\" --- can not process indexes, views or special system tables",
689                          RelationGetRelationName(onerel));
690                 relation_close(onerel, lmode);
691                 CommitTransactionCommand();
692                 return;
693         }
694
695         /*
696          * Get a session-level lock too. This will protect our access to the
697          * relation across multiple transactions, so that we can vacuum the
698          * relation's TOAST table (if any) secure in the knowledge that no one
699          * is deleting the parent relation.
700          *
701          * NOTE: this cannot block, even if someone else is waiting for access,
702          * because the lock manager knows that both lock requests are from the
703          * same process.
704          */
705         onerelid = onerel->rd_lockInfo.lockRelId;
706         LockRelationForSession(&onerelid, lmode);
707
708         /*
709          * Remember the relation's TOAST relation for later
710          */
711         toast_relid = onerel->rd_rel->reltoastrelid;
712
713         /*
714          * Do the actual work --- either FULL or "lazy" vacuum
715          */
716         if (vacstmt->full)
717                 full_vacuum_rel(onerel, vacstmt);
718         else
719                 lazy_vacuum_rel(onerel, vacstmt);
720
721         /* all done with this class, but hold lock until commit */
722         relation_close(onerel, NoLock);
723
724         /*
725          * Complete the transaction and free all temporary memory used.
726          */
727         CommitTransactionCommand();
728
729         /*
730          * If the relation has a secondary toast rel, vacuum that too while we
731          * still hold the session lock on the master table.  Note however that
732          * "analyze" will not get done on the toast table.      This is good,
733          * because the toaster always uses hardcoded index access and
734          * statistics are totally unimportant for toast relations.
735          */
736         if (toast_relid != InvalidOid)
737                 vacuum_rel(toast_relid, vacstmt, RELKIND_TOASTVALUE);
738
739         /*
740          * Now release the session-level lock on the master table.
741          */
742         UnlockRelationForSession(&onerelid, lmode);
743 }
744
745
746 /****************************************************************************
747  *                                                                                                                                                      *
748  *                      Code for VACUUM FULL (only)                                                                             *
749  *                                                                                                                                                      *
750  ****************************************************************************
751  */
752
753
754 /*
755  *      full_vacuum_rel() -- perform FULL VACUUM for one heap relation
756  *
757  *              This routine vacuums a single heap, cleans out its indexes, and
758  *              updates its num_pages and num_tuples statistics.
759  *
760  *              At entry, we have already established a transaction and opened
761  *              and locked the relation.
762  */
763 static void
764 full_vacuum_rel(Relation onerel, VacuumStmt *vacstmt)
765 {
766         VacPageListData vacuum_pages;           /* List of pages to vacuum and/or
767                                                                                  * clean indexes */
768         VacPageListData fraged_pages;           /* List of pages with space enough
769                                                                                  * for re-using */
770         Relation   *Irel;
771         int                     nindexes,
772                                 i;
773         VRelStats  *vacrelstats;
774         bool            reindex = false;
775
776         if (IsIgnoringSystemIndexes() &&
777                 IsSystemRelationName(RelationGetRelationName(onerel)))
778                 reindex = true;
779
780         vacuum_set_xid_limits(vacstmt, onerel->rd_rel->relisshared,
781                                                   &OldestXmin, &FreezeLimit);
782
783         /*
784          * Set up statistics-gathering machinery.
785          */
786         vacrelstats = (VRelStats *) palloc(sizeof(VRelStats));
787         vacrelstats->rel_pages = 0;
788         vacrelstats->rel_tuples = 0;
789         vacrelstats->hasindex = false;
790
791         /* scan the heap */
792         vacuum_pages.num_pages = fraged_pages.num_pages = 0;
793         scan_heap(vacrelstats, onerel, &vacuum_pages, &fraged_pages);
794
795         /* Now open all indexes of the relation */
796         vac_open_indexes(onerel, &nindexes, &Irel);
797         if (!Irel)
798                 reindex = false;
799         else if (!RelationGetForm(onerel)->relhasindex)
800                 reindex = true;
801         if (nindexes > 0)
802                 vacrelstats->hasindex = true;
803
804 #ifdef NOT_USED
805
806         /*
807          * reindex in VACUUM is dangerous under WAL. ifdef out until it
808          * becomes safe.
809          */
810         if (reindex)
811         {
812                 vac_close_indexes(nindexes, Irel);
813                 Irel = (Relation *) NULL;
814                 activate_indexes_of_a_table(RelationGetRelid(onerel), false);
815         }
816 #endif   /* NOT_USED */
817
818         /* Clean/scan index relation(s) */
819         if (Irel != (Relation *) NULL)
820         {
821                 if (vacuum_pages.num_pages > 0)
822                 {
823                         for (i = 0; i < nindexes; i++)
824                                 vacuum_index(&vacuum_pages, Irel[i],
825                                                          vacrelstats->rel_tuples, 0);
826                 }
827                 else
828                 {
829                         /* just scan indexes to update statistic */
830                         for (i = 0; i < nindexes; i++)
831                                 scan_index(Irel[i], vacrelstats->rel_tuples);
832                 }
833         }
834
835         if (fraged_pages.num_pages > 0)
836         {
837                 /* Try to shrink heap */
838                 repair_frag(vacrelstats, onerel, &vacuum_pages, &fraged_pages,
839                                         nindexes, Irel);
840                 vac_close_indexes(nindexes, Irel);
841         }
842         else
843         {
844                 vac_close_indexes(nindexes, Irel);
845                 if (vacuum_pages.num_pages > 0)
846                 {
847                         /* Clean pages from vacuum_pages list */
848                         vacuum_heap(vacrelstats, onerel, &vacuum_pages);
849                 }
850                 else
851                 {
852                         /*
853                          * Flush dirty pages out to disk.  We must do this even if we
854                          * didn't do anything else, because we want to ensure that all
855                          * tuples have correct on-row commit status on disk (see
856                          * bufmgr.c's comments for FlushRelationBuffers()).
857                          */
858                         i = FlushRelationBuffers(onerel, vacrelstats->rel_pages);
859                         if (i < 0)
860                                 elog(ERROR, "VACUUM (full_vacuum_rel): FlushRelationBuffers returned %d",
861                                          i);
862                 }
863         }
864
865 #ifdef NOT_USED
866         if (reindex)
867                 activate_indexes_of_a_table(RelationGetRelid(onerel), true);
868 #endif   /* NOT_USED */
869
870         /* update shared free space map with final free space info */
871         vac_update_fsm(onerel, &fraged_pages, vacrelstats->rel_pages);
872
873         /* update statistics in pg_class */
874         vac_update_relstats(RelationGetRelid(onerel), vacrelstats->rel_pages,
875                                                 vacrelstats->rel_tuples, vacrelstats->hasindex);
876 }
877
878
879 /*
880  *      scan_heap() -- scan an open heap relation
881  *
882  *              This routine sets commit status bits, constructs vacuum_pages (list
883  *              of pages we need to compact free space on and/or clean indexes of
884  *              deleted tuples), constructs fraged_pages (list of pages with free
885  *              space that tuples could be moved into), and calculates statistics
886  *              on the number of live tuples in the heap.
     *
     *              vacrelstats:  output; rel_tuples, rel_pages, min_tlen, max_tlen,
     *                      vtlinks and num_vtlinks are filled in before returning.
     *              onerel:  the heap to scan; every block from 0 up to the current
     *                      relation length is read exactly once.
     *              vacuum_pages:  output; receives new pages, empty pages, and pages
     *                      that hold removable tuples or unused line pointers.
     *              fraged_pages:  output; receives new pages, empty pages, and pages
     *                      whose free space is at least the smallest tuple length
     *                      seen so far or at least BLCKSZ/10.  It is cut back at
     *                      the end: emptied if shrinking was ruled out, otherwise
     *                      shortened by empty_end_pages entries.
     *
     *              Both lists also get empty_end_pages, the length of the trailing
     *              run of pages on which no tuple survives.  One VacPage copy may
     *              be linked into both lists.
     *
     *              OldestXmin, FreezeLimit and elevel are read here but are not
     *              local to this function.
887  */
888 static void
889 scan_heap(VRelStats *vacrelstats, Relation onerel,
890                   VacPageList vacuum_pages, VacPageList fraged_pages)
891 {
892         BlockNumber nblocks,
893                                 blkno;
894         ItemId          itemid;
895         Buffer          buf;
896         HeapTupleData tuple;
897         OffsetNumber offnum,
898                                 maxoff;
899         bool            pgchanged,
900                                 tupgone,
901                                 notup;
902         char       *relname;
903         VacPage         vacpage,
904                                 vacpagecopy;
905         BlockNumber empty_pages,
906                                 new_pages,
907                                 changed_pages,
908                                 empty_end_pages;
909         double          num_tuples,
910                                 tups_vacuumed,
911                                 nkeep,
912                                 nunused;
913         double          free_size,
914                                 usable_free_size;
915         Size            min_tlen = MaxTupleSize;
916         Size            max_tlen = 0;
917         int                     i;
918         bool            do_shrinking = true;
            /*
             * vtlinks collects (new_tid, this_tid) pairs for recently-dead
             * updated tuples.  It starts with room for 100 entries;
             * free_vtlinks counts the slots still unused.
             */
919         VTupleLink      vtlinks = (VTupleLink) palloc(100 * sizeof(VTupleLinkData));
920         int                     num_vtlinks = 0;
921         int                     free_vtlinks = 100;
922         VacRUsage       ru0;
923
            /* ru0 is handed to vac_show_rusage() in the final report below */
924         vac_init_rusage(&ru0);
925
            /* announce the relation, qualified with its namespace name */
926         relname = RelationGetRelationName(onerel);
927         elog(elevel, "--Relation %s.%s--",
928                  get_namespace_name(RelationGetNamespace(onerel)),
929                  relname);
930
931         empty_pages = new_pages = changed_pages = empty_end_pages = 0;
932         num_tuples = tups_vacuumed = nkeep = nunused = 0;
933         free_size = 0;
934
            /* the block count is sampled once; only these blocks are scanned */
935         nblocks = RelationGetNumberOfBlocks(onerel);
936
937         /*
938          * We initially create each VacPage item in a maximal-sized workspace,
939          * then copy the workspace into a just-large-enough copy.
940          */
941         vacpage = (VacPage) palloc(sizeof(VacPageData) + MaxOffsetNumber * sizeof(OffsetNumber));
942
943         for (blkno = 0; blkno < nblocks; blkno++)
944         {
945                 Page            page,
946                                         tempPage = NULL;
947                 bool            do_reap,
948                                         do_frag;
949
950                 CHECK_FOR_INTERRUPTS();
951
952                 buf = ReadBuffer(onerel, blkno);
953                 page = BufferGetPage(buf);
954
                    /* reset the shared workspace for this block */
955                 vacpage->blkno = blkno;
956                 vacpage->offsets_used = 0;
957                 vacpage->offsets_free = 0;
958
                    /*
                     * A never-initialized page is initialized on the spot, counted
                     * as both "new" and trailing-empty, and entered in both lists.
                     * NOTE(review): one ItemIdData is subtracted from the free
                     * space tally, presumably to allow for the line pointer a
                     * tuple stored here would need -- confirm.
                     */
959                 if (PageIsNew(page))
960                 {
961                         elog(WARNING, "Rel %s: Uninitialized page %u - fixing",
962                                  relname, blkno);
963                         PageInit(page, BufferGetPageSize(buf), 0);
964                         vacpage->free = ((PageHeader) page)->pd_upper - ((PageHeader) page)->pd_lower;
965                         free_size += (vacpage->free - sizeof(ItemIdData));
966                         new_pages++;
967                         empty_end_pages++;
968                         vacpagecopy = copy_vac_page(vacpage);
969                         vpage_insert(vacuum_pages, vacpagecopy);
970                         vpage_insert(fraged_pages, vacpagecopy);
                            /* the page was just initialized, so write it rather than release it */
971                         WriteBuffer(buf);
972                         continue;
973                 }
974
                    /*
                     * An already-initialized but empty page is accounted for the
                     * same way, except that nothing on it was changed.
                     */
975                 if (PageIsEmpty(page))
976                 {
977                         vacpage->free = ((PageHeader) page)->pd_upper - ((PageHeader) page)->pd_lower;
978                         free_size += (vacpage->free - sizeof(ItemIdData));
979                         empty_pages++;
980                         empty_end_pages++;
981                         vacpagecopy = copy_vac_page(vacpage);
982                         vpage_insert(vacuum_pages, vacpagecopy);
983                         vpage_insert(fraged_pages, vacpagecopy);
984                         ReleaseBuffer(buf);
985                         continue;
986                 }
987
                    /*
                     * pgchanged: something on the page was modified, so the buffer
                     * must be written.  notup: no surviving tuple has been seen on
                     * this page yet.
                     */
988                 pgchanged = false;
989                 notup = true;
990                 maxoff = PageGetMaxOffsetNumber(page);
991                 for (offnum = FirstOffsetNumber;
992                          offnum <= maxoff;
993                          offnum = OffsetNumberNext(offnum))
994                 {
995                         uint16          sv_infomask;
996
997                         itemid = PageGetItemId(page, offnum);
998
999                         /*
1000                          * Collect un-used items too - it's possible to have indexes
1001                          * pointing here after crash.
1002                          */
1003                         if (!ItemIdIsUsed(itemid))
1004                         {
1005                                 vacpage->offsets[vacpage->offsets_free++] = offnum;
1006                                 nunused += 1;
1007                                 continue;
1008                         }
1009
                             /* build a HeapTupleData that points at the tuple in the buffer */
1010                         tuple.t_datamcxt = NULL;
1011                         tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
1012                         tuple.t_len = ItemIdGetLength(itemid);
1013                         ItemPointerSet(&(tuple.t_self), blkno, offnum);
1014
                             /* remember the infomask so a change made by the visibility check is noticed */
1015                         tupgone = false;
1016                         sv_infomask = tuple.t_data->t_infomask;
1017
1018                         switch (HeapTupleSatisfiesVacuum(tuple.t_data, OldestXmin))
1019                         {
1020                                 case HEAPTUPLE_DEAD:
1021                                         tupgone = true;         /* we can delete the tuple */
1022                                         break;
1023                                 case HEAPTUPLE_LIVE:
1024
1025                                         /*
1026                                          * Tuple is good.  Consider whether to replace its
1027                                          * xmin value with FrozenTransactionId.
1028                                          */
1029                                         if (TransactionIdIsNormal(tuple.t_data->t_xmin) &&
1030                                                 TransactionIdPrecedes(tuple.t_data->t_xmin,
1031                                                                                           FreezeLimit))
1032                                         {
1033                                                 tuple.t_data->t_xmin = FrozenTransactionId;
1034                                                 /* infomask should be okay already */
1035                                                 Assert(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED);
1036                                                 pgchanged = true;
1037                                         }
1038                                         break;
1039                                 case HEAPTUPLE_RECENTLY_DEAD:
1040
1041                                         /*
1042                                          * If tuple is recently deleted then we must not
1043                                          * remove it from relation.
1044                                          */
1045                                         nkeep += 1;
1046
1047                                         /*
1048                                          * If we do shrinking and this tuple is updated one
1049                                          * then remember it to construct updated tuple
1050                                          * dependencies.
1051                                          */
1052                                         if (do_shrinking &&
1053                                                 !(ItemPointerEquals(&(tuple.t_self),
1054                                                                                         &(tuple.t_data->t_ctid))))
1055                                         {
                                                     /* no slot left: enlarge the array by 1000 entries */
1056                                                 if (free_vtlinks == 0)
1057                                                 {
1058                                                         free_vtlinks = 1000;
1059                                                         vtlinks = (VTupleLink) repalloc(vtlinks,
1060                                                                                    (free_vtlinks + num_vtlinks) *
1061                                                                                                  sizeof(VTupleLinkData));
1062                                                 }
1063                                                 vtlinks[num_vtlinks].new_tid = tuple.t_data->t_ctid;
1064                                                 vtlinks[num_vtlinks].this_tid = tuple.t_self;
1065                                                 free_vtlinks--;
1066                                                 num_vtlinks++;
1067                                         }
1068                                         break;
1069                                 case HEAPTUPLE_INSERT_IN_PROGRESS:
1070
1071                                         /*
1072                                          * This should not happen, since we hold exclusive
1073                                          * lock on the relation; shouldn't we raise an error?
1074                                          */
1075                                         elog(WARNING, "Rel %s: TID %u/%u: InsertTransactionInProgress %u - can't shrink relation",
1076                                                  relname, blkno, offnum, tuple.t_data->t_xmin);
1077                                         do_shrinking = false;
1078                                         break;
1079                                 case HEAPTUPLE_DELETE_IN_PROGRESS:
1080
1081                                         /*
1082                                          * This should not happen, since we hold exclusive
1083                                          * lock on the relation; shouldn't we raise an error?
1084                                          */
1085                                         elog(WARNING, "Rel %s: TID %u/%u: DeleteTransactionInProgress %u - can't shrink relation",
1086                                                  relname, blkno, offnum, tuple.t_data->t_xmax);
1087                                         do_shrinking = false;
1088                                         break;
1089                                 default:
1090                                         elog(ERROR, "Unexpected HeapTupleSatisfiesVacuum result");
1091                                         break;
1092                         }
1093
1094                         /* check for hint-bit update by HeapTupleSatisfiesVacuum */
1095                         if (sv_infomask != tuple.t_data->t_infomask)
1096                                 pgchanged = true;
1097
1098                         /*
1099                          * Other checks...
1100                          */
                             /* an invalid OID in a relation that has OIDs is only reported, not repaired */
1101                         if (!OidIsValid(tuple.t_data->t_oid) &&
1102                                 onerel->rd_rel->relhasoids)
1103                                 elog(WARNING, "Rel %s: TID %u/%u: OID IS INVALID. TUPGONE %d.",
1104                                          relname, blkno, offnum, (int) tupgone);
1105
1106                         if (tupgone)
1107                         {
1108                                 ItemId          lpp;
1109
1110                                 /*
1111                                  * Here we are building a temporary copy of the page with
1112                                  * dead tuples removed.  Below we will apply
1113                                  * PageRepairFragmentation to the copy, so that we can
1114                                  * determine how much space will be available after
1115                                  * removal of dead tuples.      But note we are NOT changing
1116                                  * the real page yet...
1117                                  */
                                     /* the copy is made lazily, at the first removable tuple on the page */
1118                                 if (tempPage == (Page) NULL)
1119                                 {
1120                                         Size            pageSize;
1121
1122                                         pageSize = PageGetPageSize(page);
1123                                         tempPage = (Page) palloc(pageSize);
1124                                         memcpy(tempPage, page, pageSize);
1125                                 }
1126
1127                                 /* mark it unused on the temp page */
1128                                 lpp = PageGetItemId(tempPage, offnum);
1129                                 lpp->lp_flags &= ~LP_USED;
1130
1131                                 vacpage->offsets[vacpage->offsets_free++] = offnum;
1132                                 tups_vacuumed += 1;
1133                         }
1134                         else
1135                         {
                                     /* tuple survives: count it and track the length extremes */
1136                                 num_tuples += 1;
1137                                 notup = false;
1138                                 if (tuple.t_len < min_tlen)
1139                                         min_tlen = tuple.t_len;
1140                                 if (tuple.t_len > max_tlen)
1141                                         max_tlen = tuple.t_len;
1142                         }
1143                 }                                               /* scan along page */
1144
1145                 if (tempPage != (Page) NULL)
1146                 {
1147                         /* Some tuples are removable; figure free space after removal */
1148                         PageRepairFragmentation(tempPage, NULL);
1149                         vacpage->free = ((PageHeader) tempPage)->pd_upper - ((PageHeader) tempPage)->pd_lower;
1150                         pfree(tempPage);
1151                         do_reap = true;
1152                 }
1153                 else
1154                 {
1155                         /* Just use current available space */
1156                         vacpage->free = ((PageHeader) page)->pd_upper - ((PageHeader) page)->pd_lower;
1157                         /* Need to reap the page if it has ~LP_USED line pointers */
1158                         do_reap = (vacpage->offsets_free > 0);
1159                 }
1160
1161                 free_size += vacpage->free;
1162
1163                 /*
1164                  * Add the page to fraged_pages if it has a useful amount of free
1165                  * space.  "Useful" means enough for a minimal-sized tuple. But we
1166                  * don't know that accurately near the start of the relation, so
1167                  * add pages unconditionally if they have >= BLCKSZ/10 free space.
1168                  */
1169                 do_frag = (vacpage->free >= min_tlen || vacpage->free >= BLCKSZ / 10);
1170
                     /* a single copy is made and may be linked into both lists */
1171                 if (do_reap || do_frag)
1172                 {
1173                         vacpagecopy = copy_vac_page(vacpage);
1174                         if (do_reap)
1175                                 vpage_insert(vacuum_pages, vacpagecopy);
1176                         if (do_frag)
1177                                 vpage_insert(fraged_pages, vacpagecopy);
1178                 }
1179
                     /*
                      * Track the run of trailing pages with no surviving tuple: a
                      * page holding one restarts the count.
                      */
1180                 if (notup)
1181                         empty_end_pages++;
1182                 else
1183                         empty_end_pages = 0;
1184
                     /* write the buffer only if a tuple header on the page was changed */
1185                 if (pgchanged)
1186                 {
1187                         WriteBuffer(buf);
1188                         changed_pages++;
1189                 }
1190                 else
1191                         ReleaseBuffer(buf);
1192         }
1193
1194         pfree(vacpage);
1195
1196         /* save stats in the rel list for use later */
1197         vacrelstats->rel_tuples = num_tuples;
1198         vacrelstats->rel_pages = nblocks;
             /* with no surviving tuple, min_tlen still holds its initial MaxTupleSize; report zeros */
1199         if (num_tuples == 0)
1200                 min_tlen = max_tlen = 0;
1201         vacrelstats->min_tlen = min_tlen;
1202         vacrelstats->max_tlen = max_tlen;
1203
1204         vacuum_pages->empty_end_pages = empty_end_pages;
1205         fraged_pages->empty_end_pages = empty_end_pages;
1206
1207         /*
1208          * Clear the fraged_pages list if we found we couldn't shrink. Else,
1209          * remove any "empty" end-pages from the list, and compute usable free
1210          * space = free space in remaining pages.
1211          */
             /*
              * NOTE(review): shortening the list by empty_end_pages assumes those
              * pages are its last entries; the Assert checks only the count --
              * confirm against vpage_insert's ordering.
              */
1212         if (do_shrinking)
1213         {
1214                 Assert((BlockNumber) fraged_pages->num_pages >= empty_end_pages);
1215                 fraged_pages->num_pages -= empty_end_pages;
1216                 usable_free_size = 0;
1217                 for (i = 0; i < fraged_pages->num_pages; i++)
1218                         usable_free_size += fraged_pages->pagedesc[i]->free;
1219         }
1220         else
1221         {
1222                 fraged_pages->num_pages = 0;
1223                 usable_free_size = 0;
1224         }
1225
             /*
              * The collected links are kept only when there is usable free space
              * and at least one link; they are then sorted with vac_cmp_vtlinks
              * and handed back through vacrelstats.  Otherwise the array is freed.
              */
1226         if (usable_free_size > 0 && num_vtlinks > 0)
1227         {
1228                 qsort((char *) vtlinks, num_vtlinks, sizeof(VTupleLinkData),
1229                           vac_cmp_vtlinks);
1230                 vacrelstats->vtlinks = vtlinks;
1231                 vacrelstats->num_vtlinks = num_vtlinks;
1232         }
1233         else
1234         {
1235                 vacrelstats->vtlinks = NULL;
1236                 vacrelstats->num_vtlinks = 0;
1237                 pfree(vtlinks);
1238         }
1239
             /* one-message summary of the scan, ending with the resource usage since ru0 */
1240         elog(elevel, "Pages %u: Changed %u, reaped %u, Empty %u, New %u; \
1241 Tup %.0f: Vac %.0f, Keep/VTL %.0f/%u, UnUsed %.0f, MinLen %lu, MaxLen %lu; \
1242 Re-using: Free/Avail. Space %.0f/%.0f; EndEmpty/Avail. Pages %u/%u.\n\t%s",
1243                  nblocks, changed_pages, vacuum_pages->num_pages, empty_pages,
1244                  new_pages, num_tuples, tups_vacuumed,
1245                  nkeep, vacrelstats->num_vtlinks,
1246                  nunused, (unsigned long) min_tlen, (unsigned long) max_tlen,
1247                  free_size, usable_free_size,
1248                  empty_end_pages, fraged_pages->num_pages,
1249                  vac_show_rusage(&ru0));
1250
1251 }
1252
1253
1254 /*
1255  *      repair_frag() -- try to repair relation's fragmentation
1256  *
1257  *              This routine marks dead tuples as unused and tries to re-use dead
1258  *              space by moving tuples (and inserting index entries if needed). It
1259  *              constructs Nvacpagelist, a list of freed pages (moved tuples), and
1260  *              cleans indexes for them after committing (in a hackish manner -
1261  *              without losing locks or freeing memory!) the current transaction.
1262  *              It truncates the relation if some end-blocks have gone away.
1263  */
1264 static void
1265 repair_frag(VRelStats *vacrelstats, Relation onerel,
1266                         VacPageList vacuum_pages, VacPageList fraged_pages,
1267                         int nindexes, Relation *Irel)
1268 {
1269         TransactionId myXID;
1270         CommandId       myCID;
1271         Buffer          buf,
1272                                 cur_buffer;
1273         BlockNumber nblocks,
1274                                 blkno;
1275         BlockNumber last_move_dest_block = 0,
1276                                 last_vacuum_block;
1277         Page            page,
1278                                 ToPage = NULL;
1279         OffsetNumber offnum,
1280                                 maxoff,
1281                                 newoff,
1282                                 max_offset;
1283         ItemId          itemid,
1284                                 newitemid;
1285         HeapTupleData tuple,
1286                                 newtup;
1287         TupleDesc       tupdesc;
1288         ResultRelInfo *resultRelInfo;
1289         EState     *estate;
1290         TupleTable      tupleTable;
1291         TupleTableSlot *slot;
1292         VacPageListData Nvacpagelist;
1293         VacPage         cur_page = NULL,
1294                                 last_vacuum_page,
1295                                 vacpage,
1296                            *curpage;
1297         int                     cur_item = 0;
1298         int                     i;
1299         Size            tuple_len;
1300         int                     num_moved,
1301                                 num_fraged_pages,
1302                                 vacuumed_pages;
1303         int                     checked_moved,
1304                                 num_tuples,
1305                                 keep_tuples = 0;
1306         bool            isempty,
1307                                 dowrite,
1308                                 chain_tuple_moved;
1309         VacRUsage       ru0;
1310
1311         vac_init_rusage(&ru0);
1312
1313         myXID = GetCurrentTransactionId();
1314         myCID = GetCurrentCommandId();
1315
1316         tupdesc = RelationGetDescr(onerel);
1317
1318         /*
1319          * We need a ResultRelInfo and an EState so we can use the regular
1320          * executor's index-entry-making machinery.
1321          */
1322         resultRelInfo = makeNode(ResultRelInfo);
1323         resultRelInfo->ri_RangeTableIndex = 1;          /* dummy */
1324         resultRelInfo->ri_RelationDesc = onerel;
1325         resultRelInfo->ri_TrigDesc = NULL;      /* we don't fire triggers */
1326
1327         ExecOpenIndices(resultRelInfo);
1328
1329         estate = CreateExecutorState();
1330         estate->es_result_relations = resultRelInfo;
1331         estate->es_num_result_relations = 1;
1332         estate->es_result_relation_info = resultRelInfo;
1333
1334         /* Set up a dummy tuple table too */
1335         tupleTable = ExecCreateTupleTable(1);
1336         slot = ExecAllocTableSlot(tupleTable);
1337         ExecSetSlotDescriptor(slot, tupdesc, false);
1338
1339         Nvacpagelist.num_pages = 0;
1340         num_fraged_pages = fraged_pages->num_pages;
1341         Assert((BlockNumber) vacuum_pages->num_pages >= vacuum_pages->empty_end_pages);
1342         vacuumed_pages = vacuum_pages->num_pages - vacuum_pages->empty_end_pages;
1343         if (vacuumed_pages > 0)
1344         {
1345                 /* get last reaped page from vacuum_pages */
1346                 last_vacuum_page = vacuum_pages->pagedesc[vacuumed_pages - 1];
1347                 last_vacuum_block = last_vacuum_page->blkno;
1348         }
1349         else
1350         {
1351                 last_vacuum_page = NULL;
1352                 last_vacuum_block = InvalidBlockNumber;
1353         }
1354         cur_buffer = InvalidBuffer;
1355         num_moved = 0;
1356
1357         vacpage = (VacPage) palloc(sizeof(VacPageData) + MaxOffsetNumber * sizeof(OffsetNumber));
1358         vacpage->offsets_used = vacpage->offsets_free = 0;
1359
1360         /*
1361          * Scan pages backwards from the last nonempty page, trying to move
1362          * tuples down to lower pages.  Quit when we reach a page that we have
1363          * moved any tuples onto, or the first page if we haven't moved
1364          * anything, or when we find a page we cannot completely empty (this
1365          * last condition is handled by "break" statements within the loop).
1366          *
1367          * NB: this code depends on the vacuum_pages and fraged_pages lists being
1368          * in order by blkno.
1369          */
1370         nblocks = vacrelstats->rel_pages;
1371         for (blkno = nblocks - vacuum_pages->empty_end_pages - 1;
1372                  blkno > last_move_dest_block;
1373                  blkno--)
1374         {
1375                 CHECK_FOR_INTERRUPTS();
1376
1377                 /*
1378                  * Forget fraged_pages pages at or after this one; they're no
1379                  * longer useful as move targets, since we only want to move down.
1380                  * Note that since we stop the outer loop at last_move_dest_block,
1381                  * pages removed here cannot have had anything moved onto them
1382                  * already.
1383                  *
1384                  * Also note that we don't change the stored fraged_pages list, only
1385                  * our local variable num_fraged_pages; so the forgotten pages are
1386                  * still available to be loaded into the free space map later.
1387                  */
1388                 while (num_fraged_pages > 0 &&
1389                         fraged_pages->pagedesc[num_fraged_pages - 1]->blkno >= blkno)
1390                 {
1391                         Assert(fraged_pages->pagedesc[num_fraged_pages - 1]->offsets_used == 0);
1392                         --num_fraged_pages;
1393                 }
1394
1395                 /*
1396                  * Process this page of relation.
1397                  */
1398                 buf = ReadBuffer(onerel, blkno);
1399                 page = BufferGetPage(buf);
1400
1401                 vacpage->offsets_free = 0;
1402
1403                 isempty = PageIsEmpty(page);
1404
1405                 dowrite = false;
1406
1407                 /* Is the page in the vacuum_pages list? */
1408                 if (blkno == last_vacuum_block)
1409                 {
1410                         if (last_vacuum_page->offsets_free > 0)
1411                         {
1412                                 /* there are dead tuples on this page - clean them */
1413                                 Assert(!isempty);
1414                                 LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
1415                                 vacuum_page(onerel, buf, last_vacuum_page);
1416                                 LockBuffer(buf, BUFFER_LOCK_UNLOCK);
1417                                 dowrite = true;
1418                         }
1419                         else
1420                                 Assert(isempty);
1421                         --vacuumed_pages;
1422                         if (vacuumed_pages > 0)
1423                         {
1424                                 /* get prev reaped page from vacuum_pages */
1425                                 last_vacuum_page = vacuum_pages->pagedesc[vacuumed_pages - 1];
1426                                 last_vacuum_block = last_vacuum_page->blkno;
1427                         }
1428                         else
1429                         {
1430                                 last_vacuum_page = NULL;
1431                                 last_vacuum_block = InvalidBlockNumber;
1432                         }
1433                         if (isempty)
1434                         {
1435                                 ReleaseBuffer(buf);
1436                                 continue;
1437                         }
1438                 }
1439                 else
1440                         Assert(!isempty);
1441
1442                 chain_tuple_moved = false;              /* no one chain-tuple was moved
1443                                                                                  * off this page, yet */
1444                 vacpage->blkno = blkno;
1445                 maxoff = PageGetMaxOffsetNumber(page);
1446                 for (offnum = FirstOffsetNumber;
1447                          offnum <= maxoff;
1448                          offnum = OffsetNumberNext(offnum))
1449                 {
1450                         itemid = PageGetItemId(page, offnum);
1451
1452                         if (!ItemIdIsUsed(itemid))
1453                                 continue;
1454
1455                         tuple.t_datamcxt = NULL;
1456                         tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
1457                         tuple_len = tuple.t_len = ItemIdGetLength(itemid);
1458                         ItemPointerSet(&(tuple.t_self), blkno, offnum);
1459
1460                         if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
1461                         {
1462                                 if ((TransactionId) tuple.t_data->t_cmin != myXID)
1463                                         elog(ERROR, "Invalid XID in t_cmin");
1464                                 if (tuple.t_data->t_infomask & HEAP_MOVED_IN)
1465                                         elog(ERROR, "HEAP_MOVED_IN was not expected");
1466
1467                                 /*
1468                                  * If this (chain) tuple was already moved by me then I
1469                                  * have to check whether it is in vacpage or not - i.e. whether
1470                                  * it was moved while cleaning this page or some previous one.
1471                                  */
1472                                 if (tuple.t_data->t_infomask & HEAP_MOVED_OFF)
1473                                 {
1474                                         if (keep_tuples == 0)
1475                                                 continue;
1476                                         if (chain_tuple_moved)          /* some chains was moved
1477                                                                                                  * while */
1478                                         {                       /* cleaning this page */
1479                                                 Assert(vacpage->offsets_free > 0);
1480                                                 for (i = 0; i < vacpage->offsets_free; i++)
1481                                                 {
1482                                                         if (vacpage->offsets[i] == offnum)
1483                                                                 break;
1484                                                 }
1485                                                 if (i >= vacpage->offsets_free) /* not found */
1486                                                 {
1487                                                         vacpage->offsets[vacpage->offsets_free++] = offnum;
1488                                                         keep_tuples--;
1489                                                 }
1490                                         }
1491                                         else
1492                                         {
1493                                                 vacpage->offsets[vacpage->offsets_free++] = offnum;
1494                                                 keep_tuples--;
1495                                         }
1496                                         continue;
1497                                 }
1498                                 elog(ERROR, "HEAP_MOVED_OFF was expected");
1499                         }
1500
1501                         /*
1502                          * If this tuple is in the chain of tuples created in updates
1503                          * by "recent" transactions then we have to move the whole chain
1504                          * of tuples to other places.
1505                          */
1506                         if ((tuple.t_data->t_infomask & HEAP_UPDATED &&
1507                          !TransactionIdPrecedes(tuple.t_data->t_xmin, OldestXmin)) ||
1508                                 (!(tuple.t_data->t_infomask & HEAP_XMAX_INVALID) &&
1509                                  !(ItemPointerEquals(&(tuple.t_self),
1510                                                                          &(tuple.t_data->t_ctid)))))
1511                         {
1512                                 Buffer          Cbuf = buf;
1513                                 Page            Cpage;
1514                                 ItemId          Citemid;
1515                                 ItemPointerData Ctid;
1516                                 HeapTupleData tp = tuple;
1517                                 Size            tlen = tuple_len;
1518                                 VTupleMove      vtmove = (VTupleMove)
1519                                 palloc(100 * sizeof(VTupleMoveData));
1520                                 int                     num_vtmove = 0;
1521                                 int                     free_vtmove = 100;
1522                                 VacPage         to_vacpage = NULL;
1523                                 int                     to_item = 0;
1524                                 bool            freeCbuf = false;
1525                                 int                     ti;
1526
1527                                 if (vacrelstats->vtlinks == NULL)
1528                                         elog(ERROR, "No one parent tuple was found");
1529                                 if (cur_buffer != InvalidBuffer)
1530                                 {
1531                                         WriteBuffer(cur_buffer);
1532                                         cur_buffer = InvalidBuffer;
1533                                 }
1534
1535                                 /*
1536                                  * If this tuple is at the beginning or in the middle of the
1537                                  * chain then we have to move to the end of the chain.
1538                                  */
1539                                 while (!(tp.t_data->t_infomask & HEAP_XMAX_INVALID) &&
1540                                            !(ItemPointerEquals(&(tp.t_self),
1541                                                                                    &(tp.t_data->t_ctid))))
1542                                 {
1543                                         Ctid = tp.t_data->t_ctid;
1544                                         if (freeCbuf)
1545                                                 ReleaseBuffer(Cbuf);
1546                                         freeCbuf = true;
1547                                         Cbuf = ReadBuffer(onerel,
1548                                                                           ItemPointerGetBlockNumber(&Ctid));
1549                                         Cpage = BufferGetPage(Cbuf);
1550                                         Citemid = PageGetItemId(Cpage,
1551                                                                           ItemPointerGetOffsetNumber(&Ctid));
1552                                         if (!ItemIdIsUsed(Citemid))
1553                                         {
1554                                                 /*
1555                                                  * This means that in the middle of chain there
1556                                                  * was tuple updated by older (than OldestXmin)
1557                                                  * xaction and this tuple is already deleted by
1558                                                  * me. Actually, upper part of chain should be
1559                                                  * removed and seems that this should be handled
1560                                                  * in scan_heap(), but it's not implemented at the
1561                                                  * moment and so we just stop shrinking here.
1562                                                  */
1563                                                 ReleaseBuffer(Cbuf);
1564                                                 pfree(vtmove);
1565                                                 vtmove = NULL;
1566                                                 elog(WARNING, "Child itemid in update-chain marked as unused - can't continue repair_frag");
1567                                                 break;
1568                                         }
1569                                         tp.t_datamcxt = NULL;
1570                                         tp.t_data = (HeapTupleHeader) PageGetItem(Cpage, Citemid);
1571                                         tp.t_self = Ctid;
1572                                         tlen = tp.t_len = ItemIdGetLength(Citemid);
1573                                 }
1574                                 if (vtmove == NULL)
1575                                         break;
1576                                 /* first, can chain be moved ? */
1577                                 for (;;)
1578                                 {
1579                                         if (to_vacpage == NULL ||
1580                                                 !enough_space(to_vacpage, tlen))
1581                                         {
1582                                                 for (i = 0; i < num_fraged_pages; i++)
1583                                                 {
1584                                                         if (enough_space(fraged_pages->pagedesc[i], tlen))
1585                                                                 break;
1586                                                 }
1587
1588                                                 if (i == num_fraged_pages)
1589                                                 {
1590                                                         /* can't move item anywhere */
1591                                                         for (i = 0; i < num_vtmove; i++)
1592                                                         {
1593                                                                 Assert(vtmove[i].vacpage->offsets_used > 0);
1594                                                                 (vtmove[i].vacpage->offsets_used)--;
1595                                                         }
1596                                                         num_vtmove = 0;
1597                                                         break;
1598                                                 }
1599                                                 to_item = i;
1600                                                 to_vacpage = fraged_pages->pagedesc[to_item];
1601                                         }
1602                                         to_vacpage->free -= MAXALIGN(tlen);
1603                                         if (to_vacpage->offsets_used >= to_vacpage->offsets_free)
1604                                                 to_vacpage->free -= MAXALIGN(sizeof(ItemIdData));
1605                                         (to_vacpage->offsets_used)++;
1606                                         if (free_vtmove == 0)
1607                                         {
1608                                                 free_vtmove = 1000;
1609                                                 vtmove = (VTupleMove) repalloc(vtmove,
1610                                                                                          (free_vtmove + num_vtmove) *
1611                                                                                                  sizeof(VTupleMoveData));
1612                                         }
1613                                         vtmove[num_vtmove].tid = tp.t_self;
1614                                         vtmove[num_vtmove].vacpage = to_vacpage;
1615                                         if (to_vacpage->offsets_used == 1)
1616                                                 vtmove[num_vtmove].cleanVpd = true;
1617                                         else
1618                                                 vtmove[num_vtmove].cleanVpd = false;
1619                                         free_vtmove--;
1620                                         num_vtmove++;
1621
1622                                         /* All done ? */
1623                                         if (!(tp.t_data->t_infomask & HEAP_UPDATED) ||
1624                                         TransactionIdPrecedes(tp.t_data->t_xmin, OldestXmin))
1625                                                 break;
1626
1627                                         /* Well, try to find tuple with old row version */
1628                                         for (;;)
1629                                         {
1630                                                 Buffer          Pbuf;
1631                                                 Page            Ppage;
1632                                                 ItemId          Pitemid;
1633                                                 HeapTupleData Ptp;
1634                                                 VTupleLinkData vtld,
1635                                                                    *vtlp;
1636
1637                                                 vtld.new_tid = tp.t_self;
1638                                                 vtlp = (VTupleLink)
1639                                                         vac_bsearch((void *) &vtld,
1640                                                                                 (void *) (vacrelstats->vtlinks),
1641                                                                                 vacrelstats->num_vtlinks,
1642                                                                                 sizeof(VTupleLinkData),
1643                                                                                 vac_cmp_vtlinks);
1644                                                 if (vtlp == NULL)
1645                                                         elog(ERROR, "Parent tuple was not found");
1646                                                 tp.t_self = vtlp->this_tid;
1647                                                 Pbuf = ReadBuffer(onerel,
1648                                                                 ItemPointerGetBlockNumber(&(tp.t_self)));
1649                                                 Ppage = BufferGetPage(Pbuf);
1650                                                 Pitemid = PageGetItemId(Ppage,
1651                                                            ItemPointerGetOffsetNumber(&(tp.t_self)));
1652                                                 if (!ItemIdIsUsed(Pitemid))
1653                                                         elog(ERROR, "Parent itemid marked as unused");
1654                                                 Ptp.t_datamcxt = NULL;
1655                                                 Ptp.t_data = (HeapTupleHeader) PageGetItem(Ppage, Pitemid);
1656                                                 Assert(ItemPointerEquals(&(vtld.new_tid),
1657                                                                                                  &(Ptp.t_data->t_ctid)));
1658
1659                                                 /*
1660                                                  * Read above about cases when
1661                                                  * !ItemIdIsUsed(Citemid) (child item is
1662                                                  * removed)... Due to the fact that at the moment
1663                                                  * we don't remove unuseful part of update-chain,
1664                                                  * it's possible to get too old parent row here.
1665                                                  * Like as in the case which caused this problem,
1666                                                  * we stop shrinking here. I could try to find
1667                                                  * real parent row but want not to do it because
1668                                                  * of real solution will be implemented anyway,
1669                                                  * later, and we are too close to 6.5 release. -
1670                                                  * vadim 06/11/99
1671                                                  */
1672                                                 if (!(TransactionIdEquals(Ptp.t_data->t_xmax,
1673                                                                                                   tp.t_data->t_xmin)))
1674                                                 {
1675                                                         if (freeCbuf)
1676                                                                 ReleaseBuffer(Cbuf);
1677                                                         freeCbuf = false;
1678                                                         ReleaseBuffer(Pbuf);
1679                                                         for (i = 0; i < num_vtmove; i++)
1680                                                         {
1681                                                                 Assert(vtmove[i].vacpage->offsets_used > 0);
1682                                                                 (vtmove[i].vacpage->offsets_used)--;
1683                                                         }
1684                                                         num_vtmove = 0;
1685                                                         elog(WARNING, "Too old parent tuple found - can't continue repair_frag");
1686                                                         break;
1687                                                 }
1688 #ifdef NOT_USED                                 /* I'm not sure that this will work
1689                                                                  * properly... */
1690
1691                                                 /*
1692                                                  * If this tuple is updated version of row and it
1693                                                  * was created by the same transaction then no one
1694                                                  * is interested in this tuple - mark it as
1695                                                  * removed.
1696                                                  */
1697                                                 if (Ptp.t_data->t_infomask & HEAP_UPDATED &&
1698                                                         TransactionIdEquals(Ptp.t_data->t_xmin,
1699                                                                                                 Ptp.t_data->t_xmax))
1700                                                 {
1701                                                         TransactionIdStore(myXID,
1702                                                                 (TransactionId *) &(Ptp.t_data->t_cmin));
1703                                                         Ptp.t_data->t_infomask &=
1704                                                                 ~(HEAP_XMIN_COMMITTED | HEAP_XMIN_INVALID | HEAP_MOVED_IN);
1705                                                         Ptp.t_data->t_infomask |= HEAP_MOVED_OFF;
1706                                                         WriteBuffer(Pbuf);
1707                                                         continue;
1708                                                 }
1709 #endif
1710                                                 tp.t_datamcxt = Ptp.t_datamcxt;
1711                                                 tp.t_data = Ptp.t_data;
1712                                                 tlen = tp.t_len = ItemIdGetLength(Pitemid);
1713                                                 if (freeCbuf)
1714                                                         ReleaseBuffer(Cbuf);
1715                                                 Cbuf = Pbuf;
1716                                                 freeCbuf = true;
1717                                                 break;
1718                                         }
1719                                         if (num_vtmove == 0)
1720                                                 break;
1721                                 }
1722                                 if (freeCbuf)
1723                                         ReleaseBuffer(Cbuf);
1724                                 if (num_vtmove == 0)    /* chain can't be moved */
1725                                 {
1726                                         pfree(vtmove);
1727                                         break;
1728                                 }
1729                                 ItemPointerSetInvalid(&Ctid);
1730                                 for (ti = 0; ti < num_vtmove; ti++)
1731                                 {
1732                                         VacPage         destvacpage = vtmove[ti].vacpage;
1733
1734                                         /* Get page to move from */
1735                                         tuple.t_self = vtmove[ti].tid;
1736                                         Cbuf = ReadBuffer(onerel,
1737                                                          ItemPointerGetBlockNumber(&(tuple.t_self)));
1738
1739                                         /* Get page to move to */
1740                                         cur_buffer = ReadBuffer(onerel, destvacpage->blkno);
1741
1742                                         LockBuffer(cur_buffer, BUFFER_LOCK_EXCLUSIVE);
1743                                         if (cur_buffer != Cbuf)
1744                                                 LockBuffer(Cbuf, BUFFER_LOCK_EXCLUSIVE);
1745
1746                                         ToPage = BufferGetPage(cur_buffer);
1747                                         Cpage = BufferGetPage(Cbuf);
1748
1749                                         Citemid = PageGetItemId(Cpage,
1750                                                         ItemPointerGetOffsetNumber(&(tuple.t_self)));
1751                                         tuple.t_datamcxt = NULL;
1752                                         tuple.t_data = (HeapTupleHeader) PageGetItem(Cpage, Citemid);
1753                                         tuple_len = tuple.t_len = ItemIdGetLength(Citemid);
1754
1755                                         /*
1756                                          * make a copy of the source tuple, and then mark the
1757                                          * source tuple MOVED_OFF.
1758                                          */
1759                                         heap_copytuple_with_tuple(&tuple, &newtup);
1760
1761                                         /*
1762                                          * register invalidation of source tuple in catcaches.
1763                                          */
1764                                         CacheInvalidateHeapTuple(onerel, &tuple);
1765
1766                                         /* NO ELOG(ERROR) TILL CHANGES ARE LOGGED */
1767                                         START_CRIT_SECTION();
1768
1769                                         TransactionIdStore(myXID, (TransactionId *) &(tuple.t_data->t_cmin));
1770                                         tuple.t_data->t_infomask &=
1771                                                 ~(HEAP_XMIN_COMMITTED | HEAP_XMIN_INVALID | HEAP_MOVED_IN);
1772                                         tuple.t_data->t_infomask |= HEAP_MOVED_OFF;
1773
1774                                         /*
1775                                          * If this page was not used before - clean it.
1776                                          *
1777                                          * NOTE: a nasty bug used to lurk here.  It is possible
1778                                          * for the source and destination pages to be the same
1779                                          * (since this tuple-chain member can be on a page
1780                                          * lower than the one we're currently processing in
1781                                          * the outer loop).  If that's true, then after
1782                                          * vacuum_page() the source tuple will have been
1783                                          * moved, and tuple.t_data will be pointing at
1784                                          * garbage.  Therefore we must do everything that uses
1785                                          * tuple.t_data BEFORE this step!!
1786                                          *
1787                                          * This path is different from the other callers of
1788                                          * vacuum_page, because we have already incremented
1789                                          * the vacpage's offsets_used field to account for the
1790                                          * tuple(s) we expect to move onto the page. Therefore
1791                                          * vacuum_page's check for offsets_used == 0 is wrong.
1792                                          * But since that's a good debugging check for all
1793                                          * other callers, we work around it here rather than
1794                                          * remove it.
1795                                          */
1796                                         if (!PageIsEmpty(ToPage) && vtmove[ti].cleanVpd)
1797                                         {
1798                                                 int                     sv_offsets_used = destvacpage->offsets_used;
1799
1800                                                 destvacpage->offsets_used = 0;
1801                                                 vacuum_page(onerel, cur_buffer, destvacpage);
1802                                                 destvacpage->offsets_used = sv_offsets_used;
1803                                         }
1804
1805                                         /*
1806                                          * Update the state of the copied tuple, and store it
1807                                          * on the destination page.
1808                                          */
1809                                         TransactionIdStore(myXID, (TransactionId *) &(newtup.t_data->t_cmin));
1810                                         newtup.t_data->t_infomask &=
1811                                                 ~(HEAP_XMIN_COMMITTED | HEAP_XMIN_INVALID | HEAP_MOVED_OFF);
1812                                         newtup.t_data->t_infomask |= HEAP_MOVED_IN;
1813                                         newoff = PageAddItem(ToPage, (Item) newtup.t_data, tuple_len,
1814                                                                                  InvalidOffsetNumber, LP_USED);
1815                                         if (newoff == InvalidOffsetNumber)
1816                                         {
1817                                                 elog(PANIC, "moving chain: failed to add item with len = %lu to page %u",
1818                                                   (unsigned long) tuple_len, destvacpage->blkno);
1819                                         }
1820                                         newitemid = PageGetItemId(ToPage, newoff);
1821                                         pfree(newtup.t_data);
1822                                         newtup.t_datamcxt = NULL;
1823                                         newtup.t_data = (HeapTupleHeader) PageGetItem(ToPage, newitemid);
1824                                         ItemPointerSet(&(newtup.t_self), destvacpage->blkno, newoff);
1825
1826                                         {
1827                                                 XLogRecPtr      recptr =
1828                                                 log_heap_move(onerel, Cbuf, tuple.t_self,
1829                                                                           cur_buffer, &newtup);
1830
1831                                                 if (Cbuf != cur_buffer)
1832                                                 {
1833                                                         PageSetLSN(Cpage, recptr);
1834                                                         PageSetSUI(Cpage, ThisStartUpID);
1835                                                 }
1836                                                 PageSetLSN(ToPage, recptr);
1837                                                 PageSetSUI(ToPage, ThisStartUpID);
1838                                         }
1839                                         END_CRIT_SECTION();
1840
1841                                         if (destvacpage->blkno > last_move_dest_block)
1842                                                 last_move_dest_block = destvacpage->blkno;
1843
1844                                         /*
1845                                          * Set new tuple's t_ctid pointing to itself for last
1846                                          * tuple in chain, and to next tuple in chain
1847                                          * otherwise.
1848                                          */
1849                                         if (!ItemPointerIsValid(&Ctid))
1850                                                 newtup.t_data->t_ctid = newtup.t_self;
1851                                         else
1852                                                 newtup.t_data->t_ctid = Ctid;
1853                                         Ctid = newtup.t_self;
1854
1855                                         num_moved++;
1856
1857                                         /*
1858                                          * Remember that we moved tuple from the current page
1859                                          * (corresponding index tuple will be cleaned).
1860                                          */
1861                                         if (Cbuf == buf)
1862                                                 vacpage->offsets[vacpage->offsets_free++] =
1863                                                         ItemPointerGetOffsetNumber(&(tuple.t_self));
1864                                         else
1865                                                 keep_tuples++;
1866
1867                                         LockBuffer(cur_buffer, BUFFER_LOCK_UNLOCK);
1868                                         if (cur_buffer != Cbuf)
1869                                                 LockBuffer(Cbuf, BUFFER_LOCK_UNLOCK);
1870
1871                                         /* Create index entries for the moved tuple */
1872                                         if (resultRelInfo->ri_NumIndices > 0)
1873                                         {
1874                                                 ExecStoreTuple(&newtup, slot, InvalidBuffer, false);
1875                                                 ExecInsertIndexTuples(slot, &(newtup.t_self),
1876                                                                                           estate, true);
1877                                         }
1878
1879                                         WriteBuffer(cur_buffer);
1880                                         WriteBuffer(Cbuf);
1881                                 }
1882                                 cur_buffer = InvalidBuffer;
1883                                 pfree(vtmove);
1884                                 chain_tuple_moved = true;
1885                                 continue;
1886                         }
1887
1888                         /* try to find new page for this tuple */
1889                         if (cur_buffer == InvalidBuffer ||
1890                                 !enough_space(cur_page, tuple_len))
1891                         {
1892                                 if (cur_buffer != InvalidBuffer)
1893                                 {
1894                                         WriteBuffer(cur_buffer);
1895                                         cur_buffer = InvalidBuffer;
1896                                 }
1897                                 for (i = 0; i < num_fraged_pages; i++)
1898                                 {
1899                                         if (enough_space(fraged_pages->pagedesc[i], tuple_len))
1900                                                 break;
1901                                 }
1902                                 if (i == num_fraged_pages)
1903                                         break;          /* can't move item anywhere */
1904                                 cur_item = i;
1905                                 cur_page = fraged_pages->pagedesc[cur_item];
1906                                 cur_buffer = ReadBuffer(onerel, cur_page->blkno);
1907                                 LockBuffer(cur_buffer, BUFFER_LOCK_EXCLUSIVE);
1908                                 ToPage = BufferGetPage(cur_buffer);
1909                                 /* if this page was not used before - clean it */
1910                                 if (!PageIsEmpty(ToPage) && cur_page->offsets_used == 0)
1911                                         vacuum_page(onerel, cur_buffer, cur_page);
1912                         }
1913                         else
1914                                 LockBuffer(cur_buffer, BUFFER_LOCK_EXCLUSIVE);
1915
1916                         LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
1917
1918                         /* copy tuple */
1919                         heap_copytuple_with_tuple(&tuple, &newtup);
1920
1921                         /*
1922                          * register invalidation of source tuple in catcaches.
1923                          *
1924                          * (Note: we do not need to register the copied tuple,
1925                          * because we are not changing the tuple contents and
1926                          * so there cannot be any need to flush negative
1927                          * catcache entries.)
1928                          */
1929                         CacheInvalidateHeapTuple(onerel, &tuple);
1930
1931                         /* NO ELOG(ERROR) TILL CHANGES ARE LOGGED */
1932                         START_CRIT_SECTION();
1933
1934                         /*
1935                          * Mark new tuple as moved_in by vacuum and store vacuum XID
1936                          * in t_cmin !!!
1937                          */
1938                         TransactionIdStore(myXID, (TransactionId *) &(newtup.t_data->t_cmin));
1939                         newtup.t_data->t_infomask &=
1940                                 ~(HEAP_XMIN_COMMITTED | HEAP_XMIN_INVALID | HEAP_MOVED_OFF);
1941                         newtup.t_data->t_infomask |= HEAP_MOVED_IN;
1942
1943                         /* add tuple to the page */
1944                         newoff = PageAddItem(ToPage, (Item) newtup.t_data, tuple_len,
1945                                                                  InvalidOffsetNumber, LP_USED);
1946                         if (newoff == InvalidOffsetNumber)
1947                         {
1948                                 elog(PANIC, "failed to add item with len = %lu to page %u (free space %lu, nusd %u, noff %u)",
1949                                          (unsigned long) tuple_len,
1950                                          cur_page->blkno, (unsigned long) cur_page->free,
1951                                          cur_page->offsets_used, cur_page->offsets_free);
1952                         }
1953                         newitemid = PageGetItemId(ToPage, newoff);
1954                         pfree(newtup.t_data);
1955                         newtup.t_datamcxt = NULL;
1956                         newtup.t_data = (HeapTupleHeader) PageGetItem(ToPage, newitemid);
1957                         ItemPointerSet(&(newtup.t_data->t_ctid), cur_page->blkno, newoff);
1958                         newtup.t_self = newtup.t_data->t_ctid;
1959
1960                         /*
1961                          * Mark old tuple as moved_off by vacuum and store vacuum XID
1962                          * in t_cmin !!!
1963                          */
1964                         TransactionIdStore(myXID, (TransactionId *) &(tuple.t_data->t_cmin));
1965                         tuple.t_data->t_infomask &=
1966                                 ~(HEAP_XMIN_COMMITTED | HEAP_XMIN_INVALID | HEAP_MOVED_IN);
1967                         tuple.t_data->t_infomask |= HEAP_MOVED_OFF;
1968
1969                         {
1970                                 XLogRecPtr      recptr =
1971                                 log_heap_move(onerel, buf, tuple.t_self,
1972                                                           cur_buffer, &newtup);
1973
1974                                 PageSetLSN(page, recptr);
1975                                 PageSetSUI(page, ThisStartUpID);
1976                                 PageSetLSN(ToPage, recptr);
1977                                 PageSetSUI(ToPage, ThisStartUpID);
1978                         }
1979                         END_CRIT_SECTION();
1980
1981                         cur_page->offsets_used++;
1982                         num_moved++;
1983                         cur_page->free = ((PageHeader) ToPage)->pd_upper - ((PageHeader) ToPage)->pd_lower;
1984                         if (cur_page->blkno > last_move_dest_block)
1985                                 last_move_dest_block = cur_page->blkno;
1986
1987                         vacpage->offsets[vacpage->offsets_free++] = offnum;
1988
1989                         LockBuffer(cur_buffer, BUFFER_LOCK_UNLOCK);
1990                         LockBuffer(buf, BUFFER_LOCK_UNLOCK);
1991
1992                         /* insert index' tuples if needed */
1993                         if (resultRelInfo->ri_NumIndices > 0)
1994                         {
1995                                 ExecStoreTuple(&newtup, slot, InvalidBuffer, false);
1996                                 ExecInsertIndexTuples(slot, &(newtup.t_self), estate, true);
1997                         }
1998                 }                                               /* walk along page */
1999
2000                 if (offnum < maxoff && keep_tuples > 0)
2001                 {
2002                         OffsetNumber off;
2003
2004                         for (off = OffsetNumberNext(offnum);
2005                                  off <= maxoff;
2006                                  off = OffsetNumberNext(off))
2007                         {
2008                                 itemid = PageGetItemId(page, off);
2009                                 if (!ItemIdIsUsed(itemid))
2010                                         continue;
2011                                 tuple.t_datamcxt = NULL;
2012                                 tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
2013                                 if (tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED)
2014                                         continue;
2015                                 if ((TransactionId) tuple.t_data->t_cmin != myXID)
2016                                         elog(ERROR, "Invalid XID in t_cmin (4)");
2017                                 if (tuple.t_data->t_infomask & HEAP_MOVED_IN)
2018                                         elog(ERROR, "HEAP_MOVED_IN was not expected (2)");
2019                                 if (tuple.t_data->t_infomask & HEAP_MOVED_OFF)
2020                                 {
2021                                         /* some chains was moved while */
2022                                         if (chain_tuple_moved)
2023                                         {                       /* cleaning this page */
2024                                                 Assert(vacpage->offsets_free > 0);
2025                                                 for (i = 0; i < vacpage->offsets_free; i++)
2026                                                 {
2027                                                         if (vacpage->offsets[i] == off)
2028                                                                 break;
2029                                                 }
2030                                                 if (i >= vacpage->offsets_free) /* not found */
2031                                                 {
2032                                                         vacpage->offsets[vacpage->offsets_free++] = off;
2033                                                         Assert(keep_tuples > 0);
2034                                                         keep_tuples--;
2035                                                 }
2036                                         }
2037                                         else
2038                                         {
2039                                                 vacpage->offsets[vacpage->offsets_free++] = off;
2040                                                 Assert(keep_tuples > 0);
2041                                                 keep_tuples--;
2042                                         }
2043                                 }
2044                         }
2045                 }
2046
2047                 if (vacpage->offsets_free > 0)  /* some tuples were moved */
2048                 {
2049                         if (chain_tuple_moved)          /* else - they are ordered */
2050                         {
2051                                 qsort((char *) (vacpage->offsets), vacpage->offsets_free,
2052                                           sizeof(OffsetNumber), vac_cmp_offno);
2053                         }
2054                         vpage_insert(&Nvacpagelist, copy_vac_page(vacpage));
2055                         WriteBuffer(buf);
2056                 }
2057                 else if (dowrite)
2058                         WriteBuffer(buf);
2059                 else
2060                         ReleaseBuffer(buf);
2061
2062                 if (offnum <= maxoff)
2063                         break;                          /* some item(s) left */
2064
2065         }                                                       /* walk along relation */
2066
2067         blkno++;                                        /* new number of blocks */
2068
2069         if (cur_buffer != InvalidBuffer)
2070         {
2071                 Assert(num_moved > 0);
2072                 WriteBuffer(cur_buffer);
2073         }
2074
2075         if (num_moved > 0)
2076         {
2077                 /*
2078                  * We have to commit our tuple movings before we truncate the
2079                  * relation.  Ideally we should do Commit/StartTransactionCommand
2080                  * here, relying on the session-level table lock to protect our
2081                  * exclusive access to the relation.  However, that would require
2082                  * a lot of extra code to close and re-open the relation, indexes,
2083                  * etc.  For now, a quick hack: record status of current
2084                  * transaction as committed, and continue.
2085                  */
2086                 RecordTransactionCommit();
2087         }
2088
2089         /*
2090          * We are not going to move any more tuples across pages, but we still
2091          * need to apply vacuum_page to compact free space in the remaining
2092          * pages in vacuum_pages list.  Note that some of these pages may also
2093          * be in the fraged_pages list, and may have had tuples moved onto
2094          * them; if so, we already did vacuum_page and needn't do it again.
2095          */
2096         for (i = 0, curpage = vacuum_pages->pagedesc;
2097                  i < vacuumed_pages;
2098                  i++, curpage++)
2099         {
2100                 CHECK_FOR_INTERRUPTS();
2101                 Assert((*curpage)->blkno < blkno);
2102                 if ((*curpage)->offsets_used == 0)
2103                 {
2104                         /* this page was not used as a move target, so must clean it */
2105                         buf = ReadBuffer(onerel, (*curpage)->blkno);
2106                         LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
2107                         page = BufferGetPage(buf);
2108                         if (!PageIsEmpty(page))
2109                                 vacuum_page(onerel, buf, *curpage);
2110                         LockBuffer(buf, BUFFER_LOCK_UNLOCK);
2111                         WriteBuffer(buf);
2112                 }
2113         }
2114
2115         /*
2116          * Now scan all the pages that we moved tuples onto and update tuple
2117          * status bits.  This is not really necessary, but will save time for
2118          * future transactions examining these tuples.
2119          *
2120          * XXX WARNING that this code fails to clear HEAP_MOVED_OFF tuples from
2121          * pages that were move source pages but not move dest pages.  One
2122          * also wonders whether it wouldn't be better to skip this step and
2123          * let the tuple status updates happen someplace that's not holding an
2124          * exclusive lock on the relation.
2125          */
2126         checked_moved = 0;
2127         for (i = 0, curpage = fraged_pages->pagedesc;
2128                  i < num_fraged_pages;
2129                  i++, curpage++)
2130         {
2131                 CHECK_FOR_INTERRUPTS();
2132                 Assert((*curpage)->blkno < blkno);
2133                 if ((*curpage)->blkno > last_move_dest_block)
2134                         break;                          /* no need to scan any further */
2135                 if ((*curpage)->offsets_used == 0)
2136                         continue;                       /* this page was never used as a move dest */
2137                 buf = ReadBuffer(onerel, (*curpage)->blkno);
2138                 LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
2139                 page = BufferGetPage(buf);
2140                 num_tuples = 0;
2141                 max_offset = PageGetMaxOffsetNumber(page);
2142                 for (newoff = FirstOffsetNumber;
2143                          newoff <= max_offset;
2144                          newoff = OffsetNumberNext(newoff))
2145                 {
2146                         itemid = PageGetItemId(page, newoff);
2147                         if (!ItemIdIsUsed(itemid))
2148                                 continue;
2149                         tuple.t_datamcxt = NULL;
2150                         tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
2151                         if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
2152                         {
2153                                 if ((TransactionId) tuple.t_data->t_cmin != myXID)
2154                                         elog(ERROR, "Invalid XID in t_cmin (2)");
2155                                 if (tuple.t_data->t_infomask & HEAP_MOVED_IN)
2156                                 {
2157                                         tuple.t_data->t_infomask |= HEAP_XMIN_COMMITTED;
2158                                         num_tuples++;
2159                                 }
2160                                 else if (tuple.t_data->t_infomask & HEAP_MOVED_OFF)
2161                                         tuple.t_data->t_infomask |= HEAP_XMIN_INVALID;
2162                                 else
2163                                         elog(ERROR, "HEAP_MOVED_OFF/HEAP_MOVED_IN was expected");
2164                         }
2165                 }
2166                 LockBuffer(buf, BUFFER_LOCK_UNLOCK);
2167                 WriteBuffer(buf);
2168                 Assert((*curpage)->offsets_used == num_tuples);
2169                 checked_moved += num_tuples;
2170         }
2171         Assert(num_moved == checked_moved);
2172
2173         elog(elevel, "Rel %s: Pages: %u --> %u; Tuple(s) moved: %u.\n\t%s",
2174                  RelationGetRelationName(onerel),
2175                  nblocks, blkno, num_moved,
2176                  vac_show_rusage(&ru0));
2177
2178         /*
2179          * Reflect the motion of system tuples to catalog cache here.
2180          */
2181         CommandCounterIncrement();
2182
2183         if (Nvacpagelist.num_pages > 0)
2184         {
2185                 /* vacuum indexes again if needed */
2186                 if (Irel != (Relation *) NULL)
2187                 {
2188                         VacPage    *vpleft,
2189                                            *vpright,
2190                                                 vpsave;
2191
2192                         /* re-sort Nvacpagelist.pagedesc */
2193                         for (vpleft = Nvacpagelist.pagedesc,
2194                         vpright = Nvacpagelist.pagedesc + Nvacpagelist.num_pages - 1;
2195                                  vpleft < vpright; vpleft++, vpright--)
2196                         {
2197                                 vpsave = *vpleft;
2198                                 *vpleft = *vpright;
2199                                 *vpright = vpsave;
2200                         }
2201                         Assert(keep_tuples >= 0);
2202                         for (i = 0; i < nindexes; i++)
2203                                 vacuum_index(&Nvacpagelist, Irel[i],
2204                                                          vacrelstats->rel_tuples, keep_tuples);
2205                 }
2206
2207                 /* clean moved tuples from last page in Nvacpagelist list */
2208                 if (vacpage->blkno == (blkno - 1) &&
2209                         vacpage->offsets_free > 0)
2210                 {
2211                         OffsetNumber unbuf[BLCKSZ / sizeof(OffsetNumber)];
2212                         OffsetNumber *unused = unbuf;
2213                         int                     uncnt;
2214
2215                         buf = ReadBuffer(onerel, vacpage->blkno);
2216                         LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
2217                         page = BufferGetPage(buf);
2218                         num_tuples = 0;
2219                         maxoff = PageGetMaxOffsetNumber(page);
2220                         for (offnum = FirstOffsetNumber;
2221                                  offnum <= maxoff;
2222                                  offnum = OffsetNumberNext(offnum))
2223                         {
2224                                 itemid = PageGetItemId(page, offnum);
2225                                 if (!ItemIdIsUsed(itemid))
2226                                         continue;
2227                                 tuple.t_datamcxt = NULL;
2228                                 tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
2229
2230                                 if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
2231                                 {
2232                                         if ((TransactionId) tuple.t_data->t_cmin != myXID)
2233                                                 elog(ERROR, "Invalid XID in t_cmin (3)");
2234                                         if (tuple.t_data->t_infomask & HEAP_MOVED_OFF)
2235                                         {
2236                                                 itemid->lp_flags &= ~LP_USED;
2237                                                 num_tuples++;
2238                                         }
2239                                         else
2240                                                 elog(ERROR, "HEAP_MOVED_OFF was expected (2)");
2241                                 }
2242
2243                         }
2244                         Assert(vacpage->offsets_free == num_tuples);
2245                         START_CRIT_SECTION();
2246                         uncnt = PageRepairFragmentation(page, unused);
2247                         {
2248                                 XLogRecPtr      recptr;
2249
2250                                 recptr = log_heap_clean(onerel, buf, (char *) unused,
2251                                                   (char *) (&(unused[uncnt])) - (char *) unused);
2252                                 PageSetLSN(page, recptr);
2253                                 PageSetSUI(page, ThisStartUpID);
2254                         }
2255                         END_CRIT_SECTION();
2256                         LockBuffer(buf, BUFFER_LOCK_UNLOCK);
2257                         WriteBuffer(buf);
2258                 }
2259
2260                 /* now - free new list of reaped pages */
2261                 curpage = Nvacpagelist.pagedesc;
2262                 for (i = 0; i < Nvacpagelist.num_pages; i++, curpage++)
2263                         pfree(*curpage);
2264                 pfree(Nvacpagelist.pagedesc);
2265         }
2266
2267         /*
2268          * Flush dirty pages out to disk.  We do this unconditionally, even if
2269          * we don't need to truncate, because we want to ensure that all
2270          * tuples have correct on-row commit status on disk (see bufmgr.c's
2271          * comments for FlushRelationBuffers()).
2272          */
2273         i = FlushRelationBuffers(onerel, blkno);
2274         if (i < 0)
2275                 elog(ERROR, "VACUUM (repair_frag): FlushRelationBuffers returned %d",
2276                          i);
2277
2278         /* truncate relation, if needed */
2279         if (blkno < nblocks)
2280         {
2281                 blkno = smgrtruncate(DEFAULT_SMGR, onerel, blkno);
2282                 onerel->rd_nblocks = blkno;             /* update relcache immediately */
2283                 onerel->rd_targblock = InvalidBlockNumber;
2284                 vacrelstats->rel_pages = blkno; /* set new number of blocks */
2285         }
2286
2287         /* clean up */
2288         pfree(vacpage);
2289         if (vacrelstats->vtlinks != NULL)
2290                 pfree(vacrelstats->vtlinks);
2291
2292         ExecDropTupleTable(tupleTable, true);
2293
2294         ExecCloseIndices(resultRelInfo);
2295 }
2296
2297 /*
2298  *      vacuum_heap() -- free dead tuples
2299  *
2300  *              This routine marks dead tuples as unused and truncates relation
2301  *              if there are "empty" end-blocks.
2302  */
2303 static void
2304 vacuum_heap(VRelStats *vacrelstats, Relation onerel, VacPageList vacuum_pages)
2305 {
2306         Buffer          buf;
2307         VacPage    *vacpage;
2308         BlockNumber relblocks;
2309         int                     nblocks;
2310         int                     i;
2311
2312         nblocks = vacuum_pages->num_pages;
2313         nblocks -= vacuum_pages->empty_end_pages;       /* nothing to do with them */
2314
2315         for (i = 0, vacpage = vacuum_pages->pagedesc; i < nblocks; i++, vacpage++)
2316         {
2317                 CHECK_FOR_INTERRUPTS();
2318                 if ((*vacpage)->offsets_free > 0)
2319                 {
2320                         buf = ReadBuffer(onerel, (*vacpage)->blkno);
2321                         LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
2322                         vacuum_page(onerel, buf, *vacpage);
2323                         LockBuffer(buf, BUFFER_LOCK_UNLOCK);
2324                         WriteBuffer(buf);
2325                 }
2326         }
2327
2328         /*
2329          * Flush dirty pages out to disk.  We do this unconditionally, even if
2330          * we don't need to truncate, because we want to ensure that all
2331          * tuples have correct on-row commit status on disk (see bufmgr.c's
2332          * comments for FlushRelationBuffers()).
2333          */
2334         Assert(vacrelstats->rel_pages >= vacuum_pages->empty_end_pages);
2335         relblocks = vacrelstats->rel_pages - vacuum_pages->empty_end_pages;
2336
2337         i = FlushRelationBuffers(onerel, relblocks);
2338         if (i < 0)
2339                 elog(ERROR, "VACUUM (vacuum_heap): FlushRelationBuffers returned %d",
2340                          i);
2341
2342         /* truncate relation if there are some empty end-pages */
2343         if (vacuum_pages->empty_end_pages > 0)
2344         {
2345                 elog(elevel, "Rel %s: Pages: %u --> %u.",
2346                          RelationGetRelationName(onerel),
2347                          vacrelstats->rel_pages, relblocks);
2348                 relblocks = smgrtruncate(DEFAULT_SMGR, onerel, relblocks);
2349                 onerel->rd_nblocks = relblocks; /* update relcache immediately */
2350                 onerel->rd_targblock = InvalidBlockNumber;
2351                 vacrelstats->rel_pages = relblocks;             /* set new number of
2352                                                                                                  * blocks */
2353         }
2354 }
2355
2356 /*
2357  *      vacuum_page() -- free dead tuples on a page
2358  *                                       and repair its fragmentation.
2359  */
2360 static void
2361 vacuum_page(Relation onerel, Buffer buffer, VacPage vacpage)
2362 {
2363         OffsetNumber unbuf[BLCKSZ / sizeof(OffsetNumber)];
2364         OffsetNumber *unused = unbuf;
2365         int                     uncnt;
2366         Page            page = BufferGetPage(buffer);
2367         ItemId          itemid;
2368         int                     i;
2369
2370         /* There shouldn't be any tuples moved onto the page yet! */
2371         Assert(vacpage->offsets_used == 0);
2372
2373         START_CRIT_SECTION();
2374         for (i = 0; i < vacpage->offsets_free; i++)
2375         {
2376                 itemid = PageGetItemId(page, vacpage->offsets[i]);
2377                 itemid->lp_flags &= ~LP_USED;
2378         }
2379         uncnt = PageRepairFragmentation(page, unused);
2380         {
2381                 XLogRecPtr      recptr;
2382
2383                 recptr = log_heap_clean(onerel, buffer, (char *) unused,
2384                                                   (char *) (&(unused[uncnt])) - (char *) unused);
2385                 PageSetLSN(page, recptr);
2386                 PageSetSUI(page, ThisStartUpID);
2387         }
2388         END_CRIT_SECTION();
2389 }
2390
2391 /*
2392  *      scan_index() -- scan one index relation to update statistic.
2393  *
2394  * We use this when we have no deletions to do.
2395  */
2396 static void
2397 scan_index(Relation indrel, double num_tuples)
2398 {
2399         IndexBulkDeleteResult *stats;
2400         VacRUsage       ru0;
2401
2402         vac_init_rusage(&ru0);
2403
2404         /*
2405          * Even though we're not planning to delete anything, use the
2406          * ambulkdelete call, so that the scan happens within the index AM for
2407          * more speed.
2408          */
2409         stats = index_bulk_delete(indrel, dummy_tid_reaped, NULL);
2410
2411         if (!stats)
2412                 return;
2413
2414         /* now update statistics in pg_class */
2415         vac_update_relstats(RelationGetRelid(indrel),
2416                                                 stats->num_pages, stats->num_index_tuples,
2417                                                 false);
2418
2419         elog(elevel, "Index %s: Pages %u; Tuples %.0f.\n\t%s",
2420                  RelationGetRelationName(indrel),
2421                  stats->num_pages, stats->num_index_tuples,
2422                  vac_show_rusage(&ru0));
2423
2424         /*
2425          * Check for tuple count mismatch.      If the index is partial, then it's
2426          * OK for it to have fewer tuples than the heap; else we got trouble.
2427          */
2428         if (stats->num_index_tuples != num_tuples)
2429         {
2430                 if (stats->num_index_tuples > num_tuples ||
2431                         !vac_is_partial_index(indrel))
2432                         elog(WARNING, "Index %s: NUMBER OF INDEX' TUPLES (%.0f) IS NOT THE SAME AS HEAP' (%.0f).\
2433 \n\tRecreate the index.",
2434                                  RelationGetRelationName(indrel),
2435                                  stats->num_index_tuples, num_tuples);
2436         }
2437
2438         pfree(stats);
2439 }
2440
2441 /*
2442  *      vacuum_index() -- vacuum one index relation.
2443  *
2444  *              Vpl is the VacPageList of the heap we're currently vacuuming.
2445  *              It's locked. Indrel is an index relation on the vacuumed heap.
2446  *
2447  *              We don't bother to set locks on the index relation here, since
2448  *              the parent table is exclusive-locked already.
2449  *
2450  *              Finally, we arrange to update the index relation's statistics in
2451  *              pg_class.
2452  */
2453 static void
2454 vacuum_index(VacPageList vacpagelist, Relation indrel,
2455                          double num_tuples, int keep_tuples)
2456 {
2457         IndexBulkDeleteResult *stats;
2458         VacRUsage       ru0;
2459
2460         vac_init_rusage(&ru0);
2461
2462         /* Do bulk deletion */
2463         stats = index_bulk_delete(indrel, tid_reaped, (void *) vacpagelist);
2464
2465         if (!stats)
2466                 return;
2467
2468         /* now update statistics in pg_class */
2469         vac_update_relstats(RelationGetRelid(indrel),
2470                                                 stats->num_pages, stats->num_index_tuples,
2471                                                 false);
2472
2473         elog(elevel, "Index %s: Pages %u; Tuples %.0f: Deleted %.0f.\n\t%s",
2474                  RelationGetRelationName(indrel), stats->num_pages,
2475                  stats->num_index_tuples - keep_tuples, stats->tuples_removed,
2476                  vac_show_rusage(&ru0));
2477
2478         /*
2479          * Check for tuple count mismatch.      If the index is partial, then it's
2480          * OK for it to have fewer tuples than the heap; else we got trouble.
2481          */
2482         if (stats->num_index_tuples != num_tuples + keep_tuples)
2483         {
2484                 if (stats->num_index_tuples > num_tuples + keep_tuples ||
2485                         !vac_is_partial_index(indrel))
2486                         elog(WARNING, "Index %s: NUMBER OF INDEX' TUPLES (%.0f) IS NOT THE SAME AS HEAP' (%.0f).\
2487 \n\tRecreate the index.",
2488                                  RelationGetRelationName(indrel),
2489                                  stats->num_index_tuples, num_tuples);
2490         }
2491
2492         pfree(stats);
2493 }
2494
2495 /*
2496  *      tid_reaped() -- is a particular tid reaped?
2497  *
2498  *              This has the right signature to be an IndexBulkDeleteCallback.
2499  *
2500  *              vacpagelist->VacPage_array is sorted in right order.
2501  */
2502 static bool
2503 tid_reaped(ItemPointer itemptr, void *state)
2504 {
2505         VacPageList vacpagelist = (VacPageList) state;
2506         OffsetNumber ioffno;
2507         OffsetNumber *voff;
2508         VacPage         vp,
2509                            *vpp;
2510         VacPageData vacpage;
2511
2512         vacpage.blkno = ItemPointerGetBlockNumber(itemptr);
2513         ioffno = ItemPointerGetOffsetNumber(itemptr);
2514
2515         vp = &vacpage;
2516         vpp = (VacPage *) vac_bsearch((void *) &vp,
2517                                                                   (void *) (vacpagelist->pagedesc),
2518                                                                   vacpagelist->num_pages,
2519                                                                   sizeof(VacPage),
2520                                                                   vac_cmp_blk);
2521
2522         if (vpp == NULL)
2523                 return false;
2524
2525         /* ok - we are on a partially or fully reaped page */
2526         vp = *vpp;
2527
2528         if (vp->offsets_free == 0)
2529         {
2530                 /* this is EmptyPage, so claim all tuples on it are reaped!!! */
2531                 return true;
2532         }
2533
2534         voff = (OffsetNumber *) vac_bsearch((void *) &ioffno,
2535                                                                                 (void *) (vp->offsets),
2536                                                                                 vp->offsets_free,
2537                                                                                 sizeof(OffsetNumber),
2538                                                                                 vac_cmp_offno);
2539
2540         if (voff == NULL)
2541                 return false;
2542
2543         /* tid is reaped */
2544         return true;
2545 }
2546
2547 /*
2548  * Dummy version for scan_index.
2549  */
2550 static bool
2551 dummy_tid_reaped(ItemPointer itemptr, void *state)
2552 {
2553         return false;
2554 }
2555
2556 /*
2557  * Update the shared Free Space Map with the info we now have about
2558  * free space in the relation, discarding any old info the map may have.
2559  */
2560 static void
2561 vac_update_fsm(Relation onerel, VacPageList fraged_pages,
2562                            BlockNumber rel_pages)
2563 {
2564         int                     nPages = fraged_pages->num_pages;
2565         int                     i;
2566         BlockNumber *pages;
2567         Size       *spaceAvail;
2568
2569         /* +1 to avoid palloc(0) */
2570         pages = (BlockNumber *) palloc((nPages + 1) * sizeof(BlockNumber));
2571         spaceAvail = (Size *) palloc((nPages + 1) * sizeof(Size));
2572
2573         for (i = 0; i < nPages; i++)
2574         {
2575                 pages[i] = fraged_pages->pagedesc[i]->blkno;
2576                 spaceAvail[i] = fraged_pages->pagedesc[i]->free;
2577
2578                 /*
2579                  * fraged_pages may contain entries for pages that we later
2580                  * decided to truncate from the relation; don't enter them into
2581                  * the map!
2582                  */
2583                 if (pages[i] >= rel_pages)
2584                 {
2585                         nPages = i;
2586                         break;
2587                 }
2588         }
2589
2590         MultiRecordFreeSpace(&onerel->rd_node,
2591                                                  0, MaxBlockNumber,
2592                                                  nPages, pages, spaceAvail);
2593         pfree(pages);
2594         pfree(spaceAvail);
2595 }
2596
2597 /* Copy a VacPage structure */
2598 static VacPage
2599 copy_vac_page(VacPage vacpage)
2600 {
2601         VacPage         newvacpage;
2602
2603         /* allocate a VacPageData entry */
2604         newvacpage = (VacPage) palloc(sizeof(VacPageData) +
2605                                                    vacpage->offsets_free * sizeof(OffsetNumber));
2606
2607         /* fill it in */
2608         if (vacpage->offsets_free > 0)
2609                 memcpy(newvacpage->offsets, vacpage->offsets,
2610                            vacpage->offsets_free * sizeof(OffsetNumber));
2611         newvacpage->blkno = vacpage->blkno;
2612         newvacpage->free = vacpage->free;
2613         newvacpage->offsets_used = vacpage->offsets_used;
2614         newvacpage->offsets_free = vacpage->offsets_free;
2615
2616         return newvacpage;
2617 }
2618
2619 /*
2620  * Add a VacPage pointer to a VacPageList.
2621  *
2622  *              As a side effect of the way that scan_heap works,
2623  *              higher pages come after lower pages in the array
2624  *              (and highest tid on a page is last).
2625  */
2626 static void
2627 vpage_insert(VacPageList vacpagelist, VacPage vpnew)
2628 {
2629 #define PG_NPAGEDESC 1024
2630
2631         /* allocate a VacPage entry if needed */
2632         if (vacpagelist->num_pages == 0)
2633         {
2634                 vacpagelist->pagedesc = (VacPage *) palloc(PG_NPAGEDESC * sizeof(VacPage));
2635                 vacpagelist->num_allocated_pages = PG_NPAGEDESC;
2636         }
2637         else if (vacpagelist->num_pages >= vacpagelist->num_allocated_pages)
2638         {
2639                 vacpagelist->num_allocated_pages *= 2;
2640                 vacpagelist->pagedesc = (VacPage *) repalloc(vacpagelist->pagedesc, vacpagelist->num_allocated_pages * sizeof(VacPage));
2641         }
2642         vacpagelist->pagedesc[vacpagelist->num_pages] = vpnew;
2643         (vacpagelist->num_pages)++;
2644 }
2645
/*
 * vac_bsearch: just like standard C library routine bsearch(),
 * except that we first test to see whether the target key is outside
 * the range of the table entries.  This case is handled relatively slowly
 * by the normal binary search algorithm (ie, no faster than any other key)
 * but it occurs often enough in VACUUM to be worth optimizing.
 *
 * The key is compared against the first and then the last element; a match
 * on either is returned directly, and a key below the first or above the
 * last yields NULL without calling bsearch() at all.
 */
static void *
vac_bsearch(const void *key, const void *base,
			size_t nelem, size_t size,
			int (*compar) (const void *, const void *))
{
	const char *first = (const char *) base;
	const char *final;
	int			cmp;

	if (nelem == 0)
		return NULL;

	/* below or equal to the first element? */
	cmp = compar(key, first);
	if (cmp <= 0)
		return (cmp == 0) ? (void *) first : NULL;

	if (nelem == 1)
		return NULL;			/* the only element was just checked */

	/* above or equal to the last element? */
	final = first + (nelem - 1) * size;
	cmp = compar(key, final);
	if (cmp >= 0)
		return (cmp == 0) ? (void *) final : NULL;

	if (nelem == 2)
		return NULL;			/* already checked 'em all */

	return bsearch(key, base, nelem, size, compar);
}
2681
2682 /*
2683  * Comparator routines for use with qsort() and bsearch().
2684  */
2685 static int
2686 vac_cmp_blk(const void *left, const void *right)
2687 {
2688         BlockNumber lblk,
2689                                 rblk;
2690
2691         lblk = (*((VacPage *) left))->blkno;
2692         rblk = (*((VacPage *) right))->blkno;
2693
2694         if (lblk < rblk)
2695                 return -1;
2696         if (lblk == rblk)
2697                 return 0;
2698         return 1;
2699 }
2700
2701 static int
2702 vac_cmp_offno(const void *left, const void *right)
2703 {
2704         if (*(OffsetNumber *) left < *(OffsetNumber *) right)
2705                 return -1;
2706         if (*(OffsetNumber *) left == *(OffsetNumber *) right)
2707                 return 0;
2708         return 1;
2709 }
2710
2711 static int
2712 vac_cmp_vtlinks(const void *left, const void *right)
2713 {
2714         if (((VTupleLink) left)->new_tid.ip_blkid.bi_hi <
2715                 ((VTupleLink) right)->new_tid.ip_blkid.bi_hi)
2716                 return -1;
2717         if (((VTupleLink) left)->new_tid.ip_blkid.bi_hi >
2718                 ((VTupleLink) right)->new_tid.ip_blkid.bi_hi)
2719                 return 1;
2720         /* bi_hi-es are equal */
2721         if (((VTupleLink) left)->new_tid.ip_blkid.bi_lo <
2722                 ((VTupleLink) right)->new_tid.ip_blkid.bi_lo)
2723                 return -1;
2724         if (((VTupleLink) left)->new_tid.ip_blkid.bi_lo >
2725                 ((VTupleLink) right)->new_tid.ip_blkid.bi_lo)
2726                 return 1;
2727         /* bi_lo-es are equal */
2728         if (((VTupleLink) left)->new_tid.ip_posid <
2729                 ((VTupleLink) right)->new_tid.ip_posid)
2730                 return -1;
2731         if (((VTupleLink) left)->new_tid.ip_posid >
2732                 ((VTupleLink) right)->new_tid.ip_posid)
2733                 return 1;
2734         return 0;
2735 }
2736
2737
2738 void
2739 vac_open_indexes(Relation relation, int *nindexes, Relation **Irel)
2740 {
2741         List       *indexoidlist,
2742                            *indexoidscan;
2743         int                     i;
2744
2745         indexoidlist = RelationGetIndexList(relation);
2746
2747         *nindexes = length(indexoidlist);
2748
2749         if (*nindexes > 0)
2750                 *Irel = (Relation *) palloc(*nindexes * sizeof(Relation));
2751         else
2752                 *Irel = NULL;
2753
2754         i = 0;
2755         foreach(indexoidscan, indexoidlist)
2756         {
2757                 Oid                     indexoid = lfirsti(indexoidscan);
2758
2759                 (*Irel)[i] = index_open(indexoid);
2760                 i++;
2761         }
2762
2763         freeList(indexoidlist);
2764 }
2765
2766
2767 void
2768 vac_close_indexes(int nindexes, Relation *Irel)
2769 {
2770         if (Irel == (Relation *) NULL)
2771                 return;
2772
2773         while (nindexes--)
2774                 index_close(Irel[nindexes]);
2775         pfree(Irel);
2776 }
2777
2778
2779 /*
2780  * Is an index partial (ie, could it contain fewer tuples than the heap?)
2781  */
2782 bool
2783 vac_is_partial_index(Relation indrel)
2784 {
2785         /*
2786          * If the index's AM doesn't support nulls, it's partial for our
2787          * purposes
2788          */
2789         if (!indrel->rd_am->amindexnulls)
2790                 return true;
2791
2792         /* Otherwise, look to see if there's a partial-index predicate */
2793         return (VARSIZE(&indrel->rd_index->indpred) > VARHDRSZ);
2794 }
2795
2796
2797 static bool
2798 enough_space(VacPage vacpage, Size len)
2799 {
2800         len = MAXALIGN(len);
2801
2802         if (len > vacpage->free)
2803                 return false;
2804
2805         /* if there are free itemid(s) and len <= free_space... */
2806         if (vacpage->offsets_used < vacpage->offsets_free)
2807                 return true;
2808
2809         /* noff_used >= noff_free and so we'll have to allocate new itemid */
2810         if (len + sizeof(ItemIdData) <= vacpage->free)
2811                 return true;
2812
2813         return false;
2814 }
2815
2816
2817 /*
2818  * Initialize usage snapshot.
2819  */
2820 void
2821 vac_init_rusage(VacRUsage *ru0)
2822 {
2823         struct timezone tz;
2824
2825         getrusage(RUSAGE_SELF, &ru0->ru);
2826         gettimeofday(&ru0->tv, &tz);
2827 }
2828
2829 /*
2830  * Compute elapsed time since ru0 usage snapshot, and format into
2831  * a displayable string.  Result is in a static string, which is
2832  * tacky, but no one ever claimed that the Postgres backend is
2833  * threadable...
2834  */
2835 const char *
2836 vac_show_rusage(VacRUsage *ru0)
2837 {
2838         static char result[100];
2839         VacRUsage       ru1;
2840
2841         vac_init_rusage(&ru1);
2842
2843         if (ru1.tv.tv_usec < ru0->tv.tv_usec)
2844         {
2845                 ru1.tv.tv_sec--;
2846                 ru1.tv.tv_usec += 1000000;
2847         }
2848         if (ru1.ru.ru_stime.tv_usec < ru0->ru.ru_stime.tv_usec)
2849         {
2850                 ru1.ru.ru_stime.tv_sec--;
2851                 ru1.ru.ru_stime.tv_usec += 1000000;
2852         }
2853         if (ru1.ru.ru_utime.tv_usec < ru0->ru.ru_utime.tv_usec)
2854         {
2855                 ru1.ru.ru_utime.tv_sec--;
2856                 ru1.ru.ru_utime.tv_usec += 1000000;
2857         }
2858
2859         snprintf(result, sizeof(result),
2860                          "CPU %d.%02ds/%d.%02du sec elapsed %d.%02d sec.",
2861                          (int) (ru1.ru.ru_stime.tv_sec - ru0->ru.ru_stime.tv_sec),
2862           (int) (ru1.ru.ru_stime.tv_usec - ru0->ru.ru_stime.tv_usec) / 10000,
2863                          (int) (ru1.ru.ru_utime.tv_sec - ru0->ru.ru_utime.tv_sec),
2864           (int) (ru1.ru.ru_utime.tv_usec - ru0->ru.ru_utime.tv_usec) / 10000,
2865                          (int) (ru1.tv.tv_sec - ru0->tv.tv_sec),
2866                          (int) (ru1.tv.tv_usec - ru0->tv.tv_usec) / 10000);
2867
2868         return result;
2869 }