/*-------------------------------------------------------------------------
 *
 * vacuum.c
 *        The postgres vacuum cleaner.
 *
 * This file includes the "full" version of VACUUM, as well as control code
 * used by all three of full VACUUM, lazy VACUUM, and ANALYZE.  See
 * vacuumlazy.c and analyze.c for the rest of the code for the latter two.
 *
 *
 * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *        $Header: /cvsroot/pgsql/src/backend/commands/vacuum.c,v 1.220 2002/03/31 06:26:30 tgl Exp $
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include <unistd.h>

#include "access/clog.h"
#include "access/genam.h"
#include "access/heapam.h"
#include "access/xlog.h"
#include "catalog/catalog.h"
#include "catalog/catname.h"
#include "catalog/pg_database.h"
#include "catalog/pg_index.h"
#include "commands/vacuum.h"
#include "executor/executor.h"
#include "miscadmin.h"
#include "storage/freespace.h"
#include "storage/sinval.h"
#include "storage/smgr.h"
#include "tcop/pquery.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/inval.h"
#include "utils/relcache.h"
#include "utils/syscache.h"
#include "pgstat.h"


typedef struct VRelListData
{
        Oid                     vrl_relid;
        struct VRelListData *vrl_next;
} VRelListData;

typedef VRelListData *VRelList;

typedef struct VacPageData
{
        BlockNumber blkno;                      /* BlockNumber of this Page */
        Size            free;                   /* FreeSpace on this Page */
        uint16          offsets_used;   /* Number of OffNums used by vacuum */
        uint16          offsets_free;   /* Number of OffNums free or to be free */
        OffsetNumber offsets[1];        /* Array of free OffNums */
} VacPageData;

typedef VacPageData *VacPage;

typedef struct VacPageListData
{
        BlockNumber empty_end_pages;    /* Number of "empty" end-pages */
        int                     num_pages;              /* Number of pages in pagedesc */
        int                     num_allocated_pages;    /* Number of allocated pages in
                                                                                 * pagedesc */
        VacPage    *pagedesc;           /* Descriptions of pages */
} VacPageListData;

typedef VacPageListData *VacPageList;

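/*
 * Editor's illustrative sketch (not part of the original file): the usual
 * lifecycle of these structures, mirroring what scan_heap() below does.
 * A maximal-sized VacPage workspace is filled in while scanning one heap
 * page, then right-sized with copy_vac_page() and appended to a
 * VacPageList via vpage_insert():
 *
 *              vacpage = (VacPage) palloc(sizeof(VacPageData) +
 *                                                                 MaxOffsetNumber * sizeof(OffsetNumber));
 *              ... fill vacpage->blkno, ->free, ->offsets[] for one page ...
 *              vacpagecopy = copy_vac_page(vacpage);
 *              vpage_insert(vacuum_pages, vacpagecopy);
 */
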
typedef struct VTupleLinkData
{
        ItemPointerData new_tid;
        ItemPointerData this_tid;
} VTupleLinkData;

typedef VTupleLinkData *VTupleLink;

typedef struct VTupleMoveData
{
        ItemPointerData tid;            /* tuple ID */
        VacPage         vacpage;                /* where to move */
        bool            cleanVpd;               /* clean vacpage before using */
} VTupleMoveData;

typedef VTupleMoveData *VTupleMove;

typedef struct VRelStats
{
        BlockNumber rel_pages;
        double          rel_tuples;
        Size            min_tlen;
        Size            max_tlen;
        bool            hasindex;
        int                     num_vtlinks;
        VTupleLink      vtlinks;
} VRelStats;


static MemoryContext vac_context = NULL;

static int elevel = -1;

static TransactionId OldestXmin;
static TransactionId FreezeLimit;

static TransactionId initialOldestXmin;
static TransactionId initialFreezeLimit;


/* non-export function prototypes */
static void vacuum_init(VacuumStmt *vacstmt);
static void vacuum_shutdown(VacuumStmt *vacstmt);
static VRelList getrels(Name VacRelP, const char *stmttype);
static void vac_update_dbstats(Oid dbid,
                                   TransactionId vacuumXID,
                                   TransactionId frozenXID);
static void vac_truncate_clog(TransactionId vacuumXID,
                                  TransactionId frozenXID);
static void vacuum_rel(Oid relid, VacuumStmt *vacstmt);
static void full_vacuum_rel(Relation onerel, VacuumStmt *vacstmt);
static void scan_heap(VRelStats *vacrelstats, Relation onerel,
                  VacPageList vacuum_pages, VacPageList fraged_pages);
static void repair_frag(VRelStats *vacrelstats, Relation onerel,
                        VacPageList vacuum_pages, VacPageList fraged_pages,
                        int nindexes, Relation *Irel);
static void vacuum_heap(VRelStats *vacrelstats, Relation onerel,
                        VacPageList vacpagelist);
static void vacuum_page(Relation onerel, Buffer buffer, VacPage vacpage);
static void vacuum_index(VacPageList vacpagelist, Relation indrel,
                         double num_tuples, int keep_tuples);
static void scan_index(Relation indrel, double num_tuples);
static bool tid_reaped(ItemPointer itemptr, void *state);
static bool dummy_tid_reaped(ItemPointer itemptr, void *state);
static void vac_update_fsm(Relation onerel, VacPageList fraged_pages,
                           BlockNumber rel_pages);
static VacPage copy_vac_page(VacPage vacpage);
static void vpage_insert(VacPageList vacpagelist, VacPage vpnew);
static void *vac_bsearch(const void *key, const void *base,
                        size_t nelem, size_t size,
                        int (*compar) (const void *, const void *));
static int      vac_cmp_blk(const void *left, const void *right);
static int      vac_cmp_offno(const void *left, const void *right);
static int      vac_cmp_vtlinks(const void *left, const void *right);
static bool enough_space(VacPage vacpage, Size len);


/****************************************************************************
 *                                                                                                                                                      *
 *                      Code common to all flavors of VACUUM and ANALYZE                                *
 *                                                                                                                                                      *
 ****************************************************************************
 */


/*
 * Primary entry point for VACUUM and ANALYZE commands.
 */
void
vacuum(VacuumStmt *vacstmt)
{
        const char *stmttype = vacstmt->vacuum ? "VACUUM" : "ANALYZE";
        NameData        VacRel;
        Name            VacRelName;
        VRelList        vrl,
                                cur;

        /*
         * We cannot run VACUUM inside a user transaction block; if we were
         * inside a transaction, then our commit- and
         * start-transaction-command calls would not have the intended effect!
         * Furthermore, the forced commit that occurs before truncating the
         * relation's file would have the effect of committing the rest of the
         * user's transaction too, which would certainly not be the desired
         * behavior.
         */
        if (IsTransactionBlock())
                elog(ERROR, "%s cannot run inside a BEGIN/END block", stmttype);

        /*
         * Send info about dead objects to the statistics collector
         */
        pgstat_vacuum_tabstat();

        if (vacstmt->verbose)
                elevel = INFO;
        else
                elevel = DEBUG1;

        /*
         * Create special memory context for cross-transaction storage.
         *
         * Since it is a child of QueryContext, it will go away eventually even
         * if we suffer an error; there's no need for special abort cleanup
         * logic.
         */
        vac_context = AllocSetContextCreate(QueryContext,
                                                                                "Vacuum",
                                                                                ALLOCSET_DEFAULT_MINSIZE,
                                                                                ALLOCSET_DEFAULT_INITSIZE,
                                                                                ALLOCSET_DEFAULT_MAXSIZE);

        /* Convert relname, which is just a string, to a Name */
        if (vacstmt->relation)
        {
                namestrcpy(&VacRel, vacstmt->relation->relname);
                VacRelName = &VacRel;
        }
        else
                VacRelName = NULL;

        /* Build list of relations to process (note this lives in vac_context) */
        vrl = getrels(VacRelName, stmttype);

        /*
         * Start up the vacuum cleaner.
         */
        vacuum_init(vacstmt);

        /*
         * Process each selected relation.      We are careful to process each
         * relation in a separate transaction in order to avoid holding too
         * many locks at one time.      Also, if we are doing VACUUM ANALYZE, the
         * ANALYZE part runs as a separate transaction from the VACUUM to
         * further reduce locking.
         */
        for (cur = vrl; cur != (VRelList) NULL; cur = cur->vrl_next)
        {
                if (vacstmt->vacuum)
                        vacuum_rel(cur->vrl_relid, vacstmt);
                if (vacstmt->analyze)
                        analyze_rel(cur->vrl_relid, vacstmt);
        }

        /* clean up */
        vacuum_shutdown(vacstmt);
}
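
/*
 * Editor's illustrative note (not part of the original file): the
 * SQL-level commands that reach vacuum(), with the VacuumStmt flags
 * they set ('mytable' is a hypothetical table name):
 *
 *              VACUUM mytable;                         -- vacuum
 *              VACUUM FULL mytable;            -- vacuum, full
 *              VACUUM ANALYZE;                         -- vacuum + analyze, all relations
 *              ANALYZE mytable;                        -- analyze only
 */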

/*
 *      vacuum_init(), vacuum_shutdown() -- start up and shut down the vacuum cleaner.
 *
 *              Formerly, there was code here to prevent more than one VACUUM from
 *              executing concurrently in the same database.  However, there's no
 *              good reason to prevent that, and manually removing lockfiles after
 *              a vacuum crash was a pain for dbadmins.  So, forget about lockfiles,
 *              and just rely on the locks we grab on each target table
 *              to ensure that there aren't two VACUUMs running on the same table
 *              at the same time.
 *
 *              The strangeness with committing and starting transactions in the
 *              init and shutdown routines is due to the fact that the vacuum cleaner
 *              is invoked via an SQL command, and so is already executing inside
 *              a transaction.  We need to leave ourselves in a predictable state
 *              on entry and exit to the vacuum cleaner.  We commit the transaction
 *              started in PostgresMain() inside vacuum_init(), and start one in
 *              vacuum_shutdown() to match the commit waiting for us back in
 *              PostgresMain().
 */
static void
vacuum_init(VacuumStmt *vacstmt)
{
        if (vacstmt->vacuum && vacstmt->relation == NULL)
        {
                /*
                 * Compute the initially applicable OldestXmin and FreezeLimit
                 * XIDs, so that we can record these values at the end of the
                 * VACUUM. Note that individual tables may well be processed with
                 * newer values, but we can guarantee that no (non-shared)
                 * relations are processed with older ones.
                 *
                 * It is okay to record non-shared values in pg_database, even though
                 * we may vacuum shared relations with older cutoffs, because only
                 * the minimum of the values present in pg_database matters.  We
                 * can be sure that shared relations have at some time been
                 * vacuumed with cutoffs no worse than the global minimum; for, if
                 * there is a backend in some other DB with xmin = OLDXMIN that's
                 * determining the cutoff with which we vacuum shared relations,
                 * it is not possible for that database to have a cutoff newer
                 * than OLDXMIN recorded in pg_database.
                 */
                vacuum_set_xid_limits(vacstmt, false,
                                                          &initialOldestXmin, &initialFreezeLimit);
        }

        /* matches the StartTransaction in PostgresMain() */
        CommitTransactionCommand();
}

static void
vacuum_shutdown(VacuumStmt *vacstmt)
{
        /* on entry, we are not in a transaction */

        /* matches the CommitTransaction in PostgresMain() */
        StartTransactionCommand();

        /*
         * If we did a database-wide VACUUM, update the database's pg_database
         * row with info about the transaction IDs used, and try to truncate
         * pg_clog.
         */
        if (vacstmt->vacuum && vacstmt->relation == NULL)
        {
                vac_update_dbstats(MyDatabaseId,
                                                   initialOldestXmin, initialFreezeLimit);
                vac_truncate_clog(initialOldestXmin, initialFreezeLimit);
        }

        /*
         * Clean up working storage --- note we must do this after
         * StartTransactionCommand, else we might be trying to delete the
         * active context!
         */
        MemoryContextDelete(vac_context);
        vac_context = NULL;
}

/*
 * Build a list of VRelListData nodes for each relation to be processed
 *
 * The list is built in vac_context so that it will survive across our
 * per-relation transactions.
 */
static VRelList
getrels(Name VacRelP, const char *stmttype)
{
        Relation        rel;
        TupleDesc       tupdesc;
        HeapScanDesc scan;
        HeapTuple       tuple;
        VRelList        vrl,
                                cur;
        Datum           d;
        char       *rname;
        char            rkind;
        bool            n;
        ScanKeyData key;

        if (VacRelP)
        {
                /*
                 * we could use the cache here, but it is clearer to use scankeys
                 * for both vacuum cases, bjm 2000/01/19
                 */
                ScanKeyEntryInitialize(&key, 0x0, Anum_pg_class_relname,
                                                           F_NAMEEQ,
                                                           PointerGetDatum(NameStr(*VacRelP)));
        }
        else
        {
                /* find all plain relations listed in pg_class */
                ScanKeyEntryInitialize(&key, 0x0, Anum_pg_class_relkind,
                                                           F_CHAREQ, CharGetDatum(RELKIND_RELATION));
        }

        vrl = cur = (VRelList) NULL;

        rel = heap_openr(RelationRelationName, AccessShareLock);
        tupdesc = RelationGetDescr(rel);

        scan = heap_beginscan(rel, false, SnapshotNow, 1, &key);

        while (HeapTupleIsValid(tuple = heap_getnext(scan, 0)))
        {
                d = heap_getattr(tuple, Anum_pg_class_relname, tupdesc, &n);
                rname = (char *) DatumGetName(d);

                d = heap_getattr(tuple, Anum_pg_class_relkind, tupdesc, &n);
                rkind = DatumGetChar(d);

                if (rkind != RELKIND_RELATION)
                {
                        elog(WARNING, "%s: cannot process indexes, views or special system tables",
                                 stmttype);
                        continue;
                }

                /* Make a relation list entry for this guy */
                if (vrl == (VRelList) NULL)
                        vrl = cur = (VRelList)
                                MemoryContextAlloc(vac_context, sizeof(VRelListData));
                else
                {
                        cur->vrl_next = (VRelList)
                                MemoryContextAlloc(vac_context, sizeof(VRelListData));
                        cur = cur->vrl_next;
                }

                cur->vrl_relid = tuple->t_data->t_oid;
                cur->vrl_next = (VRelList) NULL;
        }

        heap_endscan(scan);
        heap_close(rel, AccessShareLock);

        if (vrl == NULL)
                elog(WARNING, "%s: table not found", stmttype);

        return vrl;
}

/*
 * vacuum_set_xid_limits() -- compute oldest-Xmin and freeze cutoff points
 */
void
vacuum_set_xid_limits(VacuumStmt *vacstmt, bool sharedRel,
                                          TransactionId *oldestXmin,
                                          TransactionId *freezeLimit)
{
        TransactionId limit;

        *oldestXmin = GetOldestXmin(sharedRel);

        Assert(TransactionIdIsNormal(*oldestXmin));

        if (vacstmt->freeze)
        {
                /* FREEZE option: use oldest Xmin as freeze cutoff too */
                limit = *oldestXmin;
        }
        else
        {
                /*
                 * Normal case: freeze cutoff is well in the past, to wit, about
                 * halfway to the wrap horizon
                 */
                limit = GetCurrentTransactionId() - (MaxTransactionId >> 2);
        }

        /*
         * Be careful not to generate a "permanent" XID
         */
        if (!TransactionIdIsNormal(limit))
                limit = FirstNormalTransactionId;

        /*
         * Ensure sane relationship of limits
         */
        if (TransactionIdFollows(limit, *oldestXmin))
        {
                elog(WARNING, "oldest Xmin is far in the past --- close open transactions soon to avoid wraparound problems");
                limit = *oldestXmin;
        }

        *freezeLimit = limit;
}
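
/*
 * Editor's worked example (illustrative numbers only): with the 32-bit
 * MaxTransactionId of 0xFFFFFFFF, the normal-case cutoff above is
 * (MaxTransactionId >> 2) = 1,073,741,823 XIDs in the past.  If
 * GetCurrentTransactionId() returned 2,000,000,000, the freeze limit
 * would be 926,258,177; committed tuples with an xmin older than that
 * get their xmin relabeled FrozenTransactionId by scan_heap() below.
 */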


/*
 *      vac_update_relstats() -- update statistics for one relation
 *
 *              Update the whole-relation statistics that are kept in its pg_class
 *              row.  There are additional stats that will be updated if we are
 *              doing ANALYZE, but we always update these stats.  This routine works
 *              for both index and heap relation entries in pg_class.
 *
 *              We violate no-overwrite semantics here by storing new values for the
 *              statistics columns directly into the pg_class tuple that's already on
 *              the page.  The reason for this is that if we updated these tuples in
 *              the usual way, vacuuming pg_class itself wouldn't work very well ---
 *              by the time we got done with a vacuum cycle, most of the tuples in
 *              pg_class would've been obsoleted.  Of course, this only works for
 *              fixed-size never-null columns, but these are.
 *
 *              This routine is shared by full VACUUM, lazy VACUUM, and stand-alone
 *              ANALYZE.
 */
void
vac_update_relstats(Oid relid, BlockNumber num_pages, double num_tuples,
                                        bool hasindex)
{
        Relation        rd;
        HeapTupleData rtup;
        HeapTuple       ctup;
        Form_pg_class pgcform;
        Buffer          buffer;

        /*
         * update number of tuples and number of pages in pg_class
         */
        rd = heap_openr(RelationRelationName, RowExclusiveLock);

        ctup = SearchSysCache(RELOID,
                                                  ObjectIdGetDatum(relid),
                                                  0, 0, 0);
        if (!HeapTupleIsValid(ctup))
                elog(ERROR, "pg_class entry for relid %u vanished during vacuuming",
                         relid);

        /* get the buffer cache tuple */
        rtup.t_self = ctup->t_self;
        ReleaseSysCache(ctup);
        heap_fetch(rd, SnapshotNow, &rtup, &buffer, NULL);

        /* overwrite the existing statistics in the tuple */
        pgcform = (Form_pg_class) GETSTRUCT(&rtup);
        pgcform->relpages = (int32) num_pages;
        pgcform->reltuples = num_tuples;
        pgcform->relhasindex = hasindex;

        /*
         * If we have discovered that there are no indexes, then there's no
         * primary key either.  This could be done more thoroughly...
         */
        if (!hasindex)
                pgcform->relhaspkey = false;

        /*
         * Invalidate the tuple in the catcaches; this also arranges to flush
         * the relation's relcache entry.  (If we fail to commit for some reason,
         * no flush will occur, but no great harm is done since there are no
         * noncritical state updates here.)
         */
        CacheInvalidateHeapTuple(rd, &rtup);

        /* Write the buffer */
        WriteBuffer(buffer);

        heap_close(rd, RowExclusiveLock);
}
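
/*
 * Editor's illustrative note: the effect of vac_update_relstats is
 * visible from SQL afterwards, e.g. (using a hypothetical table name):
 *
 *              SELECT relpages, reltuples, relhasindex
 *              FROM pg_class WHERE relname = 'mytable';
 */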


/*
 *      vac_update_dbstats() -- update statistics for one database
 *
 *              Update the whole-database statistics that are kept in its pg_database
 *              row.
 *
 *              We violate no-overwrite semantics here by storing new values for the
 *              statistics columns directly into the tuple that's already on the page.
 *              As with vac_update_relstats, this avoids leaving dead tuples behind
 *              after a VACUUM; which is good since GetRawDatabaseInfo
 *              can get confused by finding dead tuples in pg_database.
 *
 *              This routine is shared by full and lazy VACUUM.  Note that it is only
 *              applied after a database-wide VACUUM operation.
 */
static void
vac_update_dbstats(Oid dbid,
                                   TransactionId vacuumXID,
                                   TransactionId frozenXID)
{
        Relation        relation;
        ScanKeyData entry[1];
        HeapScanDesc scan;
        HeapTuple       tuple;
        Form_pg_database dbform;

        relation = heap_openr(DatabaseRelationName, RowExclusiveLock);

        /* Must use a heap scan, since there's no syscache for pg_database */
        ScanKeyEntryInitialize(&entry[0], 0x0,
                                                   ObjectIdAttributeNumber, F_OIDEQ,
                                                   ObjectIdGetDatum(dbid));

        scan = heap_beginscan(relation, 0, SnapshotNow, 1, entry);

        tuple = heap_getnext(scan, 0);

        if (!HeapTupleIsValid(tuple))
                elog(ERROR, "database %u does not exist", dbid);

        dbform = (Form_pg_database) GETSTRUCT(tuple);

        /* overwrite the existing statistics in the tuple */
        dbform->datvacuumxid = vacuumXID;
        dbform->datfrozenxid = frozenXID;

        /* invalidate the tuple in the cache and write the buffer */
        CacheInvalidateHeapTuple(relation, tuple);
        WriteNoReleaseBuffer(scan->rs_cbuf);

        heap_endscan(scan);

        heap_close(relation, RowExclusiveLock);
}


/*
 *      vac_truncate_clog() -- attempt to truncate the commit log
 *
 *              Scan pg_database to determine the system-wide oldest datvacuumxid,
 *              and use it to truncate the transaction commit log (pg_clog).
 *              Also generate a warning if the system-wide oldest datfrozenxid
 *              seems to be in danger of wrapping around.
 *
 *              The passed XIDs are simply the ones I just wrote into my pg_database
 *              entry.  They're used to initialize the "min" calculations.
 *
 *              This routine is shared by full and lazy VACUUM.  Note that it is only
 *              applied after a database-wide VACUUM operation.
 */
static void
vac_truncate_clog(TransactionId vacuumXID, TransactionId frozenXID)
{
        Relation        relation;
        HeapScanDesc scan;
        HeapTuple       tuple;
        int32           age;

        relation = heap_openr(DatabaseRelationName, AccessShareLock);

        scan = heap_beginscan(relation, 0, SnapshotNow, 0, NULL);

        while (HeapTupleIsValid(tuple = heap_getnext(scan, 0)))
        {
                Form_pg_database dbform = (Form_pg_database) GETSTRUCT(tuple);

                /* Ignore non-connectable databases (eg, template0) */
                /* It's assumed that these have been frozen correctly */
                if (!dbform->datallowconn)
                        continue;

                if (TransactionIdIsNormal(dbform->datvacuumxid) &&
                        TransactionIdPrecedes(dbform->datvacuumxid, vacuumXID))
                        vacuumXID = dbform->datvacuumxid;
                if (TransactionIdIsNormal(dbform->datfrozenxid) &&
                        TransactionIdPrecedes(dbform->datfrozenxid, frozenXID))
                        frozenXID = dbform->datfrozenxid;
        }

        heap_endscan(scan);

        heap_close(relation, AccessShareLock);

        /* Truncate CLOG to the oldest vacuumxid */
        TruncateCLOG(vacuumXID);

        /* Give warning about impending wraparound problems */
        age = (int32) (GetCurrentTransactionId() - frozenXID);
        if (age > (int32) ((MaxTransactionId >> 3) * 3))
                elog(WARNING, "Some databases have not been vacuumed in %d transactions."
                         "\n\tBetter vacuum them within %d transactions,"
                         "\n\tor you may have a wraparound failure.",
                         age, (int32) (MaxTransactionId >> 1) - age);
}
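
/*
 * Editor's worked example (illustrative numbers only): the warning
 * threshold above is (MaxTransactionId >> 3) * 3 = 1,610,612,733
 * transactions of age.  If, say, age were 1,700,000,000, the message
 * would advise vacuuming within (MaxTransactionId >> 1) - age =
 * 447,483,647 transactions.
 */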


/****************************************************************************
 *                                                                                                                                                      *
 *                      Code common to both flavors of VACUUM                                                   *
 *                                                                                                                                                      *
 ****************************************************************************
 */


/*
 *      vacuum_rel() -- vacuum one heap relation
 *
 *              Doing one heap at a time incurs extra overhead, since we need to
 *              check that the heap exists again just before we vacuum it.      The
 *              reason that we do this is so that vacuuming can be spread across
 *              many small transactions.  Otherwise, two-phase locking would require
 *              us to lock the entire database during one pass of the vacuum cleaner.
 *
 *              At entry and exit, we are not inside a transaction.
 */
static void
vacuum_rel(Oid relid, VacuumStmt *vacstmt)
{
        LOCKMODE        lmode;
        Relation        onerel;
        LockRelId       onerelid;
        Oid                     toast_relid;

        /* Begin a transaction for vacuuming this relation */
        StartTransactionCommand();

        /*
         * Check for user-requested abort.      Note we want this to be inside a
         * transaction, so xact.c doesn't issue useless WARNING.
         */
        CHECK_FOR_INTERRUPTS();

        /*
         * Race condition -- if the pg_class tuple has gone away since the
         * last time we saw it, we don't need to vacuum it.
         */
        if (!SearchSysCacheExists(RELOID,
                                                          ObjectIdGetDatum(relid),
                                                          0, 0, 0))
        {
                CommitTransactionCommand();
                return;
        }

        /*
         * Determine the type of lock we want --- hard exclusive lock for a
         * FULL vacuum, but just ShareUpdateExclusiveLock for concurrent
         * vacuum.      Either way, we can be sure that no other backend is
         * vacuuming the same table.
         */
        lmode = vacstmt->full ? AccessExclusiveLock : ShareUpdateExclusiveLock;

        /*
         * Open the class, get an appropriate lock on it, and check
         * permissions.
         *
         * We allow the user to vacuum a table if he is superuser, the table
         * owner, or the database owner (but in the latter case, only if it's
         * not a shared relation).      pg_class_ownercheck includes the superuser case.
         *
         * Note we choose to treat permissions failure as a WARNING and keep
         * trying to vacuum the rest of the DB --- is this appropriate?
         */
        onerel = heap_open(relid, lmode);

        if (!(pg_class_ownercheck(RelationGetRelid(onerel), GetUserId()) ||
                  (is_dbadmin(MyDatabaseId) && !onerel->rd_rel->relisshared)))
        {
                elog(WARNING, "Skipping \"%s\" --- only table or database owner can VACUUM it",
                         RelationGetRelationName(onerel));
                heap_close(onerel, lmode);
                CommitTransactionCommand();
                return;
        }

        /*
         * Get a session-level lock too. This will protect our access to the
         * relation across multiple transactions, so that we can vacuum the
         * relation's TOAST table (if any) secure in the knowledge that no one
         * is deleting the parent relation.
         *
         * NOTE: this cannot block, even if someone else is waiting for access,
         * because the lock manager knows that both lock requests are from the
         * same process.
         */
        onerelid = onerel->rd_lockInfo.lockRelId;
        LockRelationForSession(&onerelid, lmode);

        /*
         * Remember the relation's TOAST relation for later
         */
        toast_relid = onerel->rd_rel->reltoastrelid;

        /*
         * Do the actual work --- either FULL or "lazy" vacuum
         */
        if (vacstmt->full)
                full_vacuum_rel(onerel, vacstmt);
        else
                lazy_vacuum_rel(onerel, vacstmt);

        /* all done with this class, but hold lock until commit */
        heap_close(onerel, NoLock);

        /*
         * Complete the transaction and free all temporary memory used.
         */
        CommitTransactionCommand();

        /*
         * If the relation has a secondary toast rel, vacuum that too while we
         * still hold the session lock on the master table.  Note however that
         * "analyze" will not get done on the toast table.      This is good,
         * because the toaster always uses hardcoded index access and
         * statistics are totally unimportant for toast relations.
         */
        if (toast_relid != InvalidOid)
                vacuum_rel(toast_relid, vacstmt);

        /*
         * Now release the session-level lock on the master table.
         */
        UnlockRelationForSession(&onerelid, lmode);
}


/****************************************************************************
 *                                                                                                                                                      *
 *                      Code for VACUUM FULL (only)                                                                             *
 *                                                                                                                                                      *
 ****************************************************************************
 */


/*
 *      full_vacuum_rel() -- perform FULL VACUUM for one heap relation
 *
 *              This routine vacuums a single heap, cleans out its indexes, and
 *              updates its num_pages and num_tuples statistics.
 *
 *              At entry, we have already established a transaction and opened
 *              and locked the relation.
 */
static void
full_vacuum_rel(Relation onerel, VacuumStmt *vacstmt)
{
        VacPageListData vacuum_pages;           /* List of pages to vacuum and/or
                                                                                 * clean indexes */
        VacPageListData fraged_pages;           /* List of pages with space enough
                                                                                 * for re-using */
        Relation   *Irel;
        int                     nindexes,
                                i;
        VRelStats  *vacrelstats;
        bool            reindex = false;

        if (IsIgnoringSystemIndexes() &&
                IsSystemRelationName(RelationGetRelationName(onerel)))
                reindex = true;

        vacuum_set_xid_limits(vacstmt, onerel->rd_rel->relisshared,
                                                  &OldestXmin, &FreezeLimit);

        /*
         * Set up statistics-gathering machinery.
         */
        vacrelstats = (VRelStats *) palloc(sizeof(VRelStats));
        vacrelstats->rel_pages = 0;
        vacrelstats->rel_tuples = 0;
        vacrelstats->hasindex = false;

        /* scan the heap */
        vacuum_pages.num_pages = fraged_pages.num_pages = 0;
        scan_heap(vacrelstats, onerel, &vacuum_pages, &fraged_pages);

        /* Now open all indexes of the relation */
        vac_open_indexes(onerel, &nindexes, &Irel);
        if (!Irel)
                reindex = false;
        else if (!RelationGetForm(onerel)->relhasindex)
                reindex = true;
        if (nindexes > 0)
                vacrelstats->hasindex = true;

#ifdef NOT_USED

        /*
         * reindex in VACUUM is dangerous under WAL. ifdef out until it
         * becomes safe.
         */
        if (reindex)
        {
                vac_close_indexes(nindexes, Irel);
                Irel = (Relation *) NULL;
                activate_indexes_of_a_table(RelationGetRelid(onerel), false);
        }
#endif   /* NOT_USED */

        /* Clean/scan index relation(s) */
        if (Irel != (Relation *) NULL)
        {
                if (vacuum_pages.num_pages > 0)
                {
                        for (i = 0; i < nindexes; i++)
                                vacuum_index(&vacuum_pages, Irel[i],
                                                         vacrelstats->rel_tuples, 0);
                }
                else
                {
                        /* just scan indexes to update statistics */
                        for (i = 0; i < nindexes; i++)
                                scan_index(Irel[i], vacrelstats->rel_tuples);
                }
        }

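        /*
         * Editor's summary (comment added for clarity): three outcomes are
         * possible from here.  If any pages have re-usable free space, move
         * tuples into them (repair_frag); otherwise, if any pages need dead
         * tuples reaped, clean just those pages (vacuum_heap); otherwise,
         * merely flush buffers so updated hint bits reach disk.
         */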
        if (fraged_pages.num_pages > 0)
        {
                /* Try to shrink heap */
                repair_frag(vacrelstats, onerel, &vacuum_pages, &fraged_pages,
                                        nindexes, Irel);
                vac_close_indexes(nindexes, Irel);
        }
        else
        {
                vac_close_indexes(nindexes, Irel);
                if (vacuum_pages.num_pages > 0)
                {
                        /* Clean pages from vacuum_pages list */
                        vacuum_heap(vacrelstats, onerel, &vacuum_pages);
                }
                else
                {
                        /*
                         * Flush dirty pages out to disk.  We must do this even if we
                         * didn't do anything else, because we want to ensure that all
                         * tuples have correct on-row commit status on disk (see
                         * bufmgr.c's comments for FlushRelationBuffers()).
                         */
                        i = FlushRelationBuffers(onerel, vacrelstats->rel_pages);
                        if (i < 0)
                                elog(ERROR, "VACUUM (full_vacuum_rel): FlushRelationBuffers returned %d",
                                         i);
                }
        }

#ifdef NOT_USED
        if (reindex)
                activate_indexes_of_a_table(RelationGetRelid(onerel), true);
#endif   /* NOT_USED */

        /* update shared free space map with final free space info */
        vac_update_fsm(onerel, &fraged_pages, vacrelstats->rel_pages);

        /* update statistics in pg_class */
        vac_update_relstats(RelationGetRelid(onerel), vacrelstats->rel_pages,
                                                vacrelstats->rel_tuples, vacrelstats->hasindex);
}


/*
 *      scan_heap() -- scan an open heap relation
 *
 *              This routine sets commit status bits, constructs vacuum_pages (list
 *              of pages we need to compact free space on and/or clean indexes of
 *              deleted tuples), constructs fraged_pages (list of pages with free
 *              space that tuples could be moved into), and calculates statistics
 *              on the number of live tuples in the heap.
 */
static void
scan_heap(VRelStats *vacrelstats, Relation onerel,
                  VacPageList vacuum_pages, VacPageList fraged_pages)
{
        BlockNumber nblocks,
                                blkno;
        ItemId          itemid;
        Buffer          buf;
        HeapTupleData tuple;
        OffsetNumber offnum,
                                maxoff;
        bool            pgchanged,
                                tupgone,
                                notup;
        char       *relname;
        VacPage         vacpage,
                                vacpagecopy;
        BlockNumber empty_pages,
                                new_pages,
                                changed_pages,
                                empty_end_pages;
        double          num_tuples,
                                tups_vacuumed,
                                nkeep,
                                nunused;
        double          free_size,
                                usable_free_size;
        Size            min_tlen = MaxTupleSize;
        Size            max_tlen = 0;
        int                     i;
        bool            do_shrinking = true;
        VTupleLink      vtlinks = (VTupleLink) palloc(100 * sizeof(VTupleLinkData));
        int                     num_vtlinks = 0;
        int                     free_vtlinks = 100;
        VacRUsage       ru0;

        vac_init_rusage(&ru0);

        relname = RelationGetRelationName(onerel);
        elog(elevel, "--Relation %s--", relname);

        empty_pages = new_pages = changed_pages = empty_end_pages = 0;
        num_tuples = tups_vacuumed = nkeep = nunused = 0;
        free_size = 0;

        nblocks = RelationGetNumberOfBlocks(onerel);

        /*
         * We initially create each VacPage item in a maximal-sized workspace,
         * then copy the workspace into a just-large-enough copy.
         */
        vacpage = (VacPage) palloc(sizeof(VacPageData) + MaxOffsetNumber * sizeof(OffsetNumber));

        for (blkno = 0; blkno < nblocks; blkno++)
        {
                Page            page,
                                        tempPage = NULL;
                bool            do_reap,
                                        do_frag;

                CHECK_FOR_INTERRUPTS();

                buf = ReadBuffer(onerel, blkno);
                page = BufferGetPage(buf);

                vacpage->blkno = blkno;
                vacpage->offsets_used = 0;
                vacpage->offsets_free = 0;

                if (PageIsNew(page))
                {
                        elog(WARNING, "Rel %s: Uninitialized page %u - fixing",
                                 relname, blkno);
                        PageInit(page, BufferGetPageSize(buf), 0);
                        vacpage->free = ((PageHeader) page)->pd_upper - ((PageHeader) page)->pd_lower;
                        free_size += (vacpage->free - sizeof(ItemIdData));
                        new_pages++;
                        empty_end_pages++;
                        vacpagecopy = copy_vac_page(vacpage);
                        vpage_insert(vacuum_pages, vacpagecopy);
                        vpage_insert(fraged_pages, vacpagecopy);
                        WriteBuffer(buf);
                        continue;
                }

                if (PageIsEmpty(page))
                {
                        vacpage->free = ((PageHeader) page)->pd_upper - ((PageHeader) page)->pd_lower;
                        free_size += (vacpage->free - sizeof(ItemIdData));
                        empty_pages++;
                        empty_end_pages++;
                        vacpagecopy = copy_vac_page(vacpage);
                        vpage_insert(vacuum_pages, vacpagecopy);
                        vpage_insert(fraged_pages, vacpagecopy);
                        ReleaseBuffer(buf);
                        continue;
                }

                pgchanged = false;
                notup = true;
                maxoff = PageGetMaxOffsetNumber(page);
                for (offnum = FirstOffsetNumber;
                         offnum <= maxoff;
                         offnum = OffsetNumberNext(offnum))
                {
                        uint16          sv_infomask;

                        itemid = PageGetItemId(page, offnum);

                        /*
                         * Collect un-used items too - it's possible to have indexes
                         * pointing here after crash.
                         */
                        if (!ItemIdIsUsed(itemid))
                        {
                                vacpage->offsets[vacpage->offsets_free++] = offnum;
                                nunused += 1;
                                continue;
                        }

                        tuple.t_datamcxt = NULL;
                        tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
                        tuple.t_len = ItemIdGetLength(itemid);
                        ItemPointerSet(&(tuple.t_self), blkno, offnum);

                        tupgone = false;
                        sv_infomask = tuple.t_data->t_infomask;

                        switch (HeapTupleSatisfiesVacuum(tuple.t_data, OldestXmin))
                        {
                                case HEAPTUPLE_DEAD:
                                        tupgone = true;         /* we can delete the tuple */
                                        break;
                                case HEAPTUPLE_LIVE:

                                        /*
                                         * Tuple is good.  Consider whether to replace its
                                         * xmin value with FrozenTransactionId.
                                         */
                                        if (TransactionIdIsNormal(tuple.t_data->t_xmin) &&
                                                TransactionIdPrecedes(tuple.t_data->t_xmin,
                                                                                          FreezeLimit))
                                        {
                                                tuple.t_data->t_xmin = FrozenTransactionId;
                                                /* infomask should be okay already */
                                                Assert(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED);
                                                pgchanged = true;
                                        }
                                        break;
                                case HEAPTUPLE_RECENTLY_DEAD:

                                        /*
                                         * If tuple is recently deleted then we must not
                                         * remove it from relation.
                                         */
                                        nkeep += 1;

                                        /*
                                         * If we do shrinking and this tuple is updated one
                                         * then remember it to construct updated tuple
                                         * dependencies.
                                         */
                                        if (do_shrinking &&
                                                !(ItemPointerEquals(&(tuple.t_self),
                                                                                        &(tuple.t_data->t_ctid))))
                                        {
                                                if (free_vtlinks == 0)
                                                {
                                                        free_vtlinks = 1000;
                                                        vtlinks = (VTupleLink) repalloc(vtlinks,
                                                                                   (free_vtlinks + num_vtlinks) *
                                                                                                 sizeof(VTupleLinkData));
                                                }
                                                vtlinks[num_vtlinks].new_tid = tuple.t_data->t_ctid;
                                                vtlinks[num_vtlinks].this_tid = tuple.t_self;
                                                free_vtlinks--;
                                                num_vtlinks++;
                                        }
                                        break;
                                case HEAPTUPLE_INSERT_IN_PROGRESS:

                                        /*
                                         * This should not happen, since we hold exclusive
                                         * lock on the relation; shouldn't we raise an error?
                                         */
                                        elog(WARNING, "Rel %s: TID %u/%u: InsertTransactionInProgress %u - can't shrink relation",
                                                 relname, blkno, offnum, tuple.t_data->t_xmin);
                                        do_shrinking = false;
                                        break;
                                case HEAPTUPLE_DELETE_IN_PROGRESS:

                                        /*
                                         * This should not happen, since we hold exclusive
                                         * lock on the relation; shouldn't we raise an error?
                                         */
                                        elog(WARNING, "Rel %s: TID %u/%u: DeleteTransactionInProgress %u - can't shrink relation",
                                                 relname, blkno, offnum, tuple.t_data->t_xmax);
                                        do_shrinking = false;
                                        break;
                                default:
                                        elog(ERROR, "Unexpected HeapTupleSatisfiesVacuum result");
                                        break;
                        }

                        /* check for hint-bit update by HeapTupleSatisfiesVacuum */
                        if (sv_infomask != tuple.t_data->t_infomask)
                                pgchanged = true;

                        /*
                         * Other checks...
                         */
                        if (!OidIsValid(tuple.t_data->t_oid) &&
                                onerel->rd_rel->relhasoids)
                                elog(WARNING, "Rel %s: TID %u/%u: OID IS INVALID. TUPGONE %d.",
                                         relname, blkno, offnum, (int) tupgone);

                        if (tupgone)
                        {
                                ItemId          lpp;

                                /*
                                 * Here we are building a temporary copy of the page with
                                 * dead tuples removed.  Below we will apply
                                 * PageRepairFragmentation to the copy, so that we can
                                 * determine how much space will be available after
                                 * removal of dead tuples.      But note we are NOT changing
                                 * the real page yet...
                                 */
                                if (tempPage == (Page) NULL)
                                {
                                        Size            pageSize;

                                        pageSize = PageGetPageSize(page);
                                        tempPage = (Page) palloc(pageSize);
                                        memcpy(tempPage, page, pageSize);
                                }

                                /* mark it unused on the temp page */
                                lpp = PageGetItemId(tempPage, offnum);
                                lpp->lp_flags &= ~LP_USED;

                                vacpage->offsets[vacpage->offsets_free++] = offnum;
                                tups_vacuumed += 1;
                        }
                        else
                        {
                                num_tuples += 1;
                                notup = false;
                                if (tuple.t_len < min_tlen)
                                        min_tlen = tuple.t_len;
                                if (tuple.t_len > max_tlen)
                                        max_tlen = tuple.t_len;
                        }
                }                                               /* scan along page */

                if (tempPage != (Page) NULL)
                {
                        /* Some tuples are removable; figure free space after removal */
                        PageRepairFragmentation(tempPage, NULL);
                        vacpage->free = ((PageHeader) tempPage)->pd_upper - ((PageHeader) tempPage)->pd_lower;
                        pfree(tempPage);
                        do_reap = true;
                }
                else
                {
                        /* Just use current available space */
                        vacpage->free = ((PageHeader) page)->pd_upper - ((PageHeader) page)->pd_lower;
                        /* Need to reap the page if it has ~LP_USED line pointers */
                        do_reap = (vacpage->offsets_free > 0);
                }

                free_size += vacpage->free;

                /*
                 * Add the page to fraged_pages if it has a useful amount of free
                 * space.  "Useful" means enough for a minimal-sized tuple. But we
                 * don't know that accurately near the start of the relation, so
                 * add pages unconditionally if they have >= BLCKSZ/10 free space.
                 */
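                /*
                 * (Editor's note: with the default BLCKSZ of 8192, that
                 * unconditional threshold works out to 819 bytes of free space.)
                 */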
1197                 do_frag = (vacpage->free >= min_tlen || vacpage->free >= BLCKSZ / 10);
1198
1199                 if (do_reap || do_frag)
1200                 {
1201                         vacpagecopy = copy_vac_page(vacpage);
1202                         if (do_reap)
1203                                 vpage_insert(vacuum_pages, vacpagecopy);
1204                         if (do_frag)
1205                                 vpage_insert(fraged_pages, vacpagecopy);
1206                 }
1207
1208                 if (notup)
1209                         empty_end_pages++;
1210                 else
1211                         empty_end_pages = 0;
1212
1213                 if (pgchanged)
1214                 {
1215                         WriteBuffer(buf);
1216                         changed_pages++;
1217                 }
1218                 else
1219                         ReleaseBuffer(buf);
1220         }
1221
1222         pfree(vacpage);
1223
1224         /* save stats in the rel list for use later */
1225         vacrelstats->rel_tuples = num_tuples;
1226         vacrelstats->rel_pages = nblocks;
1227         if (num_tuples == 0)
1228                 min_tlen = max_tlen = 0;
1229         vacrelstats->min_tlen = min_tlen;
1230         vacrelstats->max_tlen = max_tlen;
1231
1232         vacuum_pages->empty_end_pages = empty_end_pages;
1233         fraged_pages->empty_end_pages = empty_end_pages;
1234
1235         /*
1236          * Clear the fraged_pages list if we found we couldn't shrink. Else,
1237          * remove any "empty" end-pages from the list, and compute usable free
1238          * space = free space in remaining pages.
1239          */
1240         if (do_shrinking)
1241         {
1242                 Assert((BlockNumber) fraged_pages->num_pages >= empty_end_pages);
1243                 fraged_pages->num_pages -= empty_end_pages;
1244                 usable_free_size = 0;
1245                 for (i = 0; i < fraged_pages->num_pages; i++)
1246                         usable_free_size += fraged_pages->pagedesc[i]->free;
1247         }
1248         else
1249         {
1250                 fraged_pages->num_pages = 0;
1251                 usable_free_size = 0;
1252         }
1253
1254         if (usable_free_size > 0 && num_vtlinks > 0)
1255         {
1256                 qsort((char *) vtlinks, num_vtlinks, sizeof(VTupleLinkData),
1257                           vac_cmp_vtlinks);
1258                 vacrelstats->vtlinks = vtlinks;
1259                 vacrelstats->num_vtlinks = num_vtlinks;
1260         }
1261         else
1262         {
1263                 vacrelstats->vtlinks = NULL;
1264                 vacrelstats->num_vtlinks = 0;
1265                 pfree(vtlinks);
1266         }
1267
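        /*
         * One summary line per relation: page counts (changed, reaped, empty,
         * new), tuple counts (vacuumed, kept for chain moving, unused line
         * pointers), tuple length bounds, and total vs. usable free space.
         */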
1268         elog(elevel, "Pages %u: Changed %u, reaped %u, Empty %u, New %u; \
1269 Tup %.0f: Vac %.0f, Keep/VTL %.0f/%u, UnUsed %.0f, MinLen %lu, MaxLen %lu; \
1270 Re-using: Free/Avail. Space %.0f/%.0f; EndEmpty/Avail. Pages %u/%u.\n\t%s",
1271                  nblocks, changed_pages, vacuum_pages->num_pages, empty_pages,
1272                  new_pages, num_tuples, tups_vacuumed,
1273                  nkeep, vacrelstats->num_vtlinks,
1274                  nunused, (unsigned long) min_tlen, (unsigned long) max_tlen,
1275                  free_size, usable_free_size,
1276                  empty_end_pages, fraged_pages->num_pages,
1277                  vac_show_rusage(&ru0));
1278
1279 }
1280
1281
1282 /*
1283  *      repair_frag() -- try to repair relation's fragmentation
1284  *
1285  *		This routine marks dead tuples as unused and tries to re-use dead
1286  *		space by moving tuples (and inserting index entries if needed). It
1287  *		constructs Nvacpagelist, a list of freed pages (from which tuples
1288  *		were moved), and cleans their index entries after committing the
1289  *		current transaction (in hack manner - without losing locks or
1290  *		freeing memory!). It truncates the relation if end-blocks go empty.
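 *
 *		Overview (summarizing the code below): pages are scanned backwards
 *		from the last nonempty page; tuples are copied into lower pages
 *		drawn from fraged_pages; the originals are marked HEAP_MOVED_OFF
 *		and the copies HEAP_MOVED_IN, with this vacuum's XID stored in
 *		t_cmin; the transaction is then recorded as committed in place,
 *		the moved tuples' status bits are finalized, indexes are cleaned,
 *		and empty end-blocks are truncated away.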
1291  */
1292 static void
1293 repair_frag(VRelStats *vacrelstats, Relation onerel,
1294                         VacPageList vacuum_pages, VacPageList fraged_pages,
1295                         int nindexes, Relation *Irel)
1296 {
1297         TransactionId myXID;
1298         CommandId       myCID;
1299         Buffer          buf,
1300                                 cur_buffer;
1301         BlockNumber nblocks,
1302                                 blkno;
1303         BlockNumber last_move_dest_block = 0,
1304                                 last_vacuum_block;
1305         Page            page,
1306                                 ToPage = NULL;
1307         OffsetNumber offnum,
1308                                 maxoff,
1309                                 newoff,
1310                                 max_offset;
1311         ItemId          itemid,
1312                                 newitemid;
1313         HeapTupleData tuple,
1314                                 newtup;
1315         TupleDesc       tupdesc;
1316         ResultRelInfo *resultRelInfo;
1317         EState     *estate;
1318         TupleTable      tupleTable;
1319         TupleTableSlot *slot;
1320         VacPageListData Nvacpagelist;
1321         VacPage         cur_page = NULL,
1322                                 last_vacuum_page,
1323                                 vacpage,
1324                            *curpage;
1325         int                     cur_item = 0;
1326         int                     i;
1327         Size            tuple_len;
1328         int                     num_moved,
1329                                 num_fraged_pages,
1330                                 vacuumed_pages;
1331         int                     checked_moved,
1332                                 num_tuples,
1333                                 keep_tuples = 0;
1334         bool            isempty,
1335                                 dowrite,
1336                                 chain_tuple_moved;
1337         VacRUsage       ru0;
1338
1339         vac_init_rusage(&ru0);
1340
1341         myXID = GetCurrentTransactionId();
1342         myCID = GetCurrentCommandId();
1343
1344         tupdesc = RelationGetDescr(onerel);
1345
1346         /*
1347          * We need a ResultRelInfo and an EState so we can use the regular
1348          * executor's index-entry-making machinery.
1349          */
1350         resultRelInfo = makeNode(ResultRelInfo);
1351         resultRelInfo->ri_RangeTableIndex = 1;          /* dummy */
1352         resultRelInfo->ri_RelationDesc = onerel;
1353         resultRelInfo->ri_TrigDesc = NULL;      /* we don't fire triggers */
1354
1355         ExecOpenIndices(resultRelInfo);
1356
1357         estate = CreateExecutorState();
1358         estate->es_result_relations = resultRelInfo;
1359         estate->es_num_result_relations = 1;
1360         estate->es_result_relation_info = resultRelInfo;
1361
1362         /* Set up a dummy tuple table too */
1363         tupleTable = ExecCreateTupleTable(1);
1364         slot = ExecAllocTableSlot(tupleTable);
1365         ExecSetSlotDescriptor(slot, tupdesc, false);
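        /*
         * The slot is used further below to pass each moved tuple to
         * ExecInsertIndexTuples(), which creates the new index entries.
         */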
1366
1367         Nvacpagelist.num_pages = 0;
1368         num_fraged_pages = fraged_pages->num_pages;
1369         Assert((BlockNumber) vacuum_pages->num_pages >= vacuum_pages->empty_end_pages);
1370         vacuumed_pages = vacuum_pages->num_pages - vacuum_pages->empty_end_pages;
1371         if (vacuumed_pages > 0)
1372         {
1373                 /* get last reaped page from vacuum_pages */
1374                 last_vacuum_page = vacuum_pages->pagedesc[vacuumed_pages - 1];
1375                 last_vacuum_block = last_vacuum_page->blkno;
1376         }
1377         else
1378         {
1379                 last_vacuum_page = NULL;
1380                 last_vacuum_block = InvalidBlockNumber;
1381         }
1382         cur_buffer = InvalidBuffer;
1383         num_moved = 0;
1384
1385         vacpage = (VacPage) palloc(sizeof(VacPageData) + MaxOffsetNumber * sizeof(OffsetNumber));
1386         vacpage->offsets_used = vacpage->offsets_free = 0;
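        /*
         * vacpage is a scratch area sized for the worst case; for each source
         * page it accumulates the offsets of tuples moved off that page.
         */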
1387
1388         /*
1389          * Scan pages backwards from the last nonempty page, trying to move
1390          * tuples down to lower pages.  Quit when we reach a page that we have
1391          * moved any tuples onto, or the first page if we haven't moved
1392          * anything, or when we find a page we cannot completely empty (this
1393          * last condition is handled by "break" statements within the loop).
1394          *
1395          * NB: this code depends on the vacuum_pages and fraged_pages lists being
1396          * in order by blkno.
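         *
         * For example (hypothetical numbers): with 100 relation pages of which
         * the last 10 are empty end-pages, the scan starts at block 89 and
         * works downward.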
1397          */
1398         nblocks = vacrelstats->rel_pages;
1399         for (blkno = nblocks - vacuum_pages->empty_end_pages - 1;
1400                  blkno > last_move_dest_block;
1401                  blkno--)
1402         {
1403                 CHECK_FOR_INTERRUPTS();
1404
1405                 /*
1406                  * Forget fraged_pages pages at or after this one; they're no
1407                  * longer useful as move targets, since we only want to move down.
1408                  * Note that since we stop the outer loop at last_move_dest_block,
1409                  * pages removed here cannot have had anything moved onto them
1410                  * already.
1411                  *
1412                  * Also note that we don't change the stored fraged_pages list, only
1413                  * our local variable num_fraged_pages; so the forgotten pages are
1414                  * still available to be loaded into the free space map later.
1415                  */
1416                 while (num_fraged_pages > 0 &&
1417                         fraged_pages->pagedesc[num_fraged_pages - 1]->blkno >= blkno)
1418                 {
1419                         Assert(fraged_pages->pagedesc[num_fraged_pages - 1]->offsets_used == 0);
1420                         --num_fraged_pages;
1421                 }
1422
1423                 /*
1424                  * Process this page of relation.
1425                  */
1426                 buf = ReadBuffer(onerel, blkno);
1427                 page = BufferGetPage(buf);
1428
1429                 vacpage->offsets_free = 0;
1430
1431                 isempty = PageIsEmpty(page);
1432
1433                 dowrite = false;
1434
1435                 /* Is the page in the vacuum_pages list? */
1436                 if (blkno == last_vacuum_block)
1437                 {
1438                         if (last_vacuum_page->offsets_free > 0)
1439                         {
1440                                 /* there are dead tuples on this page - clean them */
1441                                 Assert(!isempty);
1442                                 LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
1443                                 vacuum_page(onerel, buf, last_vacuum_page);
1444                                 LockBuffer(buf, BUFFER_LOCK_UNLOCK);
1445                                 dowrite = true;
1446                         }
1447                         else
1448                                 Assert(isempty);
1449                         --vacuumed_pages;
1450                         if (vacuumed_pages > 0)
1451                         {
1452                                 /* get prev reaped page from vacuum_pages */
1453                                 last_vacuum_page = vacuum_pages->pagedesc[vacuumed_pages - 1];
1454                                 last_vacuum_block = last_vacuum_page->blkno;
1455                         }
1456                         else
1457                         {
1458                                 last_vacuum_page = NULL;
1459                                 last_vacuum_block = InvalidBlockNumber;
1460                         }
1461                         if (isempty)
1462                         {
1463                                 ReleaseBuffer(buf);
1464                                 continue;
1465                         }
1466                 }
1467                 else
1468                         Assert(!isempty);
1469
1470                 chain_tuple_moved = false;              /* no chain-tuple has been
1471                                                                                  * moved off this page yet */
1472                 vacpage->blkno = blkno;
1473                 maxoff = PageGetMaxOffsetNumber(page);
1474                 for (offnum = FirstOffsetNumber;
1475                          offnum <= maxoff;
1476                          offnum = OffsetNumberNext(offnum))
1477                 {
1478                         itemid = PageGetItemId(page, offnum);
1479
1480                         if (!ItemIdIsUsed(itemid))
1481                                 continue;
1482
1483                         tuple.t_datamcxt = NULL;
1484                         tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
1485                         tuple_len = tuple.t_len = ItemIdGetLength(itemid);
1486                         ItemPointerSet(&(tuple.t_self), blkno, offnum);
1487
1488                         if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
1489                         {
1490                                 if ((TransactionId) tuple.t_data->t_cmin != myXID)
1491                                         elog(ERROR, "Invalid XID in t_cmin");
1492                                 if (tuple.t_data->t_infomask & HEAP_MOVED_IN)
1493                                         elog(ERROR, "HEAP_MOVED_IN was not expected");
1494
1495                                 /*
1496                                  * If this (chain) tuple has already been moved by me, I
1497                                  * have to check whether it is in vacpage or not - i.e.
1498                                  * whether it was moved while cleaning this page or some previous one.
1499                                  */
1500                                 if (tuple.t_data->t_infomask & HEAP_MOVED_OFF)
1501                                 {
1502                                         if (keep_tuples == 0)
1503                                                 continue;
1504                                         if (chain_tuple_moved)          /* some chains were moved
1505                                                                                                  * while */
1506                                         {                       /* cleaning this page */
1507                                                 Assert(vacpage->offsets_free > 0);
1508                                                 for (i = 0; i < vacpage->offsets_free; i++)
1509                                                 {
1510                                                         if (vacpage->offsets[i] == offnum)
1511                                                                 break;
1512                                                 }
1513                                                 if (i >= vacpage->offsets_free) /* not found */
1514                                                 {
1515                                                         vacpage->offsets[vacpage->offsets_free++] = offnum;
1516                                                         keep_tuples--;
1517                                                 }
1518                                         }
1519                                         else
1520                                         {
1521                                                 vacpage->offsets[vacpage->offsets_free++] = offnum;
1522                                                 keep_tuples--;
1523                                         }
1524                                         continue;
1525                                 }
1526                                 elog(ERROR, "HEAP_MOVED_OFF was expected");
1527                         }
1528
1529                         /*
1530                          * If this tuple is in a chain of tuples created in updates
1531                          * by "recent" transactions then we have to move the whole
1532                          * chain of tuples to other places.
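                         *
                         * (The test below treats the tuple as part of such a chain if
                         * it was created by an update whose xmin is not yet older than
                         * OldestXmin, or if its t_ctid points at a newer version
                         * elsewhere, i.e. it is not the end of its chain.)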
1533                          */
1534                         if ((tuple.t_data->t_infomask & HEAP_UPDATED &&
1535                          !TransactionIdPrecedes(tuple.t_data->t_xmin, OldestXmin)) ||
1536                                 (!(tuple.t_data->t_infomask & HEAP_XMAX_INVALID) &&
1537                                  !(ItemPointerEquals(&(tuple.t_self),
1538                                                                          &(tuple.t_data->t_ctid)))))
1539                         {
1540                                 Buffer          Cbuf = buf;
1541                                 Page            Cpage;
1542                                 ItemId          Citemid;
1543                                 ItemPointerData Ctid;
1544                                 HeapTupleData tp = tuple;
1545                                 Size            tlen = tuple_len;
1546                                 VTupleMove      vtmove = (VTupleMove)
1547                                 palloc(100 * sizeof(VTupleMoveData));
1548                                 int                     num_vtmove = 0;
1549                                 int                     free_vtmove = 100;
1550                                 VacPage         to_vacpage = NULL;
1551                                 int                     to_item = 0;
1552                                 bool            freeCbuf = false;
1553                                 int                     ti;
1554
1555                                 if (vacrelstats->vtlinks == NULL)
1556                                         elog(ERROR, "No parent tuple was found");
1557                                 if (cur_buffer != InvalidBuffer)
1558                                 {
1559                                         WriteBuffer(cur_buffer);
1560                                         cur_buffer = InvalidBuffer;
1561                                 }
1562
1563                                 /*
1564                                  * If this tuple is at the beginning or in the middle of
1565                                  * the chain then we have to move to the end of the chain.
1566                                  */
1567                                 while (!(tp.t_data->t_infomask & HEAP_XMAX_INVALID) &&
1568                                            !(ItemPointerEquals(&(tp.t_self),
1569                                                                                    &(tp.t_data->t_ctid))))
1570                                 {
1571                                         Ctid = tp.t_data->t_ctid;
1572                                         if (freeCbuf)
1573                                                 ReleaseBuffer(Cbuf);
1574                                         freeCbuf = true;
1575                                         Cbuf = ReadBuffer(onerel,
1576                                                                           ItemPointerGetBlockNumber(&Ctid));
1577                                         Cpage = BufferGetPage(Cbuf);
1578                                         Citemid = PageGetItemId(Cpage,
1579                                                                           ItemPointerGetOffsetNumber(&Ctid));
1580                                         if (!ItemIdIsUsed(Citemid))
1581                                         {
1582                                                 /*
1583                                                  * This means that in the middle of the chain there
1584                                                  * was a tuple updated by an xaction older than
1585                                                  * OldestXmin, and that tuple has already been
1586                                                  * deleted by me. Actually, the upper part of the
1587                                                  * chain should be removed; it seems this should be
1588                                                  * handled in scan_heap(), but that's not implemented
1589                                                  * at the moment, so we just stop shrinking here.
1590                                                  */
1591                                                 ReleaseBuffer(Cbuf);
1592                                                 pfree(vtmove);
1593                                                 vtmove = NULL;
1594                                                 elog(WARNING, "Child itemid in update-chain marked as unused - can't continue repair_frag");
1595                                                 break;
1596                                         }
1597                                         tp.t_datamcxt = NULL;
1598                                         tp.t_data = (HeapTupleHeader) PageGetItem(Cpage, Citemid);
1599                                         tp.t_self = Ctid;
1600                                         tlen = tp.t_len = ItemIdGetLength(Citemid);
1601                                 }
1602                                 if (vtmove == NULL)
1603                                         break;
1604                                 /* first, can the chain be moved? */
1605                                 for (;;)
1606                                 {
1607                                         if (to_vacpage == NULL ||
1608                                                 !enough_space(to_vacpage, tlen))
1609                                         {
1610                                                 for (i = 0; i < num_fraged_pages; i++)
1611                                                 {
1612                                                         if (enough_space(fraged_pages->pagedesc[i], tlen))
1613                                                                 break;
1614                                                 }
1615
1616                                                 if (i == num_fraged_pages)
1617                                                 {
1618                                                         /* can't move item anywhere */
1619                                                         for (i = 0; i < num_vtmove; i++)
1620                                                         {
1621                                                                 Assert(vtmove[i].vacpage->offsets_used > 0);
1622                                                                 (vtmove[i].vacpage->offsets_used)--;
1623                                                         }
1624                                                         num_vtmove = 0;
1625                                                         break;
1626                                                 }
1627                                                 to_item = i;
1628                                                 to_vacpage = fraged_pages->pagedesc[to_item];
1629                                         }
1630                                         to_vacpage->free -= MAXALIGN(tlen);
1631                                         if (to_vacpage->offsets_used >= to_vacpage->offsets_free)
1632                                                 to_vacpage->free -= MAXALIGN(sizeof(ItemIdData));
1633                                         (to_vacpage->offsets_used)++;
1634                                         if (free_vtmove == 0)
1635                                         {
1636                                                 free_vtmove = 1000;
1637                                                 vtmove = (VTupleMove) repalloc(vtmove,
1638                                                                                          (free_vtmove + num_vtmove) *
1639                                                                                                  sizeof(VTupleMoveData));
1640                                         }
1641                                         vtmove[num_vtmove].tid = tp.t_self;
1642                                         vtmove[num_vtmove].vacpage = to_vacpage;
1643                                         if (to_vacpage->offsets_used == 1)
1644                                                 vtmove[num_vtmove].cleanVpd = true;
1645                                         else
1646                                                 vtmove[num_vtmove].cleanVpd = false;
1647                                         free_vtmove--;
1648                                         num_vtmove++;
1649
1650                                         /* All done ? */
1651                                         if (!(tp.t_data->t_infomask & HEAP_UPDATED) ||
1652                                         TransactionIdPrecedes(tp.t_data->t_xmin, OldestXmin))
1653                                                 break;
1654
1655                                         /* Well, try to find tuple with old row version */
1656                                         for (;;)
1657                                         {
1658                                                 Buffer          Pbuf;
1659                                                 Page            Ppage;
1660                                                 ItemId          Pitemid;
1661                                                 HeapTupleData Ptp;
1662                                                 VTupleLinkData vtld,
1663                                                                    *vtlp;
1664
1665                                                 vtld.new_tid = tp.t_self;
1666                                                 vtlp = (VTupleLink)
1667                                                         vac_bsearch((void *) &vtld,
1668                                                                                 (void *) (vacrelstats->vtlinks),
1669                                                                                 vacrelstats->num_vtlinks,
1670                                                                                 sizeof(VTupleLinkData),
1671                                                                                 vac_cmp_vtlinks);
1672                                                 if (vtlp == NULL)
1673                                                         elog(ERROR, "Parent tuple was not found");
1674                                                 tp.t_self = vtlp->this_tid;
1675                                                 Pbuf = ReadBuffer(onerel,
1676                                                                 ItemPointerGetBlockNumber(&(tp.t_self)));
1677                                                 Ppage = BufferGetPage(Pbuf);
1678                                                 Pitemid = PageGetItemId(Ppage,
1679                                                            ItemPointerGetOffsetNumber(&(tp.t_self)));
1680                                                 if (!ItemIdIsUsed(Pitemid))
1681                                                         elog(ERROR, "Parent itemid marked as unused");
1682                                                 Ptp.t_datamcxt = NULL;
1683                                                 Ptp.t_data = (HeapTupleHeader) PageGetItem(Ppage, Pitemid);
1684                                                 Assert(ItemPointerEquals(&(vtld.new_tid),
1685                                                                                                  &(Ptp.t_data->t_ctid)));
1686
1687                                                 /*
1688                                                  * Read above about the case where
1689                                                  * !ItemIdIsUsed(Citemid) (the child item has been
1690                                                  * removed)... Because at the moment we don't remove
1691                                                  * the useless part of an update-chain, it's possible
1692                                                  * to get a too-old parent row here. As in the case
1693                                                  * which caused this problem, we stop shrinking here.
1694                                                  * I could try to find the real parent row, but I
1695                                                  * don't want to, because the real solution will be
1696                                                  * implemented anyway, later, and we are too close to
1697                                                  * the 6.5 release. -
1698                                                  * vadim 06/11/99
1699                                                  */
1700                                                 if (!(TransactionIdEquals(Ptp.t_data->t_xmax,
1701                                                                                                   tp.t_data->t_xmin)))
1702                                                 {
1703                                                         if (freeCbuf)
1704                                                                 ReleaseBuffer(Cbuf);
1705                                                         freeCbuf = false;
1706                                                         ReleaseBuffer(Pbuf);
1707                                                         for (i = 0; i < num_vtmove; i++)
1708                                                         {
1709                                                                 Assert(vtmove[i].vacpage->offsets_used > 0);
1710                                                                 (vtmove[i].vacpage->offsets_used)--;
1711                                                         }
1712                                                         num_vtmove = 0;
1713                                                         elog(WARNING, "Too old parent tuple found - can't continue repair_frag");
1714                                                         break;
1715                                                 }
1716 #ifdef NOT_USED                                 /* I'm not sure that this will work
1717                                                                  * properly... */
1718
1719                                                 /*
1720                                                  * If this tuple is an updated version of a row and
1721                                                  * it was created by the same transaction, then no
1722                                                  * one is interested in this tuple - mark it as
1723                                                  * removed.
1724                                                  */
1725                                                 if (Ptp.t_data->t_infomask & HEAP_UPDATED &&
1726                                                         TransactionIdEquals(Ptp.t_data->t_xmin,
1727                                                                                                 Ptp.t_data->t_xmax))
1728                                                 {
1729                                                         TransactionIdStore(myXID,
1730                                                                 (TransactionId *) &(Ptp.t_data->t_cmin));
1731                                                         Ptp.t_data->t_infomask &=
1732                                                                 ~(HEAP_XMIN_COMMITTED | HEAP_XMIN_INVALID | HEAP_MOVED_IN);
1733                                                         Ptp.t_data->t_infomask |= HEAP_MOVED_OFF;
1734                                                         WriteBuffer(Pbuf);
1735                                                         continue;
1736                                                 }
1737 #endif
1738                                                 tp.t_datamcxt = Ptp.t_datamcxt;
1739                                                 tp.t_data = Ptp.t_data;
1740                                                 tlen = tp.t_len = ItemIdGetLength(Pitemid);
1741                                                 if (freeCbuf)
1742                                                         ReleaseBuffer(Cbuf);
1743                                                 Cbuf = Pbuf;
1744                                                 freeCbuf = true;
1745                                                 break;
1746                                         }
1747                                         if (num_vtmove == 0)
1748                                                 break;
1749                                 }
1750                                 if (freeCbuf)
1751                                         ReleaseBuffer(Cbuf);
1752                                 if (num_vtmove == 0)    /* chain can't be moved */
1753                                 {
1754                                         pfree(vtmove);
1755                                         break;
1756                                 }
1757                                 ItemPointerSetInvalid(&Ctid);
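                                /*
                                 * Move the chain members in vtmove order (chain end
                                 * first); Ctid tracks the new location of the
                                 * previously moved member, i.e. the next one in the
                                 * chain, so each moved tuple's t_ctid can be set to
                                 * point at it.
                                 */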
1758                                 for (ti = 0; ti < num_vtmove; ti++)
1759                                 {
1760                                         VacPage         destvacpage = vtmove[ti].vacpage;
1761
1762                                         /* Get page to move from */
1763                                         tuple.t_self = vtmove[ti].tid;
1764                                         Cbuf = ReadBuffer(onerel,
1765                                                          ItemPointerGetBlockNumber(&(tuple.t_self)));
1766
1767                                         /* Get page to move to */
1768                                         cur_buffer = ReadBuffer(onerel, destvacpage->blkno);
1769
1770                                         LockBuffer(cur_buffer, BUFFER_LOCK_EXCLUSIVE);
1771                                         if (cur_buffer != Cbuf)
1772                                                 LockBuffer(Cbuf, BUFFER_LOCK_EXCLUSIVE);
1773
1774                                         ToPage = BufferGetPage(cur_buffer);
1775                                         Cpage = BufferGetPage(Cbuf);
1776
1777                                         Citemid = PageGetItemId(Cpage,
1778                                                         ItemPointerGetOffsetNumber(&(tuple.t_self)));
1779                                         tuple.t_datamcxt = NULL;
1780                                         tuple.t_data = (HeapTupleHeader) PageGetItem(Cpage, Citemid);
1781                                         tuple_len = tuple.t_len = ItemIdGetLength(Citemid);
1782
1783                                         /*
1784                                          * make a copy of the source tuple, and then mark the
1785                                          * source tuple MOVED_OFF.
1786                                          */
1787                                         heap_copytuple_with_tuple(&tuple, &newtup);
1788
1789                                         /*
1790                                          * register invalidation of source tuple in catcaches.
1791                                          */
1792                                         CacheInvalidateHeapTuple(onerel, &tuple);
1793
1794                                         /* NO ELOG(ERROR) TILL CHANGES ARE LOGGED */
1795                                         START_CRIT_SECTION();
1796
1797                                         TransactionIdStore(myXID, (TransactionId *) &(tuple.t_data->t_cmin));
1798                                         tuple.t_data->t_infomask &=
1799                                                 ~(HEAP_XMIN_COMMITTED | HEAP_XMIN_INVALID | HEAP_MOVED_IN);
1800                                         tuple.t_data->t_infomask |= HEAP_MOVED_OFF;
1801
1802                                         /*
1803                                          * If this page was not used before - clean it.
1804                                          *
1805                                          * NOTE: a nasty bug used to lurk here.  It is possible
1806                                          * for the source and destination pages to be the same
1807                                          * (since this tuple-chain member can be on a page
1808                                          * lower than the one we're currently processing in
1809                                          * the outer loop).  If that's true, then after
1810                                          * vacuum_page() the source tuple will have been
1811                                          * moved, and tuple.t_data will be pointing at
1812                                          * garbage.  Therefore we must do everything that uses
1813                                          * tuple.t_data BEFORE this step!!
1814                                          *
1815                                          * This path is different from the other callers of
1816                                          * vacuum_page, because we have already incremented
1817                                          * the vacpage's offsets_used field to account for the
1818                                          * tuple(s) we expect to move onto the page. Therefore
1819                                          * vacuum_page's check for offsets_used == 0 is wrong.
1820                                          * But since that's a good debugging check for all
1821                                          * other callers, we work around it here rather than
1822                                          * remove it.
1823                                          */
1824                                         if (!PageIsEmpty(ToPage) && vtmove[ti].cleanVpd)
1825                                         {
1826                                                 int                     sv_offsets_used = destvacpage->offsets_used;
1827
1828                                                 destvacpage->offsets_used = 0;
1829                                                 vacuum_page(onerel, cur_buffer, destvacpage);
1830                                                 destvacpage->offsets_used = sv_offsets_used;
1831                                         }
1832
1833                                         /*
1834                                          * Update the state of the copied tuple, and store it
1835                                          * on the destination page.
1836                                          */
1837                                         TransactionIdStore(myXID, (TransactionId *) &(newtup.t_data->t_cmin));
1838                                         newtup.t_data->t_infomask &=
1839                                                 ~(HEAP_XMIN_COMMITTED | HEAP_XMIN_INVALID | HEAP_MOVED_OFF);
1840                                         newtup.t_data->t_infomask |= HEAP_MOVED_IN;
1841                                         newoff = PageAddItem(ToPage, (Item) newtup.t_data, tuple_len,
1842                                                                                  InvalidOffsetNumber, LP_USED);
1843                                         if (newoff == InvalidOffsetNumber)
1844                                         {
1845                                                 elog(PANIC, "moving chain: failed to add item with len = %lu to page %u",
1846                                                   (unsigned long) tuple_len, destvacpage->blkno);
1847                                         }
1848                                         newitemid = PageGetItemId(ToPage, newoff);
1849                                         pfree(newtup.t_data);
1850                                         newtup.t_datamcxt = NULL;
1851                                         newtup.t_data = (HeapTupleHeader) PageGetItem(ToPage, newitemid);
1852                                         ItemPointerSet(&(newtup.t_self), destvacpage->blkno, newoff);
1853
1854                                         {
1855                                                 XLogRecPtr      recptr =
1856                                                 log_heap_move(onerel, Cbuf, tuple.t_self,
1857                                                                           cur_buffer, &newtup);
1858
1859                                                 if (Cbuf != cur_buffer)
1860                                                 {
1861                                                         PageSetLSN(Cpage, recptr);
1862                                                         PageSetSUI(Cpage, ThisStartUpID);
1863                                                 }
1864                                                 PageSetLSN(ToPage, recptr);
1865                                                 PageSetSUI(ToPage, ThisStartUpID);
1866                                         }
1867                                         END_CRIT_SECTION();
1868
1869                                         if (destvacpage->blkno > last_move_dest_block)
1870                                                 last_move_dest_block = destvacpage->blkno;
1871
1872                                         /*
1873                                          * Set new tuple's t_ctid pointing to itself for last
1874                                          * tuple in chain, and to next tuple in chain
1875                                          * otherwise.
1876                                          */
1877                                         if (!ItemPointerIsValid(&Ctid))
1878                                                 newtup.t_data->t_ctid = newtup.t_self;
1879                                         else
1880                                                 newtup.t_data->t_ctid = Ctid;
1881                                         Ctid = newtup.t_self;
1882
1883                                         num_moved++;
1884
1885                                         /*
1886                                          * Remember that we moved tuple from the current page
1887                                          * (corresponding index tuple will be cleaned).
1888                                          */
1889                                         if (Cbuf == buf)
1890                                                 vacpage->offsets[vacpage->offsets_free++] =
1891                                                         ItemPointerGetOffsetNumber(&(tuple.t_self));
1892                                         else
1893                                                 keep_tuples++;
1894
1895                                         LockBuffer(cur_buffer, BUFFER_LOCK_UNLOCK);
1896                                         if (cur_buffer != Cbuf)
1897                                                 LockBuffer(Cbuf, BUFFER_LOCK_UNLOCK);
1898
1899                                         /* Create index entries for the moved tuple */
1900                                         if (resultRelInfo->ri_NumIndices > 0)
1901                                         {
1902                                                 ExecStoreTuple(&newtup, slot, InvalidBuffer, false);
1903                                                 ExecInsertIndexTuples(slot, &(newtup.t_self),
1904                                                                                           estate, true);
1905                                         }
1906
1907                                         WriteBuffer(cur_buffer);
1908                                         WriteBuffer(Cbuf);
1909                                 }
1910                                 cur_buffer = InvalidBuffer;
1911                                 pfree(vtmove);
1912                                 chain_tuple_moved = true;
1913                                 continue;
1914                         }
1915
1916                         /* try to find new page for this tuple */
1917                         if (cur_buffer == InvalidBuffer ||
1918                                 !enough_space(cur_page, tuple_len))
1919                         {
1920                                 if (cur_buffer != InvalidBuffer)
1921                                 {
1922                                         WriteBuffer(cur_buffer);
1923                                         cur_buffer = InvalidBuffer;
1924                                 }
1925                                 for (i = 0; i < num_fraged_pages; i++)
1926                                 {
1927                                         if (enough_space(fraged_pages->pagedesc[i], tuple_len))
1928                                                 break;
1929                                 }
1930                                 if (i == num_fraged_pages)
1931                                         break;          /* can't move item anywhere */
1932                                 cur_item = i;
1933                                 cur_page = fraged_pages->pagedesc[cur_item];
1934                                 cur_buffer = ReadBuffer(onerel, cur_page->blkno);
1935                                 LockBuffer(cur_buffer, BUFFER_LOCK_EXCLUSIVE);
1936                                 ToPage = BufferGetPage(cur_buffer);
1937                                 /* if this page was not used before - clean it */
1938                                 if (!PageIsEmpty(ToPage) && cur_page->offsets_used == 0)
1939                                         vacuum_page(onerel, cur_buffer, cur_page);
1940                         }
1941                         else
1942                                 LockBuffer(cur_buffer, BUFFER_LOCK_EXCLUSIVE);
1943
1944                         LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
1945
1946                         /* copy tuple */
1947                         heap_copytuple_with_tuple(&tuple, &newtup);
1948
1949                         /*
1950                          * register invalidation of source tuple in catcaches.
1951                          *
1952                          * (Note: we do not need to register the copied tuple,
1953                          * because we are not changing the tuple contents and
1954                          * so there cannot be any need to flush negative
1955                          * catcache entries.)
1956                          */
1957                         CacheInvalidateHeapTuple(onerel, &tuple);
1958
1959                         /* NO ELOG(ERROR) TILL CHANGES ARE LOGGED */
1960                         START_CRIT_SECTION();
1961
1962                         /*
1963                          * Mark new tuple as moved_in by vacuum and store vacuum XID
1964                          * in t_cmin !!!
1965                          */
1966                         TransactionIdStore(myXID, (TransactionId *) &(newtup.t_data->t_cmin));
1967                         newtup.t_data->t_infomask &=
1968                                 ~(HEAP_XMIN_COMMITTED | HEAP_XMIN_INVALID | HEAP_MOVED_OFF);
1969                         newtup.t_data->t_infomask |= HEAP_MOVED_IN;
1970
1971                         /* add tuple to the page */
1972                         newoff = PageAddItem(ToPage, (Item) newtup.t_data, tuple_len,
1973                                                                  InvalidOffsetNumber, LP_USED);
1974                         if (newoff == InvalidOffsetNumber)
1975                         {
1976                                 elog(PANIC, "failed to add item with len = %lu to page %u (free space %lu, nusd %u, noff %u)",
1977                                          (unsigned long) tuple_len,
1978                                          cur_page->blkno, (unsigned long) cur_page->free,
1979                                          cur_page->offsets_used, cur_page->offsets_free);
1980                         }
1981                         newitemid = PageGetItemId(ToPage, newoff);
1982                         pfree(newtup.t_data);
1983                         newtup.t_datamcxt = NULL;
1984                         newtup.t_data = (HeapTupleHeader) PageGetItem(ToPage, newitemid);
1985                         ItemPointerSet(&(newtup.t_data->t_ctid), cur_page->blkno, newoff);
1986                         newtup.t_self = newtup.t_data->t_ctid;
1987
1988                         /*
1989                          * Mark old tuple as moved_off by vacuum and store vacuum XID
1990                          * in t_cmin !!!
1991                          */
1992                         TransactionIdStore(myXID, (TransactionId *) &(tuple.t_data->t_cmin));
1993                         tuple.t_data->t_infomask &=
1994                                 ~(HEAP_XMIN_COMMITTED | HEAP_XMIN_INVALID | HEAP_MOVED_IN);
1995                         tuple.t_data->t_infomask |= HEAP_MOVED_OFF;
1996
1997                         {
1998                                 XLogRecPtr      recptr =
1999                                 log_heap_move(onerel, buf, tuple.t_self,
2000                                                           cur_buffer, &newtup);
2001
2002                                 PageSetLSN(page, recptr);
2003                                 PageSetSUI(page, ThisStartUpID);
2004                                 PageSetLSN(ToPage, recptr);
2005                                 PageSetSUI(ToPage, ThisStartUpID);
2006                         }
2007                         END_CRIT_SECTION();
2008
2009                         cur_page->offsets_used++;
2010                         num_moved++;
2011                         cur_page->free = ((PageHeader) ToPage)->pd_upper - ((PageHeader) ToPage)->pd_lower;
2012                         if (cur_page->blkno > last_move_dest_block)
2013                                 last_move_dest_block = cur_page->blkno;
2014
2015                         vacpage->offsets[vacpage->offsets_free++] = offnum;
2016
2017                         LockBuffer(cur_buffer, BUFFER_LOCK_UNLOCK);
2018                         LockBuffer(buf, BUFFER_LOCK_UNLOCK);
2019
2020                         /* insert index tuples if needed */
2021                         if (resultRelInfo->ri_NumIndices > 0)
2022                         {
2023                                 ExecStoreTuple(&newtup, slot, InvalidBuffer, false);
2024                                 ExecInsertIndexTuples(slot, &(newtup.t_self), estate, true);
2025                         }
2026                 }                                               /* walk along page */
2027
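                /*
                 * If the item loop above was left early (an item could not be
                 * moved), sweep the remaining offsets for tuples that chain
                 * moving already took off this page, so their offsets still get
                 * recorded in vacpage for index cleanup.
                 */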
2028                 if (offnum < maxoff && keep_tuples > 0)
2029                 {
2030                         OffsetNumber off;
2031
2032                         for (off = OffsetNumberNext(offnum);
2033                                  off <= maxoff;
2034                                  off = OffsetNumberNext(off))
2035                         {
2036                                 itemid = PageGetItemId(page, off);
2037                                 if (!ItemIdIsUsed(itemid))
2038                                         continue;
2039                                 tuple.t_datamcxt = NULL;
2040                                 tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
2041                                 if (tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED)
2042                                         continue;
2043                                 if ((TransactionId) tuple.t_data->t_cmin != myXID)
2044                                         elog(ERROR, "Invalid XID in t_cmin (4)");
2045                                 if (tuple.t_data->t_infomask & HEAP_MOVED_IN)
2046                                         elog(ERROR, "HEAP_MOVED_IN was not expected (2)");
2047                                 if (tuple.t_data->t_infomask & HEAP_MOVED_OFF)
2048                                 {
2049                                         /* some chains were moved while */
2050                                         if (chain_tuple_moved)
2051                                         {                       /* cleaning this page */
2052                                                 Assert(vacpage->offsets_free > 0);
2053                                                 for (i = 0; i < vacpage->offsets_free; i++)
2054                                                 {
2055                                                         if (vacpage->offsets[i] == off)
2056                                                                 break;
2057                                                 }
2058                                                 if (i >= vacpage->offsets_free) /* not found */
2059                                                 {
2060                                                         vacpage->offsets[vacpage->offsets_free++] = off;
2061                                                         Assert(keep_tuples > 0);
2062                                                         keep_tuples--;
2063                                                 }
2064                                         }
2065                                         else
2066                                         {
2067                                                 vacpage->offsets[vacpage->offsets_free++] = off;
2068                                                 Assert(keep_tuples > 0);
2069                                                 keep_tuples--;
2070                                         }
2071                                 }
2072                         }
2073                 }
2074
2075                 if (vacpage->offsets_free > 0)  /* some tuples were moved */
2076                 {
2077                         if (chain_tuple_moved)          /* else - they are ordered */
2078                         {
2079                                 qsort((char *) (vacpage->offsets), vacpage->offsets_free,
2080                                           sizeof(OffsetNumber), vac_cmp_offno);
2081                         }
2082                         vpage_insert(&Nvacpagelist, copy_vac_page(vacpage));
2083                         WriteBuffer(buf);
2084                 }
2085                 else if (dowrite)
2086                         WriteBuffer(buf);
2087                 else
2088                         ReleaseBuffer(buf);
2089
2090                 if (offnum <= maxoff)
2091                         break;                          /* some item(s) left */
2092
2093         }                                                       /* walk along relation */
2094
2095         blkno++;                                        /* new number of blocks */
2096
2097         if (cur_buffer != InvalidBuffer)
2098         {
2099                 Assert(num_moved > 0);
2100                 WriteBuffer(cur_buffer);
2101         }
2102
2103         if (num_moved > 0)
2104         {
2105                 /*
2106                  * We have to commit our tuple moves before we truncate the
2107                  * relation.  Ideally we should do Commit/StartTransactionCommand
2108                  * here, relying on the session-level table lock to protect our
2109                  * exclusive access to the relation.  However, that would require
2110                  * a lot of extra code to close and re-open the relation, indexes,
2111                  * etc.  For now, a quick hack: record status of current
2112                  * transaction as committed, and continue.
2113                  */
2114                 RecordTransactionCommit();
2115         }
2116
2117         /*
2118          * We are not going to move any more tuples across pages, but we still
2119          * need to apply vacuum_page to compact free space in the remaining
2120          * pages in vacuum_pages list.  Note that some of these pages may also
2121          * be in the fraged_pages list, and may have had tuples moved onto
2122          * them; if so, we already did vacuum_page and needn't do it again.
2123          */
2124         for (i = 0, curpage = vacuum_pages->pagedesc;
2125                  i < vacuumed_pages;
2126                  i++, curpage++)
2127         {
2128                 CHECK_FOR_INTERRUPTS();
2129                 Assert((*curpage)->blkno < blkno);
2130                 if ((*curpage)->offsets_used == 0)
2131                 {
2132                         /* this page was not used as a move target, so must clean it */
2133                         buf = ReadBuffer(onerel, (*curpage)->blkno);
2134                         LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
2135                         page = BufferGetPage(buf);
2136                         if (!PageIsEmpty(page))
2137                                 vacuum_page(onerel, buf, *curpage);
2138                         LockBuffer(buf, BUFFER_LOCK_UNLOCK);
2139                         WriteBuffer(buf);
2140                 }
2141         }
2142
2143         /*
2144          * Now scan all the pages that we moved tuples onto and update tuple
2145          * status bits.  This is not really necessary, but will save time for
2146          * future transactions examining these tuples.
2147          *
2148          * XXX WARNING that this code fails to clear HEAP_MOVED_OFF tuples from
2149          * pages that were move source pages but not move dest pages.  One
2150          * also wonders whether it wouldn't be better to skip this step and
2151          * let the tuple status updates happen someplace that's not holding an
2152          * exclusive lock on the relation.
2153          */
2154         checked_moved = 0;
2155         for (i = 0, curpage = fraged_pages->pagedesc;
2156                  i < num_fraged_pages;
2157                  i++, curpage++)
2158         {
2159                 CHECK_FOR_INTERRUPTS();
2160                 Assert((*curpage)->blkno < blkno);
2161                 if ((*curpage)->blkno > last_move_dest_block)
2162                         break;                          /* no need to scan any further */
2163                 if ((*curpage)->offsets_used == 0)
2164                         continue;                       /* this page was never used as a move dest */
2165                 buf = ReadBuffer(onerel, (*curpage)->blkno);
2166                 LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
2167                 page = BufferGetPage(buf);
2168                 num_tuples = 0;
2169                 max_offset = PageGetMaxOffsetNumber(page);
2170                 for (newoff = FirstOffsetNumber;
2171                          newoff <= max_offset;
2172                          newoff = OffsetNumberNext(newoff))
2173                 {
2174                         itemid = PageGetItemId(page, newoff);
2175                         if (!ItemIdIsUsed(itemid))
2176                                 continue;
2177                         tuple.t_datamcxt = NULL;
2178                         tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
2179                         if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
2180                         {
2181                                 if ((TransactionId) tuple.t_data->t_cmin != myXID)
2182                                         elog(ERROR, "Invalid XID in t_cmin (2)");
2183                                 if (tuple.t_data->t_infomask & HEAP_MOVED_IN)
2184                                 {
2185                                         tuple.t_data->t_infomask |= HEAP_XMIN_COMMITTED;
2186                                         num_tuples++;
2187                                 }
2188                                 else if (tuple.t_data->t_infomask & HEAP_MOVED_OFF)
2189                                         tuple.t_data->t_infomask |= HEAP_XMIN_INVALID;
2190                                 else
2191                                         elog(ERROR, "HEAP_MOVED_OFF/HEAP_MOVED_IN was expected");
2192                         }
2193                 }
2194                 LockBuffer(buf, BUFFER_LOCK_UNLOCK);
2195                 WriteBuffer(buf);
2196                 Assert((*curpage)->offsets_used == num_tuples);
2197                 checked_moved += num_tuples;
2198         }
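        /* cross-check: every tuple recorded as moved must have been revisited above */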
2199         Assert(num_moved == checked_moved);
2200
2201         elog(elevel, "Rel %s: Pages: %u --> %u; Tuple(s) moved: %u.\n\t%s",
2202                  RelationGetRelationName(onerel),
2203                  nblocks, blkno, num_moved,
2204                  vac_show_rusage(&ru0));
2205
2206         /*
2207          * Reflect the motion of system tuples in the catalog cache here.
2208          */
2209         CommandCounterIncrement();
2210
2211         if (Nvacpagelist.num_pages > 0)
2212         {
2213                 /* vacuum indexes again if needed */
2214                 if (Irel != (Relation *) NULL)
2215                 {
2216                         VacPage    *vpleft,
2217                                            *vpright,
2218                                                 vpsave;
2219
2220                         /* re-sort Nvacpagelist.pagedesc into ascending block order (the backward scan built it in descending order, but tid_reaped's binary search needs ascending) */
2221                         for (vpleft = Nvacpagelist.pagedesc,
2222                         vpright = Nvacpagelist.pagedesc + Nvacpagelist.num_pages - 1;
2223                                  vpleft < vpright; vpleft++, vpright--)
2224                         {
2225                                 vpsave = *vpleft;
2226                                 *vpleft = *vpright;
2227                                 *vpright = vpsave;
2228                         }
2229                         Assert(keep_tuples >= 0);
2230                         for (i = 0; i < nindexes; i++)
2231                                 vacuum_index(&Nvacpagelist, Irel[i],
2232                                                          vacrelstats->rel_tuples, keep_tuples);
2233                 }
2234
2235                 /* clean moved tuples from last page in Nvacpagelist list */
2236                 if (vacpage->blkno == (blkno - 1) &&
2237                         vacpage->offsets_free > 0)
2238                 {
2239                         OffsetNumber unbuf[BLCKSZ / sizeof(OffsetNumber)];
2240                         OffsetNumber *unused = unbuf;
2241                         int                     uncnt;
2242
2243                         buf = ReadBuffer(onerel, vacpage->blkno);
2244                         LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
2245                         page = BufferGetPage(buf);
2246                         num_tuples = 0;
2247                         maxoff = PageGetMaxOffsetNumber(page);
2248                         for (offnum = FirstOffsetNumber;
2249                                  offnum <= maxoff;
2250                                  offnum = OffsetNumberNext(offnum))
2251                         {
2252                                 itemid = PageGetItemId(page, offnum);
2253                                 if (!ItemIdIsUsed(itemid))
2254                                         continue;
2255                                 tuple.t_datamcxt = NULL;
2256                                 tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
2257
2258                                 if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
2259                                 {
2260                                         if ((TransactionId) tuple.t_data->t_cmin != myXID)
2261                                                 elog(ERROR, "Invalid XID in t_cmin (3)");
2262                                         if (tuple.t_data->t_infomask & HEAP_MOVED_OFF)
2263                                         {
2264                                                 itemid->lp_flags &= ~LP_USED;
2265                                                 num_tuples++;
2266                                         }
2267                                         else
2268                                                 elog(ERROR, "HEAP_MOVED_OFF was expected (2)");
2269                                 }
2270
2271                         }
2272                         Assert(vacpage->offsets_free == num_tuples);
2273                         START_CRIT_SECTION();
2274                         uncnt = PageRepairFragmentation(page, unused);
2275                         {
2276                                 XLogRecPtr      recptr;
2277
2278                                 recptr = log_heap_clean(onerel, buf, (char *) unused,
2279                                                   (char *) (&(unused[uncnt])) - (char *) unused);
2280                                 PageSetLSN(page, recptr);
2281                                 PageSetSUI(page, ThisStartUpID);
2282                         }
2283                         END_CRIT_SECTION();
2284                         LockBuffer(buf, BUFFER_LOCK_UNLOCK);
2285                         WriteBuffer(buf);
2286                 }
2287
2288                 /* now - free new list of reaped pages */
2289                 curpage = Nvacpagelist.pagedesc;
2290                 for (i = 0; i < Nvacpagelist.num_pages; i++, curpage++)
2291                         pfree(*curpage);
2292                 pfree(Nvacpagelist.pagedesc);
2293         }
2294
2295         /*
2296          * Flush dirty pages out to disk.  We do this unconditionally, even if
2297          * we don't need to truncate, because we want to ensure that all
2298          * tuples have correct on-row commit status on disk (see bufmgr.c's
2299          * comments for FlushRelationBuffers()).
2300          */
2301         i = FlushRelationBuffers(onerel, blkno);
2302         if (i < 0)
2303                 elog(ERROR, "VACUUM (repair_frag): FlushRelationBuffers returned %d",
2304                          i);
2305
2306         /* truncate relation, if needed */
2307         if (blkno < nblocks)
2308         {
2309                 blkno = smgrtruncate(DEFAULT_SMGR, onerel, blkno);
2310                 onerel->rd_nblocks = blkno;             /* update relcache immediately */
2311                 onerel->rd_targblock = InvalidBlockNumber;
2312                 vacrelstats->rel_pages = blkno; /* set new number of blocks */
2313         }
2314
2315         /* clean up */
2316         pfree(vacpage);
2317         if (vacrelstats->vtlinks != NULL)
2318                 pfree(vacrelstats->vtlinks);
2319
2320         ExecDropTupleTable(tupleTable, true);
2321
2322         ExecCloseIndices(resultRelInfo);
2323 }
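
/*
 * Illustrative summary (not part of the original control flow) of the
 * status-bit fixup done by the loops above: for each tuple whose xmin is
 * not yet HEAP_XMIN_COMMITTED and whose t_cmin holds this vacuum's own XID,
 *
 *              HEAP_MOVED_IN   -> set HEAP_XMIN_COMMITTED  (moved-in copy is valid)
 *              HEAP_MOVED_OFF  -> set HEAP_XMIN_INVALID    (moved-off original is dead)
 *              neither         -> elog(ERROR): the page is in an unexpected state
 */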
2324
2325 /*
2326  *      vacuum_heap() -- free dead tuples
2327  *
2328  *              This routine marks dead tuples as unused and truncates the relation
2329  *              if there are "empty" end-blocks.
2330  */
2331 static void
2332 vacuum_heap(VRelStats *vacrelstats, Relation onerel, VacPageList vacuum_pages)
2333 {
2334         Buffer          buf;
2335         VacPage    *vacpage;
2336         BlockNumber relblocks;
2337         int                     nblocks;
2338         int                     i;
2339
2340         nblocks = vacuum_pages->num_pages;
2341         nblocks -= vacuum_pages->empty_end_pages;       /* empty end-pages are dealt with by truncation below */
2342
2343         for (i = 0, vacpage = vacuum_pages->pagedesc; i < nblocks; i++, vacpage++)
2344         {
2345                 CHECK_FOR_INTERRUPTS();
2346                 if ((*vacpage)->offsets_free > 0)
2347                 {
2348                         buf = ReadBuffer(onerel, (*vacpage)->blkno);
2349                         LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
2350                         vacuum_page(onerel, buf, *vacpage);
2351                         LockBuffer(buf, BUFFER_LOCK_UNLOCK);
2352                         WriteBuffer(buf);
2353                 }
2354         }
2355
2356         /*
2357          * Flush dirty pages out to disk.  We do this unconditionally, even if
2358          * we don't need to truncate, because we want to ensure that all
2359          * tuples have correct on-row commit status on disk (see bufmgr.c's
2360          * comments for FlushRelationBuffers()).
2361          */
2362         Assert(vacrelstats->rel_pages >= vacuum_pages->empty_end_pages);
2363         relblocks = vacrelstats->rel_pages - vacuum_pages->empty_end_pages;
2364
2365         i = FlushRelationBuffers(onerel, relblocks);
2366         if (i < 0)
2367                 elog(ERROR, "VACUUM (vacuum_heap): FlushRelationBuffers returned %d",
2368                          i);
2369
2370         /* truncate relation if there are some empty end-pages */
2371         if (vacuum_pages->empty_end_pages > 0)
2372         {
2373                 elog(elevel, "Rel %s: Pages: %u --> %u.",
2374                          RelationGetRelationName(onerel),
2375                          vacrelstats->rel_pages, relblocks);
2376                 relblocks = smgrtruncate(DEFAULT_SMGR, onerel, relblocks);
2377                 onerel->rd_nblocks = relblocks; /* update relcache immediately */
2378                 onerel->rd_targblock = InvalidBlockNumber;
2379                 vacrelstats->rel_pages = relblocks;             /* set new number of
2380                                                                                                  * blocks */
2381         }
2382 }
2383
2384 /*
2385  *      vacuum_page() -- free dead tuples on a page
2386  *                                       and repair its fragmentation.
2387  */
2388 static void
2389 vacuum_page(Relation onerel, Buffer buffer, VacPage vacpage)
2390 {
2391         OffsetNumber unbuf[BLCKSZ / sizeof(OffsetNumber)];
2392         OffsetNumber *unused = unbuf;
2393         int                     uncnt;
2394         Page            page = BufferGetPage(buffer);
2395         ItemId          itemid;
2396         int                     i;
2397
2398         /* There shouldn't be any tuples moved onto the page yet! */
2399         Assert(vacpage->offsets_used == 0);
2400
2401         START_CRIT_SECTION();
2402         for (i = 0; i < vacpage->offsets_free; i++)
2403         {
2404                 itemid = PageGetItemId(page, vacpage->offsets[i]);
2405                 itemid->lp_flags &= ~LP_USED;
2406         }
2407         uncnt = PageRepairFragmentation(page, unused);
2408         {
2409                 XLogRecPtr      recptr;
2410
2411                 recptr = log_heap_clean(onerel, buffer, (char *) unused,
2412                                                   (char *) (&(unused[uncnt])) - (char *) unused);
2413                 PageSetLSN(page, recptr);
2414                 PageSetSUI(page, ThisStartUpID);
2415         }
2416         END_CRIT_SECTION();
2417 }
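
/*
 * Minimal caller sketch (it mirrors vacuum_heap above): the buffer must be
 * exclusive-locked around the call, and written out afterwards:
 *
 *              buf = ReadBuffer(onerel, vacpage->blkno);
 *              LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
 *              vacuum_page(onerel, buf, vacpage);
 *              LockBuffer(buf, BUFFER_LOCK_UNLOCK);
 *              WriteBuffer(buf);
 */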
2418
2419 /*
2420  *      scan_index() -- scan one index relation to update statistics.
2421  *
2422  * We use this when we have no deletions to do.
2423  */
2424 static void
2425 scan_index(Relation indrel, double num_tuples)
2426 {
2427         IndexBulkDeleteResult *stats;
2428         VacRUsage       ru0;
2429
2430         vac_init_rusage(&ru0);
2431
2432         /*
2433          * Even though we're not planning to delete anything, use the
2434          * ambulkdelete call, so that the scan happens within the index AM for
2435          * more speed.
2436          */
2437         stats = index_bulk_delete(indrel, dummy_tid_reaped, NULL);
2438
2439         if (!stats)
2440                 return;
2441
2442         /* now update statistics in pg_class */
2443         vac_update_relstats(RelationGetRelid(indrel),
2444                                                 stats->num_pages, stats->num_index_tuples,
2445                                                 false);
2446
2447         elog(elevel, "Index %s: Pages %u; Tuples %.0f.\n\t%s",
2448                  RelationGetRelationName(indrel),
2449                  stats->num_pages, stats->num_index_tuples,
2450                  vac_show_rusage(&ru0));
2451
2452         /*
2453          * Check for tuple count mismatch.  If the index is partial, then it's
2454          * OK for it to have fewer tuples than the heap; else we have trouble.
2455          */
2456         if (stats->num_index_tuples != num_tuples)
2457         {
2458                 if (stats->num_index_tuples > num_tuples ||
2459                         !vac_is_partial_index(indrel))
2460                         elog(WARNING, "Index %s: NUMBER OF INDEX TUPLES (%.0f) IS NOT THE SAME AS HEAP'S (%.0f).\
2461 \n\tRecreate the index.",
2462                                  RelationGetRelationName(indrel),
2463                                  stats->num_index_tuples, num_tuples);
2464         }
2465
2466         pfree(stats);
2467 }
2468
2469 /*
2470  *      vacuum_index() -- vacuum one index relation.
2471  *
2472  *              vacpagelist is the VacPageList of the heap we're currently vacuuming;
2473  *              it's locked.  indrel is an index relation on the vacuumed heap.
2474  *
2475  *              We don't bother to set locks on the index relation here, since
2476  *              the parent table is exclusive-locked already.
2477  *
2478  *              Finally, we arrange to update the index relation's statistics in
2479  *              pg_class.
2480  */
2481 static void
2482 vacuum_index(VacPageList vacpagelist, Relation indrel,
2483                          double num_tuples, int keep_tuples)
2484 {
2485         IndexBulkDeleteResult *stats;
2486         VacRUsage       ru0;
2487
2488         vac_init_rusage(&ru0);
2489
2490         /* Do bulk deletion */
2491         stats = index_bulk_delete(indrel, tid_reaped, (void *) vacpagelist);
2492
2493         if (!stats)
2494                 return;
2495
2496         /* now update statistics in pg_class */
2497         vac_update_relstats(RelationGetRelid(indrel),
2498                                                 stats->num_pages, stats->num_index_tuples,
2499                                                 false);
2500
2501         elog(elevel, "Index %s: Pages %u; Tuples %.0f: Deleted %.0f.\n\t%s",
2502                  RelationGetRelationName(indrel), stats->num_pages,
2503                  stats->num_index_tuples - keep_tuples, stats->tuples_removed,
2504                  vac_show_rusage(&ru0));
2505
2506         /*
2507          * Check for tuple count mismatch.  If the index is partial, then it's
2508          * OK for it to have fewer tuples than the heap; else we have trouble.
2509          */
2510         if (stats->num_index_tuples != num_tuples + keep_tuples)
2511         {
2512                 if (stats->num_index_tuples > num_tuples + keep_tuples ||
2513                         !vac_is_partial_index(indrel))
2514                         elog(WARNING, "Index %s: NUMBER OF INDEX TUPLES (%.0f) IS NOT THE SAME AS HEAP'S (%.0f).\
2515 \n\tRecreate the index.",
2516                                  RelationGetRelationName(indrel),
2517                                  stats->num_index_tuples, num_tuples);
2518         }
2519
2520         pfree(stats);
2521 }
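
/*
 * Driver sketch (this is how repair_frag above invokes it): each index of
 * the vacuumed heap is processed in turn against the same page list:
 *
 *              for (i = 0; i < nindexes; i++)
 *                      vacuum_index(&Nvacpagelist, Irel[i],
 *                                               vacrelstats->rel_tuples, keep_tuples);
 */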
2522
2523 /*
2524  *      tid_reaped() -- is a particular tid reaped?
2525  *
2526  *              This has the right signature to be an IndexBulkDeleteCallback.
2527  *
2528  *              vacpagelist->pagedesc is sorted in ascending block order.
2529  */
2530 static bool
2531 tid_reaped(ItemPointer itemptr, void *state)
2532 {
2533         VacPageList vacpagelist = (VacPageList) state;
2534         OffsetNumber ioffno;
2535         OffsetNumber *voff;
2536         VacPage         vp,
2537                            *vpp;
2538         VacPageData vacpage;
2539
2540         vacpage.blkno = ItemPointerGetBlockNumber(itemptr);
2541         ioffno = ItemPointerGetOffsetNumber(itemptr);
2542
2543         vp = &vacpage;
2544         vpp = (VacPage *) vac_bsearch((void *) &vp,
2545                                                                   (void *) (vacpagelist->pagedesc),
2546                                                                   vacpagelist->num_pages,
2547                                                                   sizeof(VacPage),
2548                                                                   vac_cmp_blk);
2549
2550         if (vpp == NULL)
2551                 return false;
2552
2553         /* ok - we are on a partially or fully reaped page */
2554         vp = *vpp;
2555
2556         if (vp->offsets_free == 0)
2557         {
2558                 /* this is EmptyPage, so claim all tuples on it are reaped!!! */
2559                 return true;
2560         }
2561
2562         voff = (OffsetNumber *) vac_bsearch((void *) &ioffno,
2563                                                                                 (void *) (vp->offsets),
2564                                                                                 vp->offsets_free,
2565                                                                                 sizeof(OffsetNumber),
2566                                                                                 vac_cmp_offno);
2567
2568         if (voff == NULL)
2569                 return false;
2570
2571         /* tid is reaped */
2572         return true;
2573 }
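
/*
 * Worked example (hypothetical numbers): for a TID of (block 42, offset 7),
 * the first vac_bsearch looks up block 42 in pagedesc.  No entry means the
 * page was untouched: return false.  An entry with offsets_free == 0 means
 * the whole page was reaped: return true.  Otherwise the second vac_bsearch
 * checks whether offset 7 appears in that page's offsets[] array of reaped
 * line pointers.
 */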
2574
2575 /*
2576  * Dummy version for scan_index.
2577  */
2578 static bool
2579 dummy_tid_reaped(ItemPointer itemptr, void *state)
2580 {
2581         return false;
2582 }
2583
2584 /*
2585  * Update the shared Free Space Map with the info we now have about
2586  * free space in the relation, discarding any old info the map may have.
2587  */
2588 static void
2589 vac_update_fsm(Relation onerel, VacPageList fraged_pages,
2590                            BlockNumber rel_pages)
2591 {
2592         int                     nPages = fraged_pages->num_pages;
2593         int                     i;
2594         BlockNumber *pages;
2595         Size       *spaceAvail;
2596
2597         /* +1 to avoid palloc(0) */
2598         pages = (BlockNumber *) palloc((nPages + 1) * sizeof(BlockNumber));
2599         spaceAvail = (Size *) palloc((nPages + 1) * sizeof(Size));
2600
2601         for (i = 0; i < nPages; i++)
2602         {
2603                 pages[i] = fraged_pages->pagedesc[i]->blkno;
2604                 spaceAvail[i] = fraged_pages->pagedesc[i]->free;
2605
2606                 /*
2607                  * fraged_pages may contain entries for pages that we later
2608                  * decided to truncate from the relation; don't enter them into
2609                  * the map!
2610                  */
2611                 if (pages[i] >= rel_pages)
2612                 {
2613                         nPages = i;
2614                         break;
2615                 }
2616         }
2617
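        /*
         * The 0..MaxBlockNumber range tells the FSM to discard any entries
         * for this relation that we do not re-supply in pages[] (per the
         * "discarding any old info" contract noted above).
         */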
2618         MultiRecordFreeSpace(&onerel->rd_node,
2619                                                  0, MaxBlockNumber,
2620                                                  nPages, pages, spaceAvail);
2621         pfree(pages);
2622         pfree(spaceAvail);
2623 }
2624
2625 /* Copy a VacPage structure */
2626 static VacPage
2627 copy_vac_page(VacPage vacpage)
2628 {
2629         VacPage         newvacpage;
2630
2631         /* allocate a VacPageData entry */
2632         newvacpage = (VacPage) palloc(sizeof(VacPageData) +
2633                                                    vacpage->offsets_free * sizeof(OffsetNumber));
2634
2635         /* fill it in */
2636         if (vacpage->offsets_free > 0)
2637                 memcpy(newvacpage->offsets, vacpage->offsets,
2638                            vacpage->offsets_free * sizeof(OffsetNumber));
2639         newvacpage->blkno = vacpage->blkno;
2640         newvacpage->free = vacpage->free;
2641         newvacpage->offsets_used = vacpage->offsets_used;
2642         newvacpage->offsets_free = vacpage->offsets_free;
2643
2644         return newvacpage;
2645 }
2646
2647 /*
2648  * Add a VacPage pointer to a VacPageList.
2649  *
2650  *              As a side effect of the way that scan_heap works,
2651  *              higher pages come after lower pages in the array
2652  *              (and highest tid on a page is last).
2653  */
2654 static void
2655 vpage_insert(VacPageList vacpagelist, VacPage vpnew)
2656 {
2657 #define PG_NPAGEDESC 1024
2658
2659         /* allocate a VacPage entry if needed */
2660         if (vacpagelist->num_pages == 0)
2661         {
2662                 vacpagelist->pagedesc = (VacPage *) palloc(PG_NPAGEDESC * sizeof(VacPage));
2663                 vacpagelist->num_allocated_pages = PG_NPAGEDESC;
2664         }
2665         else if (vacpagelist->num_pages >= vacpagelist->num_allocated_pages)
2666         {
2667                 vacpagelist->num_allocated_pages *= 2;
2668                 vacpagelist->pagedesc = (VacPage *) repalloc(vacpagelist->pagedesc, vacpagelist->num_allocated_pages * sizeof(VacPage));
2669         }
2670         vacpagelist->pagedesc[vacpagelist->num_pages] = vpnew;
2671         (vacpagelist->num_pages)++;
2672 }
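
/*
 * Illustrative cost note: pagedesc grows by doubling (1024, 2048, 4096,
 * ...), so appending n pages performs O(n) amortized copying.  Inserting
 * 3000 pages, for example, triggers exactly two repalloc calls.
 */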
2673
2674 /*
2675  * vac_bsearch: just like standard C library routine bsearch(),
2676  * except that we first test to see whether the target key is outside
2677  * the range of the table entries.      This case is handled relatively slowly
2678  * by the normal binary search algorithm (ie, no faster than any other key)
2679  * but it occurs often enough in VACUUM to be worth optimizing.
2680  */
2681 static void *
2682 vac_bsearch(const void *key, const void *base,
2683                         size_t nelem, size_t size,
2684                         int (*compar) (const void *, const void *))
2685 {
2686         int                     res;
2687         const void *last;
2688
2689         if (nelem == 0)
2690                 return NULL;
2691         res = compar(key, base);
2692         if (res < 0)
2693                 return NULL;
2694         if (res == 0)
2695                 return (void *) base;
2696         if (nelem > 1)
2697         {
2698                 last = (const void *) ((const char *) base + (nelem - 1) * size);
2699                 res = compar(key, last);
2700                 if (res > 0)
2701                         return NULL;
2702                 if (res == 0)
2703                         return (void *) last;
2704         }
2705         if (nelem <= 2)
2706                 return NULL;                    /* already checked 'em all */
2707         return bsearch(key, base, nelem, size, compar);
2708 }
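
/*
 * Usage sketch (this is how tid_reaped above calls it): with pagedesc
 * sorted in ascending block order,
 *
 *              vpp = (VacPage *) vac_bsearch((void *) &vp,
 *                                                                        (void *) (vacpagelist->pagedesc),
 *                                                                        vacpagelist->num_pages,
 *                                                                        sizeof(VacPage),
 *                                                                        vac_cmp_blk);
 *
 * returns NULL after at most two comparisons whenever vp's block number
 * falls outside pagedesc[0] .. pagedesc[num_pages - 1].
 */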
2709
2710 /*
2711  * Comparator routines for use with qsort() and bsearch().
2712  */
2713 static int
2714 vac_cmp_blk(const void *left, const void *right)
2715 {
2716         BlockNumber lblk,
2717                                 rblk;
2718
2719         lblk = (*((VacPage *) left))->blkno;
2720         rblk = (*((VacPage *) right))->blkno;
2721
2722         if (lblk < rblk)
2723                 return -1;
2724         if (lblk == rblk)
2725                 return 0;
2726         return 1;
2727 }
2728
2729 static int
2730 vac_cmp_offno(const void *left, const void *right)
2731 {
2732         if (*(OffsetNumber *) left < *(OffsetNumber *) right)
2733                 return -1;
2734         if (*(OffsetNumber *) left == *(OffsetNumber *) right)
2735                 return 0;
2736         return 1;
2737 }
2738
2739 static int
2740 vac_cmp_vtlinks(const void *left, const void *right)
2741 {
2742         if (((VTupleLink) left)->new_tid.ip_blkid.bi_hi <
2743                 ((VTupleLink) right)->new_tid.ip_blkid.bi_hi)
2744                 return -1;
2745         if (((VTupleLink) left)->new_tid.ip_blkid.bi_hi >
2746                 ((VTupleLink) right)->new_tid.ip_blkid.bi_hi)
2747                 return 1;
2748         /* bi_hi-es are equal */
2749         if (((VTupleLink) left)->new_tid.ip_blkid.bi_lo <
2750                 ((VTupleLink) right)->new_tid.ip_blkid.bi_lo)
2751                 return -1;
2752         if (((VTupleLink) left)->new_tid.ip_blkid.bi_lo >
2753                 ((VTupleLink) right)->new_tid.ip_blkid.bi_lo)
2754                 return 1;
2755         /* bi_lo-es are equal */
2756         if (((VTupleLink) left)->new_tid.ip_posid <
2757                 ((VTupleLink) right)->new_tid.ip_posid)
2758                 return -1;
2759         if (((VTupleLink) left)->new_tid.ip_posid >
2760                 ((VTupleLink) right)->new_tid.ip_posid)
2761                 return 1;
2762         return 0;
2763 }
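
/*
 * Sketch of intended use (hypothetical caller; the heap scan sorts the
 * tuple-chain links so they can later be binary-searched by new_tid):
 *
 *              qsort((char *) vtlinks, num_vtlinks, sizeof(VTupleLinkData),
 *                        vac_cmp_vtlinks);
 */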
2764
2765
2766 void
2767 vac_open_indexes(Relation relation, int *nindexes, Relation **Irel)
2768 {
2769         List       *indexoidlist,
2770                            *indexoidscan;
2771         int                     i;
2772
2773         indexoidlist = RelationGetIndexList(relation);
2774
2775         *nindexes = length(indexoidlist);
2776
2777         if (*nindexes > 0)
2778                 *Irel = (Relation *) palloc(*nindexes * sizeof(Relation));
2779         else
2780                 *Irel = NULL;
2781
2782         i = 0;
2783         foreach(indexoidscan, indexoidlist)
2784         {
2785                 Oid                     indexoid = lfirsti(indexoidscan);
2786
2787                 (*Irel)[i] = index_open(indexoid);
2788                 i++;
2789         }
2790
2791         freeList(indexoidlist);
2792 }
2793
2794
2795 void
2796 vac_close_indexes(int nindexes, Relation *Irel)
2797 {
2798         if (Irel == (Relation *) NULL)
2799                 return;
2800
2801         while (nindexes--)
2802                 index_close(Irel[nindexes]);
2803         pfree(Irel);
2804 }
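
/*
 * Typical pairing of the two routines above (sketch of the pattern used by
 * callers in this file):
 *
 *              Relation   *Irel;
 *              int                     nindexes;
 *
 *              vac_open_indexes(onerel, &nindexes, &Irel);
 *              ... scan or vacuum the indexes ...
 *              vac_close_indexes(nindexes, Irel);
 */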
2805
2806
2807 /*
2808  * Is an index partial (ie, could it contain fewer tuples than the heap)?
2809  */
2810 bool
2811 vac_is_partial_index(Relation indrel)
2812 {
2813         /*
2814          * If the index's AM doesn't support nulls, it's partial for our
2815          * purposes.
2816          */
2817         if (!indrel->rd_am->amindexnulls)
2818                 return true;
2819
2820         /* Otherwise, look to see if there's a partial-index predicate */
2821         return (VARSIZE(&indrel->rd_index->indpred) > VARHDRSZ);
2822 }
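
/*
 * Illustrative case (hypothetical index): an index built with
 *
 *              CREATE INDEX t_pos ON t (a) WHERE a > 0
 *
 * stores a nonempty predicate, so VARSIZE(&indrel->rd_index->indpred)
 * exceeds VARHDRSZ and vac_is_partial_index returns true.
 */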
2823
2824
2825 static bool
2826 enough_space(VacPage vacpage, Size len)
2827 {
2828         len = MAXALIGN(len);
2829
2830         if (len > vacpage->free)
2831                 return false;
2832
2833         /* if there are free itemid(s) and len <= free_space... */
2834         if (vacpage->offsets_used < vacpage->offsets_free)
2835                 return true;
2836
2837         /* noff_used >= noff_free, so we'll have to allocate a new itemid */
2838         if (len + sizeof(ItemIdData) <= vacpage->free)
2839                 return true;
2840
2841         return false;
2842 }
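
/*
 * Worked example (hypothetical numbers, assuming sizeof(ItemIdData) == 4):
 * with free = 58 and a MAXALIGN'd tuple of len = 56, the page qualifies
 * while offsets_used < offsets_free (a reaped line pointer can be reused),
 * but not otherwise, since 56 + 4 > 58 leaves no room for a new itemid.
 */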
2843
2844
2845 /*
2846  * Initialize usage snapshot.
2847  */
2848 void
2849 vac_init_rusage(VacRUsage *ru0)
2850 {
2851         struct timezone tz;
2852
2853         getrusage(RUSAGE_SELF, &ru0->ru);
2854         gettimeofday(&ru0->tv, &tz);
2855 }
2856
2857 /*
2858  * Compute elapsed time since ru0 usage snapshot, and format into
2859  * a displayable string.  Result is in a static string, which is
2860  * tacky, but no one ever claimed that the Postgres backend is
2861  * threadable...
2862  */
2863 const char *
2864 vac_show_rusage(VacRUsage *ru0)
2865 {
2866         static char result[100];
2867         VacRUsage       ru1;
2868
2869         vac_init_rusage(&ru1);
2870
2871         if (ru1.tv.tv_usec < ru0->tv.tv_usec)
2872         {
2873                 ru1.tv.tv_sec--;
2874                 ru1.tv.tv_usec += 1000000;
2875         }
2876         if (ru1.ru.ru_stime.tv_usec < ru0->ru.ru_stime.tv_usec)
2877         {
2878                 ru1.ru.ru_stime.tv_sec--;
2879                 ru1.ru.ru_stime.tv_usec += 1000000;
2880         }
2881         if (ru1.ru.ru_utime.tv_usec < ru0->ru.ru_utime.tv_usec)
2882         {
2883                 ru1.ru.ru_utime.tv_sec--;
2884                 ru1.ru.ru_utime.tv_usec += 1000000;
2885         }
2886
2887         snprintf(result, sizeof(result),
2888                          "CPU %d.%02ds/%d.%02du sec elapsed %d.%02d sec.",
2889                          (int) (ru1.ru.ru_stime.tv_sec - ru0->ru.ru_stime.tv_sec),
2890           (int) (ru1.ru.ru_stime.tv_usec - ru0->ru.ru_stime.tv_usec) / 10000,
2891                          (int) (ru1.ru.ru_utime.tv_sec - ru0->ru.ru_utime.tv_sec),
2892           (int) (ru1.ru.ru_utime.tv_usec - ru0->ru.ru_utime.tv_usec) / 10000,
2893                          (int) (ru1.tv.tv_sec - ru0->tv.tv_sec),
2894                          (int) (ru1.tv.tv_usec - ru0->tv.tv_usec) / 10000);
2895
2896         return result;
2897 }