]> granicus.if.org Git - postgresql/blob - src/backend/commands/vacuum.c
Checking to decide whether relations are system relations now depends
[postgresql] / src / backend / commands / vacuum.c
1 /*-------------------------------------------------------------------------
2  *
3  * vacuum.c
4  *        The postgres vacuum cleaner.
5  *
6  * This file includes the "full" version of VACUUM, as well as control code
7  * used by all three of full VACUUM, lazy VACUUM, and ANALYZE.  See
8  * vacuumlazy.c and analyze.c for the rest of the code for the latter two.
9  *
10  *
11  * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
12  * Portions Copyright (c) 1994, Regents of the University of California
13  *
14  *
15  * IDENTIFICATION
16  *        $Header: /cvsroot/pgsql/src/backend/commands/vacuum.c,v 1.223 2002/04/12 20:38:25 tgl Exp $
17  *
18  *-------------------------------------------------------------------------
19  */
20 #include "postgres.h"
21
22 #include <unistd.h>
23
24 #include "access/clog.h"
25 #include "access/genam.h"
26 #include "access/heapam.h"
27 #include "access/xlog.h"
28 #include "catalog/catalog.h"
29 #include "catalog/catname.h"
30 #include "catalog/namespace.h"
31 #include "catalog/pg_database.h"
32 #include "catalog/pg_index.h"
33 #include "commands/vacuum.h"
34 #include "executor/executor.h"
35 #include "miscadmin.h"
36 #include "storage/freespace.h"
37 #include "storage/sinval.h"
38 #include "storage/smgr.h"
39 #include "tcop/pquery.h"
40 #include "utils/acl.h"
41 #include "utils/builtins.h"
42 #include "utils/fmgroids.h"
43 #include "utils/inval.h"
44 #include "utils/lsyscache.h"
45 #include "utils/relcache.h"
46 #include "utils/syscache.h"
47 #include "pgstat.h"
48
49
50 typedef struct VacPageData
51 {
52         BlockNumber blkno;                      /* BlockNumber of this Page */
53         Size            free;                   /* FreeSpace on this Page */
54         uint16          offsets_used;   /* Number of OffNums used by vacuum */
55         uint16          offsets_free;   /* Number of OffNums free or to be free */
56         OffsetNumber offsets[1];        /* Array of free OffNums */
57 } VacPageData;
58
59 typedef VacPageData *VacPage;
60
61 typedef struct VacPageListData
62 {
63         BlockNumber empty_end_pages;    /* Number of "empty" end-pages */
64         int                     num_pages;              /* Number of pages in pagedesc */
65         int                     num_allocated_pages;    /* Number of allocated pages in
66                                                                                  * pagedesc */
67         VacPage    *pagedesc;           /* Descriptions of pages */
68 } VacPageListData;
69
70 typedef VacPageListData *VacPageList;
71
72 typedef struct VTupleLinkData
73 {
74         ItemPointerData new_tid;
75         ItemPointerData this_tid;
76 } VTupleLinkData;
77
78 typedef VTupleLinkData *VTupleLink;
79
80 typedef struct VTupleMoveData
81 {
82         ItemPointerData tid;            /* tuple ID */
83         VacPage         vacpage;                /* where to move */
84         bool            cleanVpd;               /* clean vacpage before using */
85 } VTupleMoveData;
86
87 typedef VTupleMoveData *VTupleMove;
88
89 typedef struct VRelStats
90 {
91         BlockNumber rel_pages;
92         double          rel_tuples;
93         Size            min_tlen;
94         Size            max_tlen;
95         bool            hasindex;
96         int                     num_vtlinks;
97         VTupleLink      vtlinks;
98 } VRelStats;
99
100
101 static MemoryContext vac_context = NULL;
102
103 static int elevel = -1;
104
105 static TransactionId OldestXmin;
106 static TransactionId FreezeLimit;
107
108 static TransactionId initialOldestXmin;
109 static TransactionId initialFreezeLimit;
110
111
112 /* non-export function prototypes */
113 static void vacuum_init(VacuumStmt *vacstmt);
114 static void vacuum_shutdown(VacuumStmt *vacstmt);
115 static List *getrels(const RangeVar *vacrel, const char *stmttype);
116 static void vac_update_dbstats(Oid dbid,
117                                    TransactionId vacuumXID,
118                                    TransactionId frozenXID);
119 static void vac_truncate_clog(TransactionId vacuumXID,
120                                   TransactionId frozenXID);
121 static void vacuum_rel(Oid relid, VacuumStmt *vacstmt, char expected_relkind);
122 static void full_vacuum_rel(Relation onerel, VacuumStmt *vacstmt);
123 static void scan_heap(VRelStats *vacrelstats, Relation onerel,
124                   VacPageList vacuum_pages, VacPageList fraged_pages);
125 static void repair_frag(VRelStats *vacrelstats, Relation onerel,
126                         VacPageList vacuum_pages, VacPageList fraged_pages,
127                         int nindexes, Relation *Irel);
128 static void vacuum_heap(VRelStats *vacrelstats, Relation onerel,
129                         VacPageList vacpagelist);
130 static void vacuum_page(Relation onerel, Buffer buffer, VacPage vacpage);
131 static void vacuum_index(VacPageList vacpagelist, Relation indrel,
132                          double num_tuples, int keep_tuples);
133 static void scan_index(Relation indrel, double num_tuples);
134 static bool tid_reaped(ItemPointer itemptr, void *state);
135 static bool dummy_tid_reaped(ItemPointer itemptr, void *state);
136 static void vac_update_fsm(Relation onerel, VacPageList fraged_pages,
137                            BlockNumber rel_pages);
138 static VacPage copy_vac_page(VacPage vacpage);
139 static void vpage_insert(VacPageList vacpagelist, VacPage vpnew);
140 static void *vac_bsearch(const void *key, const void *base,
141                         size_t nelem, size_t size,
142                         int (*compar) (const void *, const void *));
143 static int      vac_cmp_blk(const void *left, const void *right);
144 static int      vac_cmp_offno(const void *left, const void *right);
145 static int      vac_cmp_vtlinks(const void *left, const void *right);
146 static bool enough_space(VacPage vacpage, Size len);
147
148
149 /****************************************************************************
150  *                                                                                                                                                      *
151  *                      Code common to all flavors of VACUUM and ANALYZE                                *
152  *                                                                                                                                                      *
153  ****************************************************************************
154  */
155
156
157 /*
158  * Primary entry point for VACUUM and ANALYZE commands.
159  */
160 void
161 vacuum(VacuumStmt *vacstmt)
162 {
163         const char *stmttype = vacstmt->vacuum ? "VACUUM" : "ANALYZE";
164         List       *vrl,
165                            *cur;
166
167         if (vacstmt->verbose)
168                 elevel = INFO;
169         else
170                 elevel = DEBUG1;
171
172         /*
173          * We cannot run VACUUM inside a user transaction block; if we were
174          * inside a transaction, then our commit- and
175          * start-transaction-command calls would not have the intended effect!
176          * Furthermore, the forced commit that occurs before truncating the
177          * relation's file would have the effect of committing the rest of the
178          * user's transaction too, which would certainly not be the desired
179          * behavior.
180          */
181         if (IsTransactionBlock())
182                 elog(ERROR, "%s cannot run inside a BEGIN/END block", stmttype);
183
184         /*
185          * Send info about dead objects to the statistics collector
186          */
187         pgstat_vacuum_tabstat();
188
189         /*
190          * Create special memory context for cross-transaction storage.
191          *
192          * Since it is a child of QueryContext, it will go away eventually even
193          * if we suffer an error; there's no need for special abort cleanup
194          * logic.
195          */
196         vac_context = AllocSetContextCreate(QueryContext,
197                                                                                 "Vacuum",
198                                                                                 ALLOCSET_DEFAULT_MINSIZE,
199                                                                                 ALLOCSET_DEFAULT_INITSIZE,
200                                                                                 ALLOCSET_DEFAULT_MAXSIZE);
201
202         /* Build list of relations to process (note this lives in vac_context) */
203         vrl = getrels(vacstmt->relation, stmttype);
204
205         /*
206          * Start up the vacuum cleaner.
207          */
208         vacuum_init(vacstmt);
209
210         /*
211          * Process each selected relation.      We are careful to process each
212          * relation in a separate transaction in order to avoid holding too
213          * many locks at one time.      Also, if we are doing VACUUM ANALYZE, the
214          * ANALYZE part runs as a separate transaction from the VACUUM to
215          * further reduce locking.
216          */
217         foreach(cur, vrl)
218         {
219                 Oid             relid = (Oid) lfirsti(cur);
220
221                 if (vacstmt->vacuum)
222                         vacuum_rel(relid, vacstmt, RELKIND_RELATION);
223                 if (vacstmt->analyze)
224                         analyze_rel(relid, vacstmt);
225         }
226
227         /* clean up */
228         vacuum_shutdown(vacstmt);
229 }
230
231 /*
232  *      vacuum_init(), vacuum_shutdown() -- start up and shut down the vacuum cleaner.
233  *
234  *              Formerly, there was code here to prevent more than one VACUUM from
235  *              executing concurrently in the same database.  However, there's no
236  *              good reason to prevent that, and manually removing lockfiles after
237  *              a vacuum crash was a pain for dbadmins.  So, forget about lockfiles,
238  *              and just rely on the locks we grab on each target table
239  *              to ensure that there aren't two VACUUMs running on the same table
240  *              at the same time.
241  *
242  *              The strangeness with committing and starting transactions in the
243  *              init and shutdown routines is due to the fact that the vacuum cleaner
244  *              is invoked via an SQL command, and so is already executing inside
245  *              a transaction.  We need to leave ourselves in a predictable state
246  *              on entry and exit to the vacuum cleaner.  We commit the transaction
247  *              started in PostgresMain() inside vacuum_init(), and start one in
248  *              vacuum_shutdown() to match the commit waiting for us back in
249  *              PostgresMain().
250  */
251 static void
252 vacuum_init(VacuumStmt *vacstmt)
253 {
254         if (vacstmt->vacuum && vacstmt->relation == NULL)
255         {
256                 /*
257                  * Compute the initially applicable OldestXmin and FreezeLimit
258                  * XIDs, so that we can record these values at the end of the
259                  * VACUUM. Note that individual tables may well be processed with
260                  * newer values, but we can guarantee that no (non-shared)
261                  * relations are processed with older ones.
262                  *
263                  * It is okay to record non-shared values in pg_database, even though
264                  * we may vacuum shared relations with older cutoffs, because only
265                  * the minimum of the values present in pg_database matters.  We
266                  * can be sure that shared relations have at some time been
267                  * vacuumed with cutoffs no worse than the global minimum; for, if
268                  * there is a backend in some other DB with xmin = OLDXMIN that's
269                  * determining the cutoff with which we vacuum shared relations,
270                  * it is not possible for that database to have a cutoff newer
271                  * than OLDXMIN recorded in pg_database.
272                  */
273                 vacuum_set_xid_limits(vacstmt, false,
274                                                           &initialOldestXmin, &initialFreezeLimit);
275         }
276
277         /* matches the StartTransaction in PostgresMain() */
278         CommitTransactionCommand();
279 }
280
281 static void
282 vacuum_shutdown(VacuumStmt *vacstmt)
283 {
284         /* on entry, we are not in a transaction */
285
286         /* matches the CommitTransaction in PostgresMain() */
287         StartTransactionCommand();
288
289         /*
290          * If we did a database-wide VACUUM, update the database's pg_database
291          * row with info about the transaction IDs used, and try to truncate
292          * pg_clog.
293          */
294         if (vacstmt->vacuum && vacstmt->relation == NULL)
295         {
296                 vac_update_dbstats(MyDatabaseId,
297                                                    initialOldestXmin, initialFreezeLimit);
298                 vac_truncate_clog(initialOldestXmin, initialFreezeLimit);
299         }
300
301         /*
302          * Clean up working storage --- note we must do this after
303          * StartTransactionCommand, else we might be trying to delete the
304          * active context!
305          */
306         MemoryContextDelete(vac_context);
307         vac_context = NULL;
308 }
309
310 /*
311  * Build a list of Oids for each relation to be processed
312  *
313  * The list is built in vac_context so that it will survive across our
314  * per-relation transactions.
315  */
316 static List *
317 getrels(const RangeVar *vacrel, const char *stmttype)
318 {
319         List       *vrl = NIL;
320         MemoryContext oldcontext;
321
322         if (vacrel)
323         {
324                 /* Process specific relation */
325                 Oid             relid;
326
327                 relid = RangeVarGetRelid(vacrel, false);
328
329                 /* Make a relation list entry for this guy */
330                 oldcontext = MemoryContextSwitchTo(vac_context);
331                 vrl = lappendi(vrl, relid);
332                 MemoryContextSwitchTo(oldcontext);
333         }
334         else
335         {
336                 /* Process all plain relations listed in pg_class */
337                 Relation        pgclass;
338                 HeapScanDesc scan;
339                 HeapTuple       tuple;
340                 ScanKeyData key;
341
342                 ScanKeyEntryInitialize(&key, 0x0,
343                                                            Anum_pg_class_relkind,
344                                                            F_CHAREQ,
345                                                            CharGetDatum(RELKIND_RELATION));
346
347                 pgclass = heap_openr(RelationRelationName, AccessShareLock);
348
349                 scan = heap_beginscan(pgclass, false, SnapshotNow, 1, &key);
350
351                 while (HeapTupleIsValid(tuple = heap_getnext(scan, 0)))
352                 {
353                         /* Make a relation list entry for this guy */
354                         oldcontext = MemoryContextSwitchTo(vac_context);
355                         vrl = lappendi(vrl, tuple->t_data->t_oid);
356                         MemoryContextSwitchTo(oldcontext);
357                 }
358
359                 heap_endscan(scan);
360                 heap_close(pgclass, AccessShareLock);
361         }
362
363         return vrl;
364 }
365
366 /*
367  * vacuum_set_xid_limits() -- compute oldest-Xmin and freeze cutoff points
368  */
369 void
370 vacuum_set_xid_limits(VacuumStmt *vacstmt, bool sharedRel,
371                                           TransactionId *oldestXmin,
372                                           TransactionId *freezeLimit)
373 {
374         TransactionId limit;
375
376         *oldestXmin = GetOldestXmin(sharedRel);
377
378         Assert(TransactionIdIsNormal(*oldestXmin));
379
380         if (vacstmt->freeze)
381         {
382                 /* FREEZE option: use oldest Xmin as freeze cutoff too */
383                 limit = *oldestXmin;
384         }
385         else
386         {
387                 /*
388                  * Normal case: freeze cutoff is well in the past, to wit, about
389                  * halfway to the wrap horizon
390                  */
391                 limit = GetCurrentTransactionId() - (MaxTransactionId >> 2);
392         }
393
394         /*
395          * Be careful not to generate a "permanent" XID
396          */
397         if (!TransactionIdIsNormal(limit))
398                 limit = FirstNormalTransactionId;
399
400         /*
401          * Ensure sane relationship of limits
402          */
403         if (TransactionIdFollows(limit, *oldestXmin))
404         {
405                 elog(WARNING, "oldest Xmin is far in the past --- close open transactions soon to avoid wraparound problems");
406                 limit = *oldestXmin;
407         }
408
409         *freezeLimit = limit;
410 }
411
412
413 /*
414  *      vac_update_relstats() -- update statistics for one relation
415  *
416  *              Update the whole-relation statistics that are kept in its pg_class
417  *              row.  There are additional stats that will be updated if we are
418  *              doing ANALYZE, but we always update these stats.  This routine works
419  *              for both index and heap relation entries in pg_class.
420  *
421  *              We violate no-overwrite semantics here by storing new values for the
422  *              statistics columns directly into the pg_class tuple that's already on
423  *              the page.  The reason for this is that if we updated these tuples in
424  *              the usual way, vacuuming pg_class itself wouldn't work very well ---
425  *              by the time we got done with a vacuum cycle, most of the tuples in
426  *              pg_class would've been obsoleted.  Of course, this only works for
427  *              fixed-size never-null columns, but these are.
428  *
429  *              This routine is shared by full VACUUM, lazy VACUUM, and stand-alone
430  *              ANALYZE.
431  */
432 void
433 vac_update_relstats(Oid relid, BlockNumber num_pages, double num_tuples,
434                                         bool hasindex)
435 {
436         Relation        rd;
437         HeapTupleData rtup;
438         HeapTuple       ctup;
439         Form_pg_class pgcform;
440         Buffer          buffer;
441
442         /*
443          * update number of tuples and number of pages in pg_class
444          */
445         rd = heap_openr(RelationRelationName, RowExclusiveLock);
446
447         ctup = SearchSysCache(RELOID,
448                                                   ObjectIdGetDatum(relid),
449                                                   0, 0, 0);
450         if (!HeapTupleIsValid(ctup))
451                 elog(ERROR, "pg_class entry for relid %u vanished during vacuuming",
452                          relid);
453
454         /* get the buffer cache tuple */
455         rtup.t_self = ctup->t_self;
456         ReleaseSysCache(ctup);
457         heap_fetch(rd, SnapshotNow, &rtup, &buffer, NULL);
458
459         /* overwrite the existing statistics in the tuple */
460         pgcform = (Form_pg_class) GETSTRUCT(&rtup);
461         pgcform->relpages = (int32) num_pages;
462         pgcform->reltuples = num_tuples;
463         pgcform->relhasindex = hasindex;
464
465         /*
466          * If we have discovered that there are no indexes, then there's no
467          * primary key either.  This could be done more thoroughly...
468          */
469         if (!hasindex)
470                 pgcform->relhaspkey = false;
471
472         /*
473          * Invalidate the tuple in the catcaches; this also arranges to flush
474          * the relation's relcache entry.  (If we fail to commit for some reason,
475          * no flush will occur, but no great harm is done since there are no
476          * noncritical state updates here.)
477          */
478         CacheInvalidateHeapTuple(rd, &rtup);
479
480         /* Write the buffer */
481         WriteBuffer(buffer);
482
483         heap_close(rd, RowExclusiveLock);
484 }
485
486
487 /*
488  *      vac_update_dbstats() -- update statistics for one database
489  *
490  *              Update the whole-database statistics that are kept in its pg_database
491  *              row.
492  *
493  *              We violate no-overwrite semantics here by storing new values for the
494  *              statistics columns directly into the tuple that's already on the page.
495  *              As with vac_update_relstats, this avoids leaving dead tuples behind
496  *              after a VACUUM; which is good since GetRawDatabaseInfo
497  *              can get confused by finding dead tuples in pg_database.
498  *
499  *              This routine is shared by full and lazy VACUUM.  Note that it is only
500  *              applied after a database-wide VACUUM operation.
501  */
502 static void
503 vac_update_dbstats(Oid dbid,
504                                    TransactionId vacuumXID,
505                                    TransactionId frozenXID)
506 {
507         Relation        relation;
508         ScanKeyData entry[1];
509         HeapScanDesc scan;
510         HeapTuple       tuple;
511         Form_pg_database dbform;
512
513         relation = heap_openr(DatabaseRelationName, RowExclusiveLock);
514
515         /* Must use a heap scan, since there's no syscache for pg_database */
516         ScanKeyEntryInitialize(&entry[0], 0x0,
517                                                    ObjectIdAttributeNumber, F_OIDEQ,
518                                                    ObjectIdGetDatum(dbid));
519
520         scan = heap_beginscan(relation, 0, SnapshotNow, 1, entry);
521
522         tuple = heap_getnext(scan, 0);
523
524         if (!HeapTupleIsValid(tuple))
525                 elog(ERROR, "database %u does not exist", dbid);
526
527         dbform = (Form_pg_database) GETSTRUCT(tuple);
528
529         /* overwrite the existing statistics in the tuple */
530         dbform->datvacuumxid = vacuumXID;
531         dbform->datfrozenxid = frozenXID;
532
533         /* invalidate the tuple in the cache and write the buffer */
534         CacheInvalidateHeapTuple(relation, tuple);
535         WriteNoReleaseBuffer(scan->rs_cbuf);
536
537         heap_endscan(scan);
538
539         heap_close(relation, RowExclusiveLock);
540 }
541
542
543 /*
544  *      vac_truncate_clog() -- attempt to truncate the commit log
545  *
546  *              Scan pg_database to determine the system-wide oldest datvacuumxid,
547  *              and use it to truncate the transaction commit log (pg_clog).
548  *              Also generate a warning if the system-wide oldest datfrozenxid
549  *              seems to be in danger of wrapping around.
550  *
551  *              The passed XIDs are simply the ones I just wrote into my pg_database
552  *              entry.  They're used to initialize the "min" calculations.
553  *
554  *              This routine is shared by full and lazy VACUUM.  Note that it is only
555  *              applied after a database-wide VACUUM operation.
556  */
557 static void
558 vac_truncate_clog(TransactionId vacuumXID, TransactionId frozenXID)
559 {
560         TransactionId myXID;
561         Relation        relation;
562         HeapScanDesc scan;
563         HeapTuple       tuple;
564         int32           age;
565         bool            vacuumAlreadyWrapped = false;
566         bool            frozenAlreadyWrapped = false;
567
568         myXID = GetCurrentTransactionId();
569
570         relation = heap_openr(DatabaseRelationName, AccessShareLock);
571
572         scan = heap_beginscan(relation, 0, SnapshotNow, 0, NULL);
573
574         while (HeapTupleIsValid(tuple = heap_getnext(scan, 0)))
575         {
576                 Form_pg_database dbform = (Form_pg_database) GETSTRUCT(tuple);
577
578                 /* Ignore non-connectable databases (eg, template0) */
579                 /* It's assumed that these have been frozen correctly */
580                 if (!dbform->datallowconn)
581                         continue;
582
583                 if (TransactionIdIsNormal(dbform->datvacuumxid))
584                 {
585                         if (TransactionIdPrecedes(myXID, dbform->datvacuumxid))
586                                 vacuumAlreadyWrapped = true;
587                         else if (TransactionIdPrecedes(dbform->datvacuumxid, vacuumXID))
588                                 vacuumXID = dbform->datvacuumxid;
589                 }
590                 if (TransactionIdIsNormal(dbform->datfrozenxid))
591                 {
592                         if (TransactionIdPrecedes(myXID, dbform->datfrozenxid))
593                                 frozenAlreadyWrapped = true;
594                         else if (TransactionIdPrecedes(dbform->datfrozenxid, frozenXID))
595                                 frozenXID = dbform->datfrozenxid;
596                 }
597         }
598
599         heap_endscan(scan);
600
601         heap_close(relation, AccessShareLock);
602
603         /*
604          * Do not truncate CLOG if we seem to have suffered wraparound already;
605          * the computed minimum XID might be bogus.
606          */
607         if (vacuumAlreadyWrapped)
608         {
609                 elog(WARNING, "Some databases have not been vacuumed in over 2 billion transactions."
610                          "\n\tYou may have already suffered transaction-wraparound data loss.");
611                 return;
612         }
613
614         /* Truncate CLOG to the oldest vacuumxid */
615         TruncateCLOG(vacuumXID);
616
617         /* Give warning about impending wraparound problems */
618         if (frozenAlreadyWrapped)
619         {
620                 elog(WARNING, "Some databases have not been vacuumed in over 1 billion transactions."
621                          "\n\tBetter vacuum them soon, or you may have a wraparound failure.");
622         }
623         else
624         {
625                 age = (int32) (myXID - frozenXID);
626                 if (age > (int32) ((MaxTransactionId >> 3) * 3))
627                         elog(WARNING, "Some databases have not been vacuumed in %d transactions."
628                                  "\n\tBetter vacuum them within %d transactions,"
629                                  "\n\tor you may have a wraparound failure.",
630                                  age, (int32) (MaxTransactionId >> 1) - age);
631         }
632 }
633
634
635 /****************************************************************************
636  *                                                                                                                                                      *
637  *                      Code common to both flavors of VACUUM                                                   *
638  *                                                                                                                                                      *
639  ****************************************************************************
640  */
641
642
643 /*
644  *      vacuum_rel() -- vacuum one heap relation
645  *
646  *              Doing one heap at a time incurs extra overhead, since we need to
647  *              check that the heap exists again just before we vacuum it.      The
648  *              reason that we do this is so that vacuuming can be spread across
649  *              many small transactions.  Otherwise, two-phase locking would require
650  *              us to lock the entire database during one pass of the vacuum cleaner.
651  *
652  *              At entry and exit, we are not inside a transaction.
653  */
654 static void
655 vacuum_rel(Oid relid, VacuumStmt *vacstmt, char expected_relkind)
656 {
657         LOCKMODE        lmode;
658         Relation        onerel;
659         LockRelId       onerelid;
660         Oid                     toast_relid;
661
662         /* Begin a transaction for vacuuming this relation */
663         StartTransactionCommand();
664
665         /*
666          * Check for user-requested abort.      Note we want this to be inside a
667          * transaction, so xact.c doesn't issue useless WARNING.
668          */
669         CHECK_FOR_INTERRUPTS();
670
671         /*
672          * Race condition -- if the pg_class tuple has gone away since the
673          * last time we saw it, we don't need to vacuum it.
674          */
675         if (!SearchSysCacheExists(RELOID,
676                                                           ObjectIdGetDatum(relid),
677                                                           0, 0, 0))
678         {
679                 CommitTransactionCommand();
680                 return;
681         }
682
683         /*
684          * Determine the type of lock we want --- hard exclusive lock for a
685          * FULL vacuum, but just ShareUpdateExclusiveLock for concurrent
686          * vacuum.      Either way, we can be sure that no other backend is
687          * vacuuming the same table.
688          */
689         lmode = vacstmt->full ? AccessExclusiveLock : ShareUpdateExclusiveLock;
690
691         /*
692          * Open the class, get an appropriate lock on it, and check
693          * permissions.
694          *
695          * We allow the user to vacuum a table if he is superuser, the table
696          * owner, or the database owner (but in the latter case, only if it's
697          * not a shared relation).      pg_class_ownercheck includes the superuser case.
698          *
699          * Note we choose to treat permissions failure as a WARNING and keep
700          * trying to vacuum the rest of the DB --- is this appropriate?
701          */
702         onerel = relation_open(relid, lmode);
703
704         if (!(pg_class_ownercheck(RelationGetRelid(onerel), GetUserId()) ||
705                   (is_dbadmin(MyDatabaseId) && !onerel->rd_rel->relisshared)))
706         {
707                 elog(WARNING, "Skipping \"%s\" --- only table or database owner can VACUUM it",
708                          RelationGetRelationName(onerel));
709                 relation_close(onerel, lmode);
710                 CommitTransactionCommand();
711                 return;
712         }
713
714         /*
715          * Check that it's a plain table; we used to do this in getrels() but
716          * seems safer to check after we've locked the relation.
717          */
718         if (onerel->rd_rel->relkind != expected_relkind)
719         {
720                 elog(WARNING, "Skipping \"%s\" --- can not process indexes, views or special system tables",
721                          RelationGetRelationName(onerel));
722                 relation_close(onerel, lmode);
723                 CommitTransactionCommand();
724                 return;
725         }
726
727         /*
728          * Get a session-level lock too. This will protect our access to the
729          * relation across multiple transactions, so that we can vacuum the
730          * relation's TOAST table (if any) secure in the knowledge that no one
731          * is deleting the parent relation.
732          *
733          * NOTE: this cannot block, even if someone else is waiting for access,
734          * because the lock manager knows that both lock requests are from the
735          * same process.
736          */
737         onerelid = onerel->rd_lockInfo.lockRelId;
738         LockRelationForSession(&onerelid, lmode);
739
740         /*
741          * Remember the relation's TOAST relation for later
742          */
743         toast_relid = onerel->rd_rel->reltoastrelid;
744
745         /*
746          * Do the actual work --- either FULL or "lazy" vacuum
747          */
748         if (vacstmt->full)
749                 full_vacuum_rel(onerel, vacstmt);
750         else
751                 lazy_vacuum_rel(onerel, vacstmt);
752
753         /* all done with this class, but hold lock until commit */
754         relation_close(onerel, NoLock);
755
756         /*
757          * Complete the transaction and free all temporary memory used.
758          */
759         CommitTransactionCommand();
760
761         /*
762          * If the relation has a secondary toast rel, vacuum that too while we
763          * still hold the session lock on the master table.  Note however that
764          * "analyze" will not get done on the toast table.      This is good,
765          * because the toaster always uses hardcoded index access and
766          * statistics are totally unimportant for toast relations.
767          */
768         if (toast_relid != InvalidOid)
769                 vacuum_rel(toast_relid, vacstmt, RELKIND_TOASTVALUE);
770
771         /*
772          * Now release the session-level lock on the master table.
773          */
774         UnlockRelationForSession(&onerelid, lmode);
775 }
776
777
778 /****************************************************************************
779  *                                                                                                                                                      *
780  *                      Code for VACUUM FULL (only)                                                                             *
781  *                                                                                                                                                      *
782  ****************************************************************************
783  */
784
785
786 /*
787  *      full_vacuum_rel() -- perform FULL VACUUM for one heap relation
788  *
789  *              This routine vacuums a single heap, cleans out its indexes, and
790  *              updates its num_pages and num_tuples statistics.
791  *
792  *              At entry, we have already established a transaction and opened
793  *              and locked the relation.
794  */
795 static void
796 full_vacuum_rel(Relation onerel, VacuumStmt *vacstmt)
797 {
798         VacPageListData vacuum_pages;           /* List of pages to vacuum and/or
799                                                                                  * clean indexes */
800         VacPageListData fraged_pages;           /* List of pages with space enough
801                                                                                  * for re-using */
802         Relation   *Irel;
803         int                     nindexes,
804                                 i;
805         VRelStats  *vacrelstats;
806         bool            reindex = false;
807
808         if (IsIgnoringSystemIndexes() &&
809                 IsSystemRelation(onerel))
810                 reindex = true;
811
812         vacuum_set_xid_limits(vacstmt, onerel->rd_rel->relisshared,
813                                                   &OldestXmin, &FreezeLimit);
814
815         /*
816          * Set up statistics-gathering machinery.
817          */
818         vacrelstats = (VRelStats *) palloc(sizeof(VRelStats));
819         vacrelstats->rel_pages = 0;
820         vacrelstats->rel_tuples = 0;
821         vacrelstats->hasindex = false;
822
823         /* scan the heap */
824         vacuum_pages.num_pages = fraged_pages.num_pages = 0;
825         scan_heap(vacrelstats, onerel, &vacuum_pages, &fraged_pages);
826
827         /* Now open all indexes of the relation */
828         vac_open_indexes(onerel, &nindexes, &Irel);
829         if (!Irel)
830                 reindex = false;
831         else if (!RelationGetForm(onerel)->relhasindex)
832                 reindex = true;
833         if (nindexes > 0)
834                 vacrelstats->hasindex = true;
835
836 #ifdef NOT_USED
837
838         /*
839          * reindex in VACUUM is dangerous under WAL. ifdef out until it
840          * becomes safe.
841          */
842         if (reindex)
843         {
844                 vac_close_indexes(nindexes, Irel);
845                 Irel = (Relation *) NULL;
846                 activate_indexes_of_a_table(RelationGetRelid(onerel), false);
847         }
848 #endif   /* NOT_USED */
849
850         /* Clean/scan index relation(s) */
851         if (Irel != (Relation *) NULL)
852         {
853                 if (vacuum_pages.num_pages > 0)
854                 {
855                         for (i = 0; i < nindexes; i++)
856                                 vacuum_index(&vacuum_pages, Irel[i],
857                                                          vacrelstats->rel_tuples, 0);
858                 }
859                 else
860                 {
861                         /* just scan indexes to update statistic */
862                         for (i = 0; i < nindexes; i++)
863                                 scan_index(Irel[i], vacrelstats->rel_tuples);
864                 }
865         }
866
867         if (fraged_pages.num_pages > 0)
868         {
869                 /* Try to shrink heap */
870                 repair_frag(vacrelstats, onerel, &vacuum_pages, &fraged_pages,
871                                         nindexes, Irel);
872                 vac_close_indexes(nindexes, Irel);
873         }
874         else
875         {
876                 vac_close_indexes(nindexes, Irel);
877                 if (vacuum_pages.num_pages > 0)
878                 {
879                         /* Clean pages from vacuum_pages list */
880                         vacuum_heap(vacrelstats, onerel, &vacuum_pages);
881                 }
882                 else
883                 {
884                         /*
885                          * Flush dirty pages out to disk.  We must do this even if we
886                          * didn't do anything else, because we want to ensure that all
887                          * tuples have correct on-row commit status on disk (see
888                          * bufmgr.c's comments for FlushRelationBuffers()).
889                          */
890                         i = FlushRelationBuffers(onerel, vacrelstats->rel_pages);
891                         if (i < 0)
892                                 elog(ERROR, "VACUUM (full_vacuum_rel): FlushRelationBuffers returned %d",
893                                          i);
894                 }
895         }
896
897 #ifdef NOT_USED
898         if (reindex)
899                 activate_indexes_of_a_table(RelationGetRelid(onerel), true);
900 #endif   /* NOT_USED */
901
902         /* update shared free space map with final free space info */
903         vac_update_fsm(onerel, &fraged_pages, vacrelstats->rel_pages);
904
905         /* update statistics in pg_class */
906         vac_update_relstats(RelationGetRelid(onerel), vacrelstats->rel_pages,
907                                                 vacrelstats->rel_tuples, vacrelstats->hasindex);
908 }
909
910
911 /*
912  *      scan_heap() -- scan an open heap relation
913  *
914  *              This routine sets commit status bits, constructs vacuum_pages (list
915  *              of pages we need to compact free space on and/or clean indexes of
916  *              deleted tuples), constructs fraged_pages (list of pages with free
917  *              space that tuples could be moved into), and calculates statistics
918  *              on the number of live tuples in the heap.
919  */
920 static void
921 scan_heap(VRelStats *vacrelstats, Relation onerel,
922                   VacPageList vacuum_pages, VacPageList fraged_pages)
923 {
924         BlockNumber nblocks,
925                                 blkno;
926         ItemId          itemid;
927         Buffer          buf;
928         HeapTupleData tuple;
929         OffsetNumber offnum,
930                                 maxoff;
931         bool            pgchanged,
932                                 tupgone,
933                                 notup;
934         char       *relname;
935         VacPage         vacpage,
936                                 vacpagecopy;
937         BlockNumber empty_pages,
938                                 new_pages,
939                                 changed_pages,
940                                 empty_end_pages;
941         double          num_tuples,
942                                 tups_vacuumed,
943                                 nkeep,
944                                 nunused;
945         double          free_size,
946                                 usable_free_size;
947         Size            min_tlen = MaxTupleSize;
948         Size            max_tlen = 0;
949         int                     i;
950         bool            do_shrinking = true;
951         VTupleLink      vtlinks = (VTupleLink) palloc(100 * sizeof(VTupleLinkData));
952         int                     num_vtlinks = 0;
953         int                     free_vtlinks = 100;
954         VacRUsage       ru0;
955
956         vac_init_rusage(&ru0);
957
958         relname = RelationGetRelationName(onerel);
959         elog(elevel, "--Relation %s.%s--",
960                  get_namespace_name(RelationGetNamespace(onerel)),
961                  relname);
962
963         empty_pages = new_pages = changed_pages = empty_end_pages = 0;
964         num_tuples = tups_vacuumed = nkeep = nunused = 0;
965         free_size = 0;
966
967         nblocks = RelationGetNumberOfBlocks(onerel);
968
969         /*
970          * We initially create each VacPage item in a maximal-sized workspace,
971          * then copy the workspace into a just-large-enough copy.
972          */
973         vacpage = (VacPage) palloc(sizeof(VacPageData) + MaxOffsetNumber * sizeof(OffsetNumber));
974
975         for (blkno = 0; blkno < nblocks; blkno++)
976         {
977                 Page            page,
978                                         tempPage = NULL;
979                 bool            do_reap,
980                                         do_frag;
981
982                 CHECK_FOR_INTERRUPTS();
983
984                 buf = ReadBuffer(onerel, blkno);
985                 page = BufferGetPage(buf);
986
987                 vacpage->blkno = blkno;
988                 vacpage->offsets_used = 0;
989                 vacpage->offsets_free = 0;
990
991                 if (PageIsNew(page))
992                 {
993                         elog(WARNING, "Rel %s: Uninitialized page %u - fixing",
994                                  relname, blkno);
995                         PageInit(page, BufferGetPageSize(buf), 0);
996                         vacpage->free = ((PageHeader) page)->pd_upper - ((PageHeader) page)->pd_lower;
997                         free_size += (vacpage->free - sizeof(ItemIdData));
998                         new_pages++;
999                         empty_end_pages++;
1000                         vacpagecopy = copy_vac_page(vacpage);
1001                         vpage_insert(vacuum_pages, vacpagecopy);
1002                         vpage_insert(fraged_pages, vacpagecopy);
1003                         WriteBuffer(buf);
1004                         continue;
1005                 }
1006
1007                 if (PageIsEmpty(page))
1008                 {
1009                         vacpage->free = ((PageHeader) page)->pd_upper - ((PageHeader) page)->pd_lower;
1010                         free_size += (vacpage->free - sizeof(ItemIdData));
1011                         empty_pages++;
1012                         empty_end_pages++;
1013                         vacpagecopy = copy_vac_page(vacpage);
1014                         vpage_insert(vacuum_pages, vacpagecopy);
1015                         vpage_insert(fraged_pages, vacpagecopy);
1016                         ReleaseBuffer(buf);
1017                         continue;
1018                 }
1019
1020                 pgchanged = false;
1021                 notup = true;
1022                 maxoff = PageGetMaxOffsetNumber(page);
1023                 for (offnum = FirstOffsetNumber;
1024                          offnum <= maxoff;
1025                          offnum = OffsetNumberNext(offnum))
1026                 {
1027                         uint16          sv_infomask;
1028
1029                         itemid = PageGetItemId(page, offnum);
1030
1031                         /*
1032                          * Collect un-used items too - it's possible to have indexes
1033                          * pointing here after crash.
1034                          */
1035                         if (!ItemIdIsUsed(itemid))
1036                         {
1037                                 vacpage->offsets[vacpage->offsets_free++] = offnum;
1038                                 nunused += 1;
1039                                 continue;
1040                         }
1041
1042                         tuple.t_datamcxt = NULL;
1043                         tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
1044                         tuple.t_len = ItemIdGetLength(itemid);
1045                         ItemPointerSet(&(tuple.t_self), blkno, offnum);
1046
1047                         tupgone = false;
1048                         sv_infomask = tuple.t_data->t_infomask;
1049
1050                         switch (HeapTupleSatisfiesVacuum(tuple.t_data, OldestXmin))
1051                         {
1052                                 case HEAPTUPLE_DEAD:
1053                                         tupgone = true;         /* we can delete the tuple */
1054                                         break;
1055                                 case HEAPTUPLE_LIVE:
1056
1057                                         /*
1058                                          * Tuple is good.  Consider whether to replace its
1059                                          * xmin value with FrozenTransactionId.
1060                                          */
1061                                         if (TransactionIdIsNormal(tuple.t_data->t_xmin) &&
1062                                                 TransactionIdPrecedes(tuple.t_data->t_xmin,
1063                                                                                           FreezeLimit))
1064                                         {
1065                                                 tuple.t_data->t_xmin = FrozenTransactionId;
1066                                                 /* infomask should be okay already */
1067                                                 Assert(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED);
1068                                                 pgchanged = true;
1069                                         }
1070                                         break;
1071                                 case HEAPTUPLE_RECENTLY_DEAD:
1072
1073                                         /*
1074                                          * If tuple is recently deleted then we must not
1075                                          * remove it from relation.
1076                                          */
1077                                         nkeep += 1;
1078
1079                                         /*
1080                                          * If we do shrinking and this tuple is updated one
1081                                          * then remember it to construct updated tuple
1082                                          * dependencies.
1083                                          */
1084                                         if (do_shrinking &&
1085                                                 !(ItemPointerEquals(&(tuple.t_self),
1086                                                                                         &(tuple.t_data->t_ctid))))
1087                                         {
1088                                                 if (free_vtlinks == 0)
1089                                                 {
1090                                                         free_vtlinks = 1000;
1091                                                         vtlinks = (VTupleLink) repalloc(vtlinks,
1092                                                                                    (free_vtlinks + num_vtlinks) *
1093                                                                                                  sizeof(VTupleLinkData));
1094                                                 }
1095                                                 vtlinks[num_vtlinks].new_tid = tuple.t_data->t_ctid;
1096                                                 vtlinks[num_vtlinks].this_tid = tuple.t_self;
1097                                                 free_vtlinks--;
1098                                                 num_vtlinks++;
1099                                         }
1100                                         break;
1101                                 case HEAPTUPLE_INSERT_IN_PROGRESS:
1102
1103                                         /*
1104                                          * This should not happen, since we hold exclusive
1105                                          * lock on the relation; shouldn't we raise an error?
1106                                          */
1107                                         elog(WARNING, "Rel %s: TID %u/%u: InsertTransactionInProgress %u - can't shrink relation",
1108                                                  relname, blkno, offnum, tuple.t_data->t_xmin);
1109                                         do_shrinking = false;
1110                                         break;
1111                                 case HEAPTUPLE_DELETE_IN_PROGRESS:
1112
1113                                         /*
1114                                          * This should not happen, since we hold exclusive
1115                                          * lock on the relation; shouldn't we raise an error?
1116                                          */
1117                                         elog(WARNING, "Rel %s: TID %u/%u: DeleteTransactionInProgress %u - can't shrink relation",
1118                                                  relname, blkno, offnum, tuple.t_data->t_xmax);
1119                                         do_shrinking = false;
1120                                         break;
1121                                 default:
1122                                         elog(ERROR, "Unexpected HeapTupleSatisfiesVacuum result");
1123                                         break;
1124                         }
1125
1126                         /* check for hint-bit update by HeapTupleSatisfiesVacuum */
1127                         if (sv_infomask != tuple.t_data->t_infomask)
1128                                 pgchanged = true;
1129
1130                         /*
1131                          * Other checks...
1132                          */
1133                         if (!OidIsValid(tuple.t_data->t_oid) &&
1134                                 onerel->rd_rel->relhasoids)
1135                                 elog(WARNING, "Rel %s: TID %u/%u: OID IS INVALID. TUPGONE %d.",
1136                                          relname, blkno, offnum, (int) tupgone);
1137
1138                         if (tupgone)
1139                         {
1140                                 ItemId          lpp;
1141
1142                                 /*
1143                                  * Here we are building a temporary copy of the page with
1144                                  * dead tuples removed.  Below we will apply
1145                                  * PageRepairFragmentation to the copy, so that we can
1146                                  * determine how much space will be available after
1147                                  * removal of dead tuples.      But note we are NOT changing
1148                                  * the real page yet...
1149                                  */
1150                                 if (tempPage == (Page) NULL)
1151                                 {
1152                                         Size            pageSize;
1153
1154                                         pageSize = PageGetPageSize(page);
1155                                         tempPage = (Page) palloc(pageSize);
1156                                         memcpy(tempPage, page, pageSize);
1157                                 }
1158
1159                                 /* mark it unused on the temp page */
1160                                 lpp = PageGetItemId(tempPage, offnum);
1161                                 lpp->lp_flags &= ~LP_USED;
1162
1163                                 vacpage->offsets[vacpage->offsets_free++] = offnum;
1164                                 tups_vacuumed += 1;
1165                         }
1166                         else
1167                         {
1168                                 num_tuples += 1;
1169                                 notup = false;
1170                                 if (tuple.t_len < min_tlen)
1171                                         min_tlen = tuple.t_len;
1172                                 if (tuple.t_len > max_tlen)
1173                                         max_tlen = tuple.t_len;
1174                         }
1175                 }                                               /* scan along page */
1176
1177                 if (tempPage != (Page) NULL)
1178                 {
1179                         /* Some tuples are removable; figure free space after removal */
1180                         PageRepairFragmentation(tempPage, NULL);
1181                         vacpage->free = ((PageHeader) tempPage)->pd_upper - ((PageHeader) tempPage)->pd_lower;
1182                         pfree(tempPage);
1183                         do_reap = true;
1184                 }
1185                 else
1186                 {
1187                         /* Just use current available space */
1188                         vacpage->free = ((PageHeader) page)->pd_upper - ((PageHeader) page)->pd_lower;
1189                         /* Need to reap the page if it has ~LP_USED line pointers */
1190                         do_reap = (vacpage->offsets_free > 0);
1191                 }
1192
1193                 free_size += vacpage->free;
1194
1195                 /*
1196                  * Add the page to fraged_pages if it has a useful amount of free
1197                  * space.  "Useful" means enough for a minimal-sized tuple. But we
1198                  * don't know that accurately near the start of the relation, so
1199                  * add pages unconditionally if they have >= BLCKSZ/10 free space.
1200                  */
1201                 do_frag = (vacpage->free >= min_tlen || vacpage->free >= BLCKSZ / 10);
1202
1203                 if (do_reap || do_frag)
1204                 {
1205                         vacpagecopy = copy_vac_page(vacpage);
1206                         if (do_reap)
1207                                 vpage_insert(vacuum_pages, vacpagecopy);
1208                         if (do_frag)
1209                                 vpage_insert(fraged_pages, vacpagecopy);
1210                 }
1211
1212                 if (notup)
1213                         empty_end_pages++;
1214                 else
1215                         empty_end_pages = 0;
1216
1217                 if (pgchanged)
1218                 {
1219                         WriteBuffer(buf);
1220                         changed_pages++;
1221                 }
1222                 else
1223                         ReleaseBuffer(buf);
1224         }
1225
1226         pfree(vacpage);
1227
1228         /* save stats in the rel list for use later */
1229         vacrelstats->rel_tuples = num_tuples;
1230         vacrelstats->rel_pages = nblocks;
1231         if (num_tuples == 0)
1232                 min_tlen = max_tlen = 0;
1233         vacrelstats->min_tlen = min_tlen;
1234         vacrelstats->max_tlen = max_tlen;
1235
1236         vacuum_pages->empty_end_pages = empty_end_pages;
1237         fraged_pages->empty_end_pages = empty_end_pages;
1238
1239         /*
1240          * Clear the fraged_pages list if we found we couldn't shrink. Else,
1241          * remove any "empty" end-pages from the list, and compute usable free
1242          * space = free space in remaining pages.
1243          */
1244         if (do_shrinking)
1245         {
1246                 Assert((BlockNumber) fraged_pages->num_pages >= empty_end_pages);
1247                 fraged_pages->num_pages -= empty_end_pages;
1248                 usable_free_size = 0;
1249                 for (i = 0; i < fraged_pages->num_pages; i++)
1250                         usable_free_size += fraged_pages->pagedesc[i]->free;
1251         }
1252         else
1253         {
1254                 fraged_pages->num_pages = 0;
1255                 usable_free_size = 0;
1256         }
1257
1258         if (usable_free_size > 0 && num_vtlinks > 0)
1259         {
1260                 qsort((char *) vtlinks, num_vtlinks, sizeof(VTupleLinkData),
1261                           vac_cmp_vtlinks);
1262                 vacrelstats->vtlinks = vtlinks;
1263                 vacrelstats->num_vtlinks = num_vtlinks;
1264         }
1265         else
1266         {
1267                 vacrelstats->vtlinks = NULL;
1268                 vacrelstats->num_vtlinks = 0;
1269                 pfree(vtlinks);
1270         }
1271
1272         elog(elevel, "Pages %u: Changed %u, reaped %u, Empty %u, New %u; \
1273 Tup %.0f: Vac %.0f, Keep/VTL %.0f/%u, UnUsed %.0f, MinLen %lu, MaxLen %lu; \
1274 Re-using: Free/Avail. Space %.0f/%.0f; EndEmpty/Avail. Pages %u/%u.\n\t%s",
1275                  nblocks, changed_pages, vacuum_pages->num_pages, empty_pages,
1276                  new_pages, num_tuples, tups_vacuumed,
1277                  nkeep, vacrelstats->num_vtlinks,
1278                  nunused, (unsigned long) min_tlen, (unsigned long) max_tlen,
1279                  free_size, usable_free_size,
1280                  empty_end_pages, fraged_pages->num_pages,
1281                  vac_show_rusage(&ru0));
1282
1283 }
1284
1285
1286 /*
1287  *      repair_frag() -- try to repair relation's fragmentation
1288  *
1289  *              This routine marks dead tuples as unused and tries re-use dead space
1290  *              by moving tuples (and inserting indexes if needed). It constructs
1291  *              Nvacpagelist list of free-ed pages (moved tuples) and clean indexes
1292  *              for them after committing (in hack-manner - without losing locks
1293  *              and freeing memory!) current transaction. It truncates relation
1294  *              if some end-blocks are gone away.
1295  */
1296 static void
1297 repair_frag(VRelStats *vacrelstats, Relation onerel,
1298                         VacPageList vacuum_pages, VacPageList fraged_pages,
1299                         int nindexes, Relation *Irel)
1300 {
1301         TransactionId myXID;
1302         CommandId       myCID;
1303         Buffer          buf,
1304                                 cur_buffer;
1305         BlockNumber nblocks,
1306                                 blkno;
1307         BlockNumber last_move_dest_block = 0,
1308                                 last_vacuum_block;
1309         Page            page,
1310                                 ToPage = NULL;
1311         OffsetNumber offnum,
1312                                 maxoff,
1313                                 newoff,
1314                                 max_offset;
1315         ItemId          itemid,
1316                                 newitemid;
1317         HeapTupleData tuple,
1318                                 newtup;
1319         TupleDesc       tupdesc;
1320         ResultRelInfo *resultRelInfo;
1321         EState     *estate;
1322         TupleTable      tupleTable;
1323         TupleTableSlot *slot;
1324         VacPageListData Nvacpagelist;
1325         VacPage         cur_page = NULL,
1326                                 last_vacuum_page,
1327                                 vacpage,
1328                            *curpage;
1329         int                     cur_item = 0;
1330         int                     i;
1331         Size            tuple_len;
1332         int                     num_moved,
1333                                 num_fraged_pages,
1334                                 vacuumed_pages;
1335         int                     checked_moved,
1336                                 num_tuples,
1337                                 keep_tuples = 0;
1338         bool            isempty,
1339                                 dowrite,
1340                                 chain_tuple_moved;
1341         VacRUsage       ru0;
1342
1343         vac_init_rusage(&ru0);
1344
1345         myXID = GetCurrentTransactionId();
1346         myCID = GetCurrentCommandId();
1347
1348         tupdesc = RelationGetDescr(onerel);
1349
1350         /*
1351          * We need a ResultRelInfo and an EState so we can use the regular
1352          * executor's index-entry-making machinery.
1353          */
1354         resultRelInfo = makeNode(ResultRelInfo);
1355         resultRelInfo->ri_RangeTableIndex = 1;          /* dummy */
1356         resultRelInfo->ri_RelationDesc = onerel;
1357         resultRelInfo->ri_TrigDesc = NULL;      /* we don't fire triggers */
1358
1359         ExecOpenIndices(resultRelInfo);
1360
1361         estate = CreateExecutorState();
1362         estate->es_result_relations = resultRelInfo;
1363         estate->es_num_result_relations = 1;
1364         estate->es_result_relation_info = resultRelInfo;
1365
1366         /* Set up a dummy tuple table too */
1367         tupleTable = ExecCreateTupleTable(1);
1368         slot = ExecAllocTableSlot(tupleTable);
1369         ExecSetSlotDescriptor(slot, tupdesc, false);
1370
1371         Nvacpagelist.num_pages = 0;
1372         num_fraged_pages = fraged_pages->num_pages;
1373         Assert((BlockNumber) vacuum_pages->num_pages >= vacuum_pages->empty_end_pages);
1374         vacuumed_pages = vacuum_pages->num_pages - vacuum_pages->empty_end_pages;
1375         if (vacuumed_pages > 0)
1376         {
1377                 /* get last reaped page from vacuum_pages */
1378                 last_vacuum_page = vacuum_pages->pagedesc[vacuumed_pages - 1];
1379                 last_vacuum_block = last_vacuum_page->blkno;
1380         }
1381         else
1382         {
1383                 last_vacuum_page = NULL;
1384                 last_vacuum_block = InvalidBlockNumber;
1385         }
1386         cur_buffer = InvalidBuffer;
1387         num_moved = 0;
1388
1389         vacpage = (VacPage) palloc(sizeof(VacPageData) + MaxOffsetNumber * sizeof(OffsetNumber));
1390         vacpage->offsets_used = vacpage->offsets_free = 0;
1391
1392         /*
1393          * Scan pages backwards from the last nonempty page, trying to move
1394          * tuples down to lower pages.  Quit when we reach a page that we have
1395          * moved any tuples onto, or the first page if we haven't moved
1396          * anything, or when we find a page we cannot completely empty (this
1397          * last condition is handled by "break" statements within the loop).
1398          *
1399          * NB: this code depends on the vacuum_pages and fraged_pages lists being
1400          * in order by blkno.
1401          */
1402         nblocks = vacrelstats->rel_pages;
1403         for (blkno = nblocks - vacuum_pages->empty_end_pages - 1;
1404                  blkno > last_move_dest_block;
1405                  blkno--)
1406         {
1407                 CHECK_FOR_INTERRUPTS();
1408
1409                 /*
1410                  * Forget fraged_pages pages at or after this one; they're no
1411                  * longer useful as move targets, since we only want to move down.
1412                  * Note that since we stop the outer loop at last_move_dest_block,
1413                  * pages removed here cannot have had anything moved onto them
1414                  * already.
1415                  *
1416                  * Also note that we don't change the stored fraged_pages list, only
1417                  * our local variable num_fraged_pages; so the forgotten pages are
1418                  * still available to be loaded into the free space map later.
1419                  */
1420                 while (num_fraged_pages > 0 &&
1421                         fraged_pages->pagedesc[num_fraged_pages - 1]->blkno >= blkno)
1422                 {
1423                         Assert(fraged_pages->pagedesc[num_fraged_pages - 1]->offsets_used == 0);
1424                         --num_fraged_pages;
1425                 }
1426
1427                 /*
1428                  * Process this page of relation.
1429                  */
1430                 buf = ReadBuffer(onerel, blkno);
1431                 page = BufferGetPage(buf);
1432
1433                 vacpage->offsets_free = 0;
1434
1435                 isempty = PageIsEmpty(page);
1436
1437                 dowrite = false;
1438
1439                 /* Is the page in the vacuum_pages list? */
1440                 if (blkno == last_vacuum_block)
1441                 {
1442                         if (last_vacuum_page->offsets_free > 0)
1443                         {
1444                                 /* there are dead tuples on this page - clean them */
1445                                 Assert(!isempty);
1446                                 LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
1447                                 vacuum_page(onerel, buf, last_vacuum_page);
1448                                 LockBuffer(buf, BUFFER_LOCK_UNLOCK);
1449                                 dowrite = true;
1450                         }
1451                         else
1452                                 Assert(isempty);
1453                         --vacuumed_pages;
1454                         if (vacuumed_pages > 0)
1455                         {
1456                                 /* get prev reaped page from vacuum_pages */
1457                                 last_vacuum_page = vacuum_pages->pagedesc[vacuumed_pages - 1];
1458                                 last_vacuum_block = last_vacuum_page->blkno;
1459                         }
1460                         else
1461                         {
1462                                 last_vacuum_page = NULL;
1463                                 last_vacuum_block = InvalidBlockNumber;
1464                         }
1465                         if (isempty)
1466                         {
1467                                 ReleaseBuffer(buf);
1468                                 continue;
1469                         }
1470                 }
1471                 else
1472                         Assert(!isempty);
1473
1474                 chain_tuple_moved = false;              /* no one chain-tuple was moved
1475                                                                                  * off this page, yet */
1476                 vacpage->blkno = blkno;
1477                 maxoff = PageGetMaxOffsetNumber(page);
1478                 for (offnum = FirstOffsetNumber;
1479                          offnum <= maxoff;
1480                          offnum = OffsetNumberNext(offnum))
1481                 {
1482                         itemid = PageGetItemId(page, offnum);
1483
1484                         if (!ItemIdIsUsed(itemid))
1485                                 continue;
1486
1487                         tuple.t_datamcxt = NULL;
1488                         tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
1489                         tuple_len = tuple.t_len = ItemIdGetLength(itemid);
1490                         ItemPointerSet(&(tuple.t_self), blkno, offnum);
1491
1492                         if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
1493                         {
1494                                 if ((TransactionId) tuple.t_data->t_cmin != myXID)
1495                                         elog(ERROR, "Invalid XID in t_cmin");
1496                                 if (tuple.t_data->t_infomask & HEAP_MOVED_IN)
1497                                         elog(ERROR, "HEAP_MOVED_IN was not expected");
1498
1499                                 /*
1500                                  * If this (chain) tuple is moved by me already then I
1501                                  * have to check is it in vacpage or not - i.e. is it
1502                                  * moved while cleaning this page or some previous one.
1503                                  */
1504                                 if (tuple.t_data->t_infomask & HEAP_MOVED_OFF)
1505                                 {
1506                                         if (keep_tuples == 0)
1507                                                 continue;
1508                                         if (chain_tuple_moved)          /* some chains was moved
1509                                                                                                  * while */
1510                                         {                       /* cleaning this page */
1511                                                 Assert(vacpage->offsets_free > 0);
1512                                                 for (i = 0; i < vacpage->offsets_free; i++)
1513                                                 {
1514                                                         if (vacpage->offsets[i] == offnum)
1515                                                                 break;
1516                                                 }
1517                                                 if (i >= vacpage->offsets_free) /* not found */
1518                                                 {
1519                                                         vacpage->offsets[vacpage->offsets_free++] = offnum;
1520                                                         keep_tuples--;
1521                                                 }
1522                                         }
1523                                         else
1524                                         {
1525                                                 vacpage->offsets[vacpage->offsets_free++] = offnum;
1526                                                 keep_tuples--;
1527                                         }
1528                                         continue;
1529                                 }
1530                                 elog(ERROR, "HEAP_MOVED_OFF was expected");
1531                         }
1532
1533                         /*
1534                          * If this tuple is in the chain of tuples created in updates
1535                          * by "recent" transactions then we have to move all chain of
1536                          * tuples to another places.
1537                          */
1538                         if ((tuple.t_data->t_infomask & HEAP_UPDATED &&
1539                          !TransactionIdPrecedes(tuple.t_data->t_xmin, OldestXmin)) ||
1540                                 (!(tuple.t_data->t_infomask & HEAP_XMAX_INVALID) &&
1541                                  !(ItemPointerEquals(&(tuple.t_self),
1542                                                                          &(tuple.t_data->t_ctid)))))
1543                         {
1544                                 Buffer          Cbuf = buf;
1545                                 Page            Cpage;
1546                                 ItemId          Citemid;
1547                                 ItemPointerData Ctid;
1548                                 HeapTupleData tp = tuple;
1549                                 Size            tlen = tuple_len;
1550                                 VTupleMove      vtmove = (VTupleMove)
1551                                 palloc(100 * sizeof(VTupleMoveData));
1552                                 int                     num_vtmove = 0;
1553                                 int                     free_vtmove = 100;
1554                                 VacPage         to_vacpage = NULL;
1555                                 int                     to_item = 0;
1556                                 bool            freeCbuf = false;
1557                                 int                     ti;
1558
1559                                 if (vacrelstats->vtlinks == NULL)
1560                                         elog(ERROR, "No one parent tuple was found");
1561                                 if (cur_buffer != InvalidBuffer)
1562                                 {
1563                                         WriteBuffer(cur_buffer);
1564                                         cur_buffer = InvalidBuffer;
1565                                 }
1566
1567                                 /*
1568                                  * If this tuple is in the begin/middle of the chain then
1569                                  * we have to move to the end of chain.
1570                                  */
1571                                 while (!(tp.t_data->t_infomask & HEAP_XMAX_INVALID) &&
1572                                            !(ItemPointerEquals(&(tp.t_self),
1573                                                                                    &(tp.t_data->t_ctid))))
1574                                 {
1575                                         Ctid = tp.t_data->t_ctid;
1576                                         if (freeCbuf)
1577                                                 ReleaseBuffer(Cbuf);
1578                                         freeCbuf = true;
1579                                         Cbuf = ReadBuffer(onerel,
1580                                                                           ItemPointerGetBlockNumber(&Ctid));
1581                                         Cpage = BufferGetPage(Cbuf);
1582                                         Citemid = PageGetItemId(Cpage,
1583                                                                           ItemPointerGetOffsetNumber(&Ctid));
1584                                         if (!ItemIdIsUsed(Citemid))
1585                                         {
1586                                                 /*
1587                                                  * This means that in the middle of chain there
1588                                                  * was tuple updated by older (than OldestXmin)
1589                                                  * xaction and this tuple is already deleted by
1590                                                  * me. Actually, upper part of chain should be
1591                                                  * removed and seems that this should be handled
1592                                                  * in scan_heap(), but it's not implemented at the
1593                                                  * moment and so we just stop shrinking here.
1594                                                  */
1595                                                 ReleaseBuffer(Cbuf);
1596                                                 pfree(vtmove);
1597                                                 vtmove = NULL;
1598                                                 elog(WARNING, "Child itemid in update-chain marked as unused - can't continue repair_frag");
1599                                                 break;
1600                                         }
1601                                         tp.t_datamcxt = NULL;
1602                                         tp.t_data = (HeapTupleHeader) PageGetItem(Cpage, Citemid);
1603                                         tp.t_self = Ctid;
1604                                         tlen = tp.t_len = ItemIdGetLength(Citemid);
1605                                 }
1606                                 if (vtmove == NULL)
1607                                         break;
1608                                 /* first, can chain be moved ? */
1609                                 for (;;)
1610                                 {
1611                                         if (to_vacpage == NULL ||
1612                                                 !enough_space(to_vacpage, tlen))
1613                                         {
1614                                                 for (i = 0; i < num_fraged_pages; i++)
1615                                                 {
1616                                                         if (enough_space(fraged_pages->pagedesc[i], tlen))
1617                                                                 break;
1618                                                 }
1619
1620                                                 if (i == num_fraged_pages)
1621                                                 {
1622                                                         /* can't move item anywhere */
1623                                                         for (i = 0; i < num_vtmove; i++)
1624                                                         {
1625                                                                 Assert(vtmove[i].vacpage->offsets_used > 0);
1626                                                                 (vtmove[i].vacpage->offsets_used)--;
1627                                                         }
1628                                                         num_vtmove = 0;
1629                                                         break;
1630                                                 }
1631                                                 to_item = i;
1632                                                 to_vacpage = fraged_pages->pagedesc[to_item];
1633                                         }
1634                                         to_vacpage->free -= MAXALIGN(tlen);
1635                                         if (to_vacpage->offsets_used >= to_vacpage->offsets_free)
1636                                                 to_vacpage->free -= MAXALIGN(sizeof(ItemIdData));
1637                                         (to_vacpage->offsets_used)++;
1638                                         if (free_vtmove == 0)
1639                                         {
1640                                                 free_vtmove = 1000;
1641                                                 vtmove = (VTupleMove) repalloc(vtmove,
1642                                                                                          (free_vtmove + num_vtmove) *
1643                                                                                                  sizeof(VTupleMoveData));
1644                                         }
1645                                         vtmove[num_vtmove].tid = tp.t_self;
1646                                         vtmove[num_vtmove].vacpage = to_vacpage;
1647                                         if (to_vacpage->offsets_used == 1)
1648                                                 vtmove[num_vtmove].cleanVpd = true;
1649                                         else
1650                                                 vtmove[num_vtmove].cleanVpd = false;
1651                                         free_vtmove--;
1652                                         num_vtmove++;
1653
1654                                         /* All done ? */
1655                                         if (!(tp.t_data->t_infomask & HEAP_UPDATED) ||
1656                                         TransactionIdPrecedes(tp.t_data->t_xmin, OldestXmin))
1657                                                 break;
1658
1659                                         /* Well, try to find tuple with old row version */
1660                                         for (;;)
1661                                         {
1662                                                 Buffer          Pbuf;
1663                                                 Page            Ppage;
1664                                                 ItemId          Pitemid;
1665                                                 HeapTupleData Ptp;
1666                                                 VTupleLinkData vtld,
1667                                                                    *vtlp;
1668
1669                                                 vtld.new_tid = tp.t_self;
1670                                                 vtlp = (VTupleLink)
1671                                                         vac_bsearch((void *) &vtld,
1672                                                                                 (void *) (vacrelstats->vtlinks),
1673                                                                                 vacrelstats->num_vtlinks,
1674                                                                                 sizeof(VTupleLinkData),
1675                                                                                 vac_cmp_vtlinks);
1676                                                 if (vtlp == NULL)
1677                                                         elog(ERROR, "Parent tuple was not found");
1678                                                 tp.t_self = vtlp->this_tid;
1679                                                 Pbuf = ReadBuffer(onerel,
1680                                                                 ItemPointerGetBlockNumber(&(tp.t_self)));
1681                                                 Ppage = BufferGetPage(Pbuf);
1682                                                 Pitemid = PageGetItemId(Ppage,
1683                                                            ItemPointerGetOffsetNumber(&(tp.t_self)));
1684                                                 if (!ItemIdIsUsed(Pitemid))
1685                                                         elog(ERROR, "Parent itemid marked as unused");
1686                                                 Ptp.t_datamcxt = NULL;
1687                                                 Ptp.t_data = (HeapTupleHeader) PageGetItem(Ppage, Pitemid);
1688                                                 Assert(ItemPointerEquals(&(vtld.new_tid),
1689                                                                                                  &(Ptp.t_data->t_ctid)));
1690
1691                                                 /*
1692                                                  * Read above about cases when
1693                                                  * !ItemIdIsUsed(Citemid) (child item is
1694                                                  * removed)... Due to the fact that at the moment
1695                                                  * we don't remove unuseful part of update-chain,
1696                                                  * it's possible to get too old parent row here.
1697                                                  * Like as in the case which caused this problem,
1698                                                  * we stop shrinking here. I could try to find
1699                                                  * real parent row but want not to do it because
1700                                                  * of real solution will be implemented anyway,
1701                                                  * latter, and we are too close to 6.5 release. -
1702                                                  * vadim 06/11/99
1703                                                  */
1704                                                 if (!(TransactionIdEquals(Ptp.t_data->t_xmax,
1705                                                                                                   tp.t_data->t_xmin)))
1706                                                 {
1707                                                         if (freeCbuf)
1708                                                                 ReleaseBuffer(Cbuf);
1709                                                         freeCbuf = false;
1710                                                         ReleaseBuffer(Pbuf);
1711                                                         for (i = 0; i < num_vtmove; i++)
1712                                                         {
1713                                                                 Assert(vtmove[i].vacpage->offsets_used > 0);
1714                                                                 (vtmove[i].vacpage->offsets_used)--;
1715                                                         }
1716                                                         num_vtmove = 0;
1717                                                         elog(WARNING, "Too old parent tuple found - can't continue repair_frag");
1718                                                         break;
1719                                                 }
1720 #ifdef NOT_USED                                 /* I'm not sure that this will wotk
1721                                                                  * properly... */
1722
1723                                                 /*
1724                                                  * If this tuple is updated version of row and it
1725                                                  * was created by the same transaction then no one
1726                                                  * is interested in this tuple - mark it as
1727                                                  * removed.
1728                                                  */
1729                                                 if (Ptp.t_data->t_infomask & HEAP_UPDATED &&
1730                                                         TransactionIdEquals(Ptp.t_data->t_xmin,
1731                                                                                                 Ptp.t_data->t_xmax))
1732                                                 {
1733                                                         TransactionIdStore(myXID,
1734                                                                 (TransactionId *) &(Ptp.t_data->t_cmin));
1735                                                         Ptp.t_data->t_infomask &=
1736                                                                 ~(HEAP_XMIN_COMMITTED | HEAP_XMIN_INVALID | HEAP_MOVED_IN);
1737                                                         Ptp.t_data->t_infomask |= HEAP_MOVED_OFF;
1738                                                         WriteBuffer(Pbuf);
1739                                                         continue;
1740                                                 }
1741 #endif
1742                                                 tp.t_datamcxt = Ptp.t_datamcxt;
1743                                                 tp.t_data = Ptp.t_data;
1744                                                 tlen = tp.t_len = ItemIdGetLength(Pitemid);
1745                                                 if (freeCbuf)
1746                                                         ReleaseBuffer(Cbuf);
1747                                                 Cbuf = Pbuf;
1748                                                 freeCbuf = true;
1749                                                 break;
1750                                         }
1751                                         if (num_vtmove == 0)
1752                                                 break;
1753                                 }
1754                                 if (freeCbuf)
1755                                         ReleaseBuffer(Cbuf);
1756                                 if (num_vtmove == 0)    /* chain can't be moved */
1757                                 {
1758                                         pfree(vtmove);
1759                                         break;
1760                                 }
1761                                 ItemPointerSetInvalid(&Ctid);
1762                                 for (ti = 0; ti < num_vtmove; ti++)
1763                                 {
1764                                         VacPage         destvacpage = vtmove[ti].vacpage;
1765
1766                                         /* Get page to move from */
1767                                         tuple.t_self = vtmove[ti].tid;
1768                                         Cbuf = ReadBuffer(onerel,
1769                                                          ItemPointerGetBlockNumber(&(tuple.t_self)));
1770
1771                                         /* Get page to move to */
1772                                         cur_buffer = ReadBuffer(onerel, destvacpage->blkno);
1773
1774                                         LockBuffer(cur_buffer, BUFFER_LOCK_EXCLUSIVE);
1775                                         if (cur_buffer != Cbuf)
1776                                                 LockBuffer(Cbuf, BUFFER_LOCK_EXCLUSIVE);
1777
1778                                         ToPage = BufferGetPage(cur_buffer);
1779                                         Cpage = BufferGetPage(Cbuf);
1780
1781                                         Citemid = PageGetItemId(Cpage,
1782                                                         ItemPointerGetOffsetNumber(&(tuple.t_self)));
1783                                         tuple.t_datamcxt = NULL;
1784                                         tuple.t_data = (HeapTupleHeader) PageGetItem(Cpage, Citemid);
1785                                         tuple_len = tuple.t_len = ItemIdGetLength(Citemid);
1786
1787                                         /*
1788                                          * make a copy of the source tuple, and then mark the
1789                                          * source tuple MOVED_OFF.
1790                                          */
1791                                         heap_copytuple_with_tuple(&tuple, &newtup);
1792
1793                                         /*
1794                                          * register invalidation of source tuple in catcaches.
1795                                          */
1796                                         CacheInvalidateHeapTuple(onerel, &tuple);
1797
1798                                         /* NO ELOG(ERROR) TILL CHANGES ARE LOGGED */
1799                                         START_CRIT_SECTION();
1800
1801                                         TransactionIdStore(myXID, (TransactionId *) &(tuple.t_data->t_cmin));
1802                                         tuple.t_data->t_infomask &=
1803                                                 ~(HEAP_XMIN_COMMITTED | HEAP_XMIN_INVALID | HEAP_MOVED_IN);
1804                                         tuple.t_data->t_infomask |= HEAP_MOVED_OFF;
1805
1806                                         /*
1807                                          * If this page was not used before - clean it.
1808                                          *
1809                                          * NOTE: a nasty bug used to lurk here.  It is possible
1810                                          * for the source and destination pages to be the same
1811                                          * (since this tuple-chain member can be on a page
1812                                          * lower than the one we're currently processing in
1813                                          * the outer loop).  If that's true, then after
1814                                          * vacuum_page() the source tuple will have been
1815                                          * moved, and tuple.t_data will be pointing at
1816                                          * garbage.  Therefore we must do everything that uses
1817                                          * tuple.t_data BEFORE this step!!
1818                                          *
1819                                          * This path is different from the other callers of
1820                                          * vacuum_page, because we have already incremented
1821                                          * the vacpage's offsets_used field to account for the
1822                                          * tuple(s) we expect to move onto the page. Therefore
1823                                          * vacuum_page's check for offsets_used == 0 is wrong.
1824                                          * But since that's a good debugging check for all
1825                                          * other callers, we work around it here rather than
1826                                          * remove it.
1827                                          */
1828                                         if (!PageIsEmpty(ToPage) && vtmove[ti].cleanVpd)
1829                                         {
1830                                                 int                     sv_offsets_used = destvacpage->offsets_used;
1831
1832                                                 destvacpage->offsets_used = 0;
1833                                                 vacuum_page(onerel, cur_buffer, destvacpage);
1834                                                 destvacpage->offsets_used = sv_offsets_used;
1835                                         }
1836
1837                                         /*
1838                                          * Update the state of the copied tuple, and store it
1839                                          * on the destination page.
1840                                          */
1841                                         TransactionIdStore(myXID, (TransactionId *) &(newtup.t_data->t_cmin));
1842                                         newtup.t_data->t_infomask &=
1843                                                 ~(HEAP_XMIN_COMMITTED | HEAP_XMIN_INVALID | HEAP_MOVED_OFF);
1844                                         newtup.t_data->t_infomask |= HEAP_MOVED_IN;
1845                                         newoff = PageAddItem(ToPage, (Item) newtup.t_data, tuple_len,
1846                                                                                  InvalidOffsetNumber, LP_USED);
1847                                         if (newoff == InvalidOffsetNumber)
1848                                         {
1849                                                 elog(PANIC, "moving chain: failed to add item with len = %lu to page %u",
1850                                                   (unsigned long) tuple_len, destvacpage->blkno);
1851                                         }
1852                                         newitemid = PageGetItemId(ToPage, newoff);
1853                                         pfree(newtup.t_data);
1854                                         newtup.t_datamcxt = NULL;
1855                                         newtup.t_data = (HeapTupleHeader) PageGetItem(ToPage, newitemid);
1856                                         ItemPointerSet(&(newtup.t_self), destvacpage->blkno, newoff);
1857
1858                                         {
1859                                                 XLogRecPtr      recptr =
1860                                                 log_heap_move(onerel, Cbuf, tuple.t_self,
1861                                                                           cur_buffer, &newtup);
1862
1863                                                 if (Cbuf != cur_buffer)
1864                                                 {
1865                                                         PageSetLSN(Cpage, recptr);
1866                                                         PageSetSUI(Cpage, ThisStartUpID);
1867                                                 }
1868                                                 PageSetLSN(ToPage, recptr);
1869                                                 PageSetSUI(ToPage, ThisStartUpID);
1870                                         }
1871                                         END_CRIT_SECTION();
1872
1873                                         if (destvacpage->blkno > last_move_dest_block)
1874                                                 last_move_dest_block = destvacpage->blkno;
1875
1876                                         /*
1877                                          * Set new tuple's t_ctid pointing to itself for last
1878                                          * tuple in chain, and to next tuple in chain
1879                                          * otherwise.
1880                                          */
1881                                         if (!ItemPointerIsValid(&Ctid))
1882                                                 newtup.t_data->t_ctid = newtup.t_self;
1883                                         else
1884                                                 newtup.t_data->t_ctid = Ctid;
1885                                         Ctid = newtup.t_self;
1886
1887                                         num_moved++;
1888
1889                                         /*
1890                                          * Remember that we moved tuple from the current page
1891                                          * (corresponding index tuple will be cleaned).
1892                                          */
1893                                         if (Cbuf == buf)
1894                                                 vacpage->offsets[vacpage->offsets_free++] =
1895                                                         ItemPointerGetOffsetNumber(&(tuple.t_self));
1896                                         else
1897                                                 keep_tuples++;
1898
1899                                         LockBuffer(cur_buffer, BUFFER_LOCK_UNLOCK);
1900                                         if (cur_buffer != Cbuf)
1901                                                 LockBuffer(Cbuf, BUFFER_LOCK_UNLOCK);
1902
1903                                         /* Create index entries for the moved tuple */
1904                                         if (resultRelInfo->ri_NumIndices > 0)
1905                                         {
1906                                                 ExecStoreTuple(&newtup, slot, InvalidBuffer, false);
1907                                                 ExecInsertIndexTuples(slot, &(newtup.t_self),
1908                                                                                           estate, true);
1909                                         }
1910
1911                                         WriteBuffer(cur_buffer);
1912                                         WriteBuffer(Cbuf);
1913                                 }
1914                                 cur_buffer = InvalidBuffer;
1915                                 pfree(vtmove);
1916                                 chain_tuple_moved = true;
1917                                 continue;
1918                         }
1919
1920                         /* try to find new page for this tuple */
1921                         if (cur_buffer == InvalidBuffer ||
1922                                 !enough_space(cur_page, tuple_len))
1923                         {
1924                                 if (cur_buffer != InvalidBuffer)
1925                                 {
1926                                         WriteBuffer(cur_buffer);
1927                                         cur_buffer = InvalidBuffer;
1928                                 }
1929                                 for (i = 0; i < num_fraged_pages; i++)
1930                                 {
1931                                         if (enough_space(fraged_pages->pagedesc[i], tuple_len))
1932                                                 break;
1933                                 }
1934                                 if (i == num_fraged_pages)
1935                                         break;          /* can't move item anywhere */
1936                                 cur_item = i;
1937                                 cur_page = fraged_pages->pagedesc[cur_item];
1938                                 cur_buffer = ReadBuffer(onerel, cur_page->blkno);
1939                                 LockBuffer(cur_buffer, BUFFER_LOCK_EXCLUSIVE);
1940                                 ToPage = BufferGetPage(cur_buffer);
1941                                 /* if this page was not used before - clean it */
1942                                 if (!PageIsEmpty(ToPage) && cur_page->offsets_used == 0)
1943                                         vacuum_page(onerel, cur_buffer, cur_page);
1944                         }
1945                         else
1946                                 LockBuffer(cur_buffer, BUFFER_LOCK_EXCLUSIVE);
1947
1948                         LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
1949
1950                         /* copy tuple */
1951                         heap_copytuple_with_tuple(&tuple, &newtup);
1952
1953                         /*
1954                          * register invalidation of source tuple in catcaches.
1955                          *
1956                          * (Note: we do not need to register the copied tuple,
1957                          * because we are not changing the tuple contents and
1958                          * so there cannot be any need to flush negative
1959                          * catcache entries.)
1960                          */
1961                         CacheInvalidateHeapTuple(onerel, &tuple);
1962
1963                         /* NO ELOG(ERROR) TILL CHANGES ARE LOGGED */
1964                         START_CRIT_SECTION();
1965
1966                         /*
1967                          * Mark new tuple as moved_in by vacuum and store vacuum XID
1968                          * in t_cmin !!!
1969                          */
1970                         TransactionIdStore(myXID, (TransactionId *) &(newtup.t_data->t_cmin));
1971                         newtup.t_data->t_infomask &=
1972                                 ~(HEAP_XMIN_COMMITTED | HEAP_XMIN_INVALID | HEAP_MOVED_OFF);
1973                         newtup.t_data->t_infomask |= HEAP_MOVED_IN;
1974
1975                         /* add tuple to the page */
1976                         newoff = PageAddItem(ToPage, (Item) newtup.t_data, tuple_len,
1977                                                                  InvalidOffsetNumber, LP_USED);
1978                         if (newoff == InvalidOffsetNumber)
1979                         {
1980                                 elog(PANIC, "failed to add item with len = %lu to page %u (free space %lu, nusd %u, noff %u)",
1981                                          (unsigned long) tuple_len,
1982                                          cur_page->blkno, (unsigned long) cur_page->free,
1983                                          cur_page->offsets_used, cur_page->offsets_free);
1984                         }
1985                         newitemid = PageGetItemId(ToPage, newoff);
1986                         pfree(newtup.t_data);
1987                         newtup.t_datamcxt = NULL;
1988                         newtup.t_data = (HeapTupleHeader) PageGetItem(ToPage, newitemid);
1989                         ItemPointerSet(&(newtup.t_data->t_ctid), cur_page->blkno, newoff);
1990                         newtup.t_self = newtup.t_data->t_ctid;
1991
1992                         /*
1993                          * Mark old tuple as moved_off by vacuum and store vacuum XID
1994                          * in t_cmin !!!
1995                          */
1996                         TransactionIdStore(myXID, (TransactionId *) &(tuple.t_data->t_cmin));
1997                         tuple.t_data->t_infomask &=
1998                                 ~(HEAP_XMIN_COMMITTED | HEAP_XMIN_INVALID | HEAP_MOVED_IN);
1999                         tuple.t_data->t_infomask |= HEAP_MOVED_OFF;
2000
2001                         {
2002                                 XLogRecPtr      recptr =
2003                                 log_heap_move(onerel, buf, tuple.t_self,
2004                                                           cur_buffer, &newtup);
2005
2006                                 PageSetLSN(page, recptr);
2007                                 PageSetSUI(page, ThisStartUpID);
2008                                 PageSetLSN(ToPage, recptr);
2009                                 PageSetSUI(ToPage, ThisStartUpID);
2010                         }
2011                         END_CRIT_SECTION();
2012
2013                         cur_page->offsets_used++;
2014                         num_moved++;
2015                         cur_page->free = ((PageHeader) ToPage)->pd_upper - ((PageHeader) ToPage)->pd_lower;
2016                         if (cur_page->blkno > last_move_dest_block)
2017                                 last_move_dest_block = cur_page->blkno;
2018
2019                         vacpage->offsets[vacpage->offsets_free++] = offnum;
2020
2021                         LockBuffer(cur_buffer, BUFFER_LOCK_UNLOCK);
2022                         LockBuffer(buf, BUFFER_LOCK_UNLOCK);
2023
2024                         /* insert index' tuples if needed */
2025                         if (resultRelInfo->ri_NumIndices > 0)
2026                         {
2027                                 ExecStoreTuple(&newtup, slot, InvalidBuffer, false);
2028                                 ExecInsertIndexTuples(slot, &(newtup.t_self), estate, true);
2029                         }
2030                 }                                               /* walk along page */
2031
2032                 if (offnum < maxoff && keep_tuples > 0)
2033                 {
2034                         OffsetNumber off;
2035
2036                         for (off = OffsetNumberNext(offnum);
2037                                  off <= maxoff;
2038                                  off = OffsetNumberNext(off))
2039                         {
2040                                 itemid = PageGetItemId(page, off);
2041                                 if (!ItemIdIsUsed(itemid))
2042                                         continue;
2043                                 tuple.t_datamcxt = NULL;
2044                                 tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
2045                                 if (tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED)
2046                                         continue;
2047                                 if ((TransactionId) tuple.t_data->t_cmin != myXID)
2048                                         elog(ERROR, "Invalid XID in t_cmin (4)");
2049                                 if (tuple.t_data->t_infomask & HEAP_MOVED_IN)
2050                                         elog(ERROR, "HEAP_MOVED_IN was not expected (2)");
2051                                 if (tuple.t_data->t_infomask & HEAP_MOVED_OFF)
2052                                 {
2053                                         /* some chains was moved while */
2054                                         if (chain_tuple_moved)
2055                                         {                       /* cleaning this page */
2056                                                 Assert(vacpage->offsets_free > 0);
2057                                                 for (i = 0; i < vacpage->offsets_free; i++)
2058                                                 {
2059                                                         if (vacpage->offsets[i] == off)
2060                                                                 break;
2061                                                 }
2062                                                 if (i >= vacpage->offsets_free) /* not found */
2063                                                 {
2064                                                         vacpage->offsets[vacpage->offsets_free++] = off;
2065                                                         Assert(keep_tuples > 0);
2066                                                         keep_tuples--;
2067                                                 }
2068                                         }
2069                                         else
2070                                         {
2071                                                 vacpage->offsets[vacpage->offsets_free++] = off;
2072                                                 Assert(keep_tuples > 0);
2073                                                 keep_tuples--;
2074                                         }
2075                                 }
2076                         }
2077                 }
2078
2079                 if (vacpage->offsets_free > 0)  /* some tuples were moved */
2080                 {
2081                         if (chain_tuple_moved)          /* else - they are ordered */
2082                         {
2083                                 qsort((char *) (vacpage->offsets), vacpage->offsets_free,
2084                                           sizeof(OffsetNumber), vac_cmp_offno);
2085                         }
2086                         vpage_insert(&Nvacpagelist, copy_vac_page(vacpage));
2087                         WriteBuffer(buf);
2088                 }
2089                 else if (dowrite)
2090                         WriteBuffer(buf);
2091                 else
2092                         ReleaseBuffer(buf);
2093
2094                 if (offnum <= maxoff)
2095                         break;                          /* some item(s) left */
2096
2097         }                                                       /* walk along relation */
2098
2099         blkno++;                                        /* new number of blocks */
2100
2101         if (cur_buffer != InvalidBuffer)
2102         {
2103                 Assert(num_moved > 0);
2104                 WriteBuffer(cur_buffer);
2105         }
2106
2107         if (num_moved > 0)
2108         {
2109                 /*
2110                  * We have to commit our tuple movings before we truncate the
2111                  * relation.  Ideally we should do Commit/StartTransactionCommand
2112                  * here, relying on the session-level table lock to protect our
2113                  * exclusive access to the relation.  However, that would require
2114                  * a lot of extra code to close and re-open the relation, indexes,
2115                  * etc.  For now, a quick hack: record status of current
2116                  * transaction as committed, and continue.
2117                  */
2118                 RecordTransactionCommit();
2119         }
2120
2121         /*
2122          * We are not going to move any more tuples across pages, but we still
2123          * need to apply vacuum_page to compact free space in the remaining
2124          * pages in vacuum_pages list.  Note that some of these pages may also
2125          * be in the fraged_pages list, and may have had tuples moved onto
2126          * them; if so, we already did vacuum_page and needn't do it again.
2127          */
2128         for (i = 0, curpage = vacuum_pages->pagedesc;
2129                  i < vacuumed_pages;
2130                  i++, curpage++)
2131         {
2132                 CHECK_FOR_INTERRUPTS();
2133                 Assert((*curpage)->blkno < blkno);
2134                 if ((*curpage)->offsets_used == 0)
2135                 {
2136                         /* this page was not used as a move target, so must clean it */
2137                         buf = ReadBuffer(onerel, (*curpage)->blkno);
2138                         LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
2139                         page = BufferGetPage(buf);
2140                         if (!PageIsEmpty(page))
2141                                 vacuum_page(onerel, buf, *curpage);
2142                         LockBuffer(buf, BUFFER_LOCK_UNLOCK);
2143                         WriteBuffer(buf);
2144                 }
2145         }
2146
2147         /*
2148          * Now scan all the pages that we moved tuples onto and update tuple
2149          * status bits.  This is not really necessary, but will save time for
2150          * future transactions examining these tuples.
2151          *
2152          * XXX WARNING that this code fails to clear HEAP_MOVED_OFF tuples from
2153          * pages that were move source pages but not move dest pages.  One
2154          * also wonders whether it wouldn't be better to skip this step and
2155          * let the tuple status updates happen someplace that's not holding an
2156          * exclusive lock on the relation.
2157          */
2158         checked_moved = 0;
2159         for (i = 0, curpage = fraged_pages->pagedesc;
2160                  i < num_fraged_pages;
2161                  i++, curpage++)
2162         {
2163                 CHECK_FOR_INTERRUPTS();
2164                 Assert((*curpage)->blkno < blkno);
2165                 if ((*curpage)->blkno > last_move_dest_block)
2166                         break;                          /* no need to scan any further */
2167                 if ((*curpage)->offsets_used == 0)
2168                         continue;                       /* this page was never used as a move dest */
2169                 buf = ReadBuffer(onerel, (*curpage)->blkno);
2170                 LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
2171                 page = BufferGetPage(buf);
2172                 num_tuples = 0;
2173                 max_offset = PageGetMaxOffsetNumber(page);
2174                 for (newoff = FirstOffsetNumber;
2175                          newoff <= max_offset;
2176                          newoff = OffsetNumberNext(newoff))
2177                 {
2178                         itemid = PageGetItemId(page, newoff);
2179                         if (!ItemIdIsUsed(itemid))
2180                                 continue;
2181                         tuple.t_datamcxt = NULL;
2182                         tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
2183                         if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
2184                         {
2185                                 if ((TransactionId) tuple.t_data->t_cmin != myXID)
2186                                         elog(ERROR, "Invalid XID in t_cmin (2)");
2187                                 if (tuple.t_data->t_infomask & HEAP_MOVED_IN)
2188                                 {
2189                                         tuple.t_data->t_infomask |= HEAP_XMIN_COMMITTED;
2190                                         num_tuples++;
2191                                 }
2192                                 else if (tuple.t_data->t_infomask & HEAP_MOVED_OFF)
2193                                         tuple.t_data->t_infomask |= HEAP_XMIN_INVALID;
2194                                 else
2195                                         elog(ERROR, "HEAP_MOVED_OFF/HEAP_MOVED_IN was expected");
2196                         }
2197                 }
2198                 LockBuffer(buf, BUFFER_LOCK_UNLOCK);
2199                 WriteBuffer(buf);
2200                 Assert((*curpage)->offsets_used == num_tuples);
2201                 checked_moved += num_tuples;
2202         }
2203         Assert(num_moved == checked_moved);
2204
2205         elog(elevel, "Rel %s: Pages: %u --> %u; Tuple(s) moved: %u.\n\t%s",
2206                  RelationGetRelationName(onerel),
2207                  nblocks, blkno, num_moved,
2208                  vac_show_rusage(&ru0));
2209
2210         /*
2211          * Reflect the motion of system tuples to catalog cache here.
2212          */
2213         CommandCounterIncrement();
2214
2215         if (Nvacpagelist.num_pages > 0)
2216         {
2217                 /* vacuum indexes again if needed */
2218                 if (Irel != (Relation *) NULL)
2219                 {
2220                         VacPage    *vpleft,
2221                                            *vpright,
2222                                                 vpsave;
2223
2224                         /* re-sort Nvacpagelist.pagedesc */
2225                         for (vpleft = Nvacpagelist.pagedesc,
2226                         vpright = Nvacpagelist.pagedesc + Nvacpagelist.num_pages - 1;
2227                                  vpleft < vpright; vpleft++, vpright--)
2228                         {
2229                                 vpsave = *vpleft;
2230                                 *vpleft = *vpright;
2231                                 *vpright = vpsave;
2232                         }
2233                         Assert(keep_tuples >= 0);
2234                         for (i = 0; i < nindexes; i++)
2235                                 vacuum_index(&Nvacpagelist, Irel[i],
2236                                                          vacrelstats->rel_tuples, keep_tuples);
2237                 }
2238
2239                 /* clean moved tuples from last page in Nvacpagelist list */
2240                 if (vacpage->blkno == (blkno - 1) &&
2241                         vacpage->offsets_free > 0)
2242                 {
2243                         OffsetNumber unbuf[BLCKSZ / sizeof(OffsetNumber)];
2244                         OffsetNumber *unused = unbuf;
2245                         int                     uncnt;
2246
2247                         buf = ReadBuffer(onerel, vacpage->blkno);
2248                         LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
2249                         page = BufferGetPage(buf);
2250                         num_tuples = 0;
2251                         maxoff = PageGetMaxOffsetNumber(page);
2252                         for (offnum = FirstOffsetNumber;
2253                                  offnum <= maxoff;
2254                                  offnum = OffsetNumberNext(offnum))
2255                         {
2256                                 itemid = PageGetItemId(page, offnum);
2257                                 if (!ItemIdIsUsed(itemid))
2258                                         continue;
2259                                 tuple.t_datamcxt = NULL;
2260                                 tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
2261
2262                                 if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
2263                                 {
2264                                         if ((TransactionId) tuple.t_data->t_cmin != myXID)
2265                                                 elog(ERROR, "Invalid XID in t_cmin (3)");
2266                                         if (tuple.t_data->t_infomask & HEAP_MOVED_OFF)
2267                                         {
2268                                                 itemid->lp_flags &= ~LP_USED;
2269                                                 num_tuples++;
2270                                         }
2271                                         else
2272                                                 elog(ERROR, "HEAP_MOVED_OFF was expected (2)");
2273                                 }
2274
2275                         }
2276                         Assert(vacpage->offsets_free == num_tuples);
2277                         START_CRIT_SECTION();
2278                         uncnt = PageRepairFragmentation(page, unused);
2279                         {
2280                                 XLogRecPtr      recptr;
2281
2282                                 recptr = log_heap_clean(onerel, buf, (char *) unused,
2283                                                   (char *) (&(unused[uncnt])) - (char *) unused);
2284                                 PageSetLSN(page, recptr);
2285                                 PageSetSUI(page, ThisStartUpID);
2286                         }
2287                         END_CRIT_SECTION();
2288                         LockBuffer(buf, BUFFER_LOCK_UNLOCK);
2289                         WriteBuffer(buf);
2290                 }
2291
2292                 /* now - free new list of reaped pages */
2293                 curpage = Nvacpagelist.pagedesc;
2294                 for (i = 0; i < Nvacpagelist.num_pages; i++, curpage++)
2295                         pfree(*curpage);
2296                 pfree(Nvacpagelist.pagedesc);
2297         }
2298
2299         /*
2300          * Flush dirty pages out to disk.  We do this unconditionally, even if
2301          * we don't need to truncate, because we want to ensure that all
2302          * tuples have correct on-row commit status on disk (see bufmgr.c's
2303          * comments for FlushRelationBuffers()).
2304          */
2305         i = FlushRelationBuffers(onerel, blkno);
2306         if (i < 0)
2307                 elog(ERROR, "VACUUM (repair_frag): FlushRelationBuffers returned %d",
2308                          i);
2309
2310         /* truncate relation, if needed */
2311         if (blkno < nblocks)
2312         {
2313                 blkno = smgrtruncate(DEFAULT_SMGR, onerel, blkno);
2314                 onerel->rd_nblocks = blkno;             /* update relcache immediately */
2315                 onerel->rd_targblock = InvalidBlockNumber;
2316                 vacrelstats->rel_pages = blkno; /* set new number of blocks */
2317         }
2318
2319         /* clean up */
2320         pfree(vacpage);
2321         if (vacrelstats->vtlinks != NULL)
2322                 pfree(vacrelstats->vtlinks);
2323
2324         ExecDropTupleTable(tupleTable, true);
2325
2326         ExecCloseIndices(resultRelInfo);
2327 }
2328
2329 /*
2330  *      vacuum_heap() -- free dead tuples
2331  *
2332  *              This routine marks dead tuples as unused and truncates relation
2333  *              if there are "empty" end-blocks.
2334  */
2335 static void
2336 vacuum_heap(VRelStats *vacrelstats, Relation onerel, VacPageList vacuum_pages)
2337 {
2338         Buffer          buf;
2339         VacPage    *vacpage;
2340         BlockNumber relblocks;
2341         int                     nblocks;
2342         int                     i;
2343
2344         nblocks = vacuum_pages->num_pages;
2345         nblocks -= vacuum_pages->empty_end_pages;       /* nothing to do with them */
2346
2347         for (i = 0, vacpage = vacuum_pages->pagedesc; i < nblocks; i++, vacpage++)
2348         {
2349                 CHECK_FOR_INTERRUPTS();
2350                 if ((*vacpage)->offsets_free > 0)
2351                 {
2352                         buf = ReadBuffer(onerel, (*vacpage)->blkno);
2353                         LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
2354                         vacuum_page(onerel, buf, *vacpage);
2355                         LockBuffer(buf, BUFFER_LOCK_UNLOCK);
2356                         WriteBuffer(buf);
2357                 }
2358         }
2359
2360         /*
2361          * Flush dirty pages out to disk.  We do this unconditionally, even if
2362          * we don't need to truncate, because we want to ensure that all
2363          * tuples have correct on-row commit status on disk (see bufmgr.c's
2364          * comments for FlushRelationBuffers()).
2365          */
2366         Assert(vacrelstats->rel_pages >= vacuum_pages->empty_end_pages);
2367         relblocks = vacrelstats->rel_pages - vacuum_pages->empty_end_pages;
2368
2369         i = FlushRelationBuffers(onerel, relblocks);
2370         if (i < 0)
2371                 elog(ERROR, "VACUUM (vacuum_heap): FlushRelationBuffers returned %d",
2372                          i);
2373
2374         /* truncate relation if there are some empty end-pages */
2375         if (vacuum_pages->empty_end_pages > 0)
2376         {
2377                 elog(elevel, "Rel %s: Pages: %u --> %u.",
2378                          RelationGetRelationName(onerel),
2379                          vacrelstats->rel_pages, relblocks);
2380                 relblocks = smgrtruncate(DEFAULT_SMGR, onerel, relblocks);
2381                 onerel->rd_nblocks = relblocks; /* update relcache immediately */
2382                 onerel->rd_targblock = InvalidBlockNumber;
2383                 vacrelstats->rel_pages = relblocks;             /* set new number of
2384                                                                                                  * blocks */
2385         }
2386 }
2387
2388 /*
2389  *      vacuum_page() -- free dead tuples on a page
2390  *                                       and repair its fragmentation.
2391  */
2392 static void
2393 vacuum_page(Relation onerel, Buffer buffer, VacPage vacpage)
2394 {
2395         OffsetNumber unbuf[BLCKSZ / sizeof(OffsetNumber)];
2396         OffsetNumber *unused = unbuf;
2397         int                     uncnt;
2398         Page            page = BufferGetPage(buffer);
2399         ItemId          itemid;
2400         int                     i;
2401
2402         /* There shouldn't be any tuples moved onto the page yet! */
2403         Assert(vacpage->offsets_used == 0);
2404
2405         START_CRIT_SECTION();
2406         for (i = 0; i < vacpage->offsets_free; i++)
2407         {
2408                 itemid = PageGetItemId(page, vacpage->offsets[i]);
2409                 itemid->lp_flags &= ~LP_USED;
2410         }
2411         uncnt = PageRepairFragmentation(page, unused);
2412         {
2413                 XLogRecPtr      recptr;
2414
2415                 recptr = log_heap_clean(onerel, buffer, (char *) unused,
2416                                                   (char *) (&(unused[uncnt])) - (char *) unused);
2417                 PageSetLSN(page, recptr);
2418                 PageSetSUI(page, ThisStartUpID);
2419         }
2420         END_CRIT_SECTION();
2421 }
2422
2423 /*
2424  *      scan_index() -- scan one index relation to update statistic.
2425  *
2426  * We use this when we have no deletions to do.
2427  */
2428 static void
2429 scan_index(Relation indrel, double num_tuples)
2430 {
2431         IndexBulkDeleteResult *stats;
2432         VacRUsage       ru0;
2433
2434         vac_init_rusage(&ru0);
2435
2436         /*
2437          * Even though we're not planning to delete anything, use the
2438          * ambulkdelete call, so that the scan happens within the index AM for
2439          * more speed.
2440          */
2441         stats = index_bulk_delete(indrel, dummy_tid_reaped, NULL);
2442
2443         if (!stats)
2444                 return;
2445
2446         /* now update statistics in pg_class */
2447         vac_update_relstats(RelationGetRelid(indrel),
2448                                                 stats->num_pages, stats->num_index_tuples,
2449                                                 false);
2450
2451         elog(elevel, "Index %s: Pages %u; Tuples %.0f.\n\t%s",
2452                  RelationGetRelationName(indrel),
2453                  stats->num_pages, stats->num_index_tuples,
2454                  vac_show_rusage(&ru0));
2455
2456         /*
2457          * Check for tuple count mismatch.      If the index is partial, then it's
2458          * OK for it to have fewer tuples than the heap; else we got trouble.
2459          */
2460         if (stats->num_index_tuples != num_tuples)
2461         {
2462                 if (stats->num_index_tuples > num_tuples ||
2463                         !vac_is_partial_index(indrel))
2464                         elog(WARNING, "Index %s: NUMBER OF INDEX' TUPLES (%.0f) IS NOT THE SAME AS HEAP' (%.0f).\
2465 \n\tRecreate the index.",
2466                                  RelationGetRelationName(indrel),
2467                                  stats->num_index_tuples, num_tuples);
2468         }
2469
2470         pfree(stats);
2471 }
2472
2473 /*
2474  *      vacuum_index() -- vacuum one index relation.
2475  *
2476  *              Vpl is the VacPageList of the heap we're currently vacuuming.
2477  *              It's locked. Indrel is an index relation on the vacuumed heap.
2478  *
2479  *              We don't bother to set locks on the index relation here, since
2480  *              the parent table is exclusive-locked already.
2481  *
2482  *              Finally, we arrange to update the index relation's statistics in
2483  *              pg_class.
2484  */
2485 static void
2486 vacuum_index(VacPageList vacpagelist, Relation indrel,
2487                          double num_tuples, int keep_tuples)
2488 {
2489         IndexBulkDeleteResult *stats;
2490         VacRUsage       ru0;
2491
2492         vac_init_rusage(&ru0);
2493
2494         /* Do bulk deletion */
2495         stats = index_bulk_delete(indrel, tid_reaped, (void *) vacpagelist);
2496
2497         if (!stats)
2498                 return;
2499
2500         /* now update statistics in pg_class */
2501         vac_update_relstats(RelationGetRelid(indrel),
2502                                                 stats->num_pages, stats->num_index_tuples,
2503                                                 false);
2504
2505         elog(elevel, "Index %s: Pages %u; Tuples %.0f: Deleted %.0f.\n\t%s",
2506                  RelationGetRelationName(indrel), stats->num_pages,
2507                  stats->num_index_tuples - keep_tuples, stats->tuples_removed,
2508                  vac_show_rusage(&ru0));
2509
2510         /*
2511          * Check for tuple count mismatch.      If the index is partial, then it's
2512          * OK for it to have fewer tuples than the heap; else we got trouble.
2513          */
2514         if (stats->num_index_tuples != num_tuples + keep_tuples)
2515         {
2516                 if (stats->num_index_tuples > num_tuples + keep_tuples ||
2517                         !vac_is_partial_index(indrel))
2518                         elog(WARNING, "Index %s: NUMBER OF INDEX' TUPLES (%.0f) IS NOT THE SAME AS HEAP' (%.0f).\
2519 \n\tRecreate the index.",
2520                                  RelationGetRelationName(indrel),
2521                                  stats->num_index_tuples, num_tuples);
2522         }
2523
2524         pfree(stats);
2525 }
2526
2527 /*
2528  *      tid_reaped() -- is a particular tid reaped?
2529  *
2530  *              This has the right signature to be an IndexBulkDeleteCallback.
2531  *
2532  *              vacpagelist->VacPage_array is sorted in right order.
2533  */
2534 static bool
2535 tid_reaped(ItemPointer itemptr, void *state)
2536 {
2537         VacPageList vacpagelist = (VacPageList) state;
2538         OffsetNumber ioffno;
2539         OffsetNumber *voff;
2540         VacPage         vp,
2541                            *vpp;
2542         VacPageData vacpage;
2543
2544         vacpage.blkno = ItemPointerGetBlockNumber(itemptr);
2545         ioffno = ItemPointerGetOffsetNumber(itemptr);
2546
2547         vp = &vacpage;
2548         vpp = (VacPage *) vac_bsearch((void *) &vp,
2549                                                                   (void *) (vacpagelist->pagedesc),
2550                                                                   vacpagelist->num_pages,
2551                                                                   sizeof(VacPage),
2552                                                                   vac_cmp_blk);
2553
2554         if (vpp == NULL)
2555                 return false;
2556
2557         /* ok - we are on a partially or fully reaped page */
2558         vp = *vpp;
2559
2560         if (vp->offsets_free == 0)
2561         {
2562                 /* this is EmptyPage, so claim all tuples on it are reaped!!! */
2563                 return true;
2564         }
2565
2566         voff = (OffsetNumber *) vac_bsearch((void *) &ioffno,
2567                                                                                 (void *) (vp->offsets),
2568                                                                                 vp->offsets_free,
2569                                                                                 sizeof(OffsetNumber),
2570                                                                                 vac_cmp_offno);
2571
2572         if (voff == NULL)
2573                 return false;
2574
2575         /* tid is reaped */
2576         return true;
2577 }
2578
2579 /*
2580  * Dummy version for scan_index.
2581  */
2582 static bool
2583 dummy_tid_reaped(ItemPointer itemptr, void *state)
2584 {
2585         return false;
2586 }
2587
2588 /*
2589  * Update the shared Free Space Map with the info we now have about
2590  * free space in the relation, discarding any old info the map may have.
2591  */
2592 static void
2593 vac_update_fsm(Relation onerel, VacPageList fraged_pages,
2594                            BlockNumber rel_pages)
2595 {
2596         int                     nPages = fraged_pages->num_pages;
2597         int                     i;
2598         BlockNumber *pages;
2599         Size       *spaceAvail;
2600
2601         /* +1 to avoid palloc(0) */
2602         pages = (BlockNumber *) palloc((nPages + 1) * sizeof(BlockNumber));
2603         spaceAvail = (Size *) palloc((nPages + 1) * sizeof(Size));
2604
2605         for (i = 0; i < nPages; i++)
2606         {
2607                 pages[i] = fraged_pages->pagedesc[i]->blkno;
2608                 spaceAvail[i] = fraged_pages->pagedesc[i]->free;
2609
2610                 /*
2611                  * fraged_pages may contain entries for pages that we later
2612                  * decided to truncate from the relation; don't enter them into
2613                  * the map!
2614                  */
2615                 if (pages[i] >= rel_pages)
2616                 {
2617                         nPages = i;
2618                         break;
2619                 }
2620         }
2621
2622         MultiRecordFreeSpace(&onerel->rd_node,
2623                                                  0, MaxBlockNumber,
2624                                                  nPages, pages, spaceAvail);
2625         pfree(pages);
2626         pfree(spaceAvail);
2627 }
2628
2629 /* Copy a VacPage structure */
2630 static VacPage
2631 copy_vac_page(VacPage vacpage)
2632 {
2633         VacPage         newvacpage;
2634
2635         /* allocate a VacPageData entry */
2636         newvacpage = (VacPage) palloc(sizeof(VacPageData) +
2637                                                    vacpage->offsets_free * sizeof(OffsetNumber));
2638
2639         /* fill it in */
2640         if (vacpage->offsets_free > 0)
2641                 memcpy(newvacpage->offsets, vacpage->offsets,
2642                            vacpage->offsets_free * sizeof(OffsetNumber));
2643         newvacpage->blkno = vacpage->blkno;
2644         newvacpage->free = vacpage->free;
2645         newvacpage->offsets_used = vacpage->offsets_used;
2646         newvacpage->offsets_free = vacpage->offsets_free;
2647
2648         return newvacpage;
2649 }
2650
2651 /*
2652  * Add a VacPage pointer to a VacPageList.
2653  *
2654  *              As a side effect of the way that scan_heap works,
2655  *              higher pages come after lower pages in the array
2656  *              (and highest tid on a page is last).
2657  */
2658 static void
2659 vpage_insert(VacPageList vacpagelist, VacPage vpnew)
2660 {
2661 #define PG_NPAGEDESC 1024
2662
2663         /* allocate a VacPage entry if needed */
2664         if (vacpagelist->num_pages == 0)
2665         {
2666                 vacpagelist->pagedesc = (VacPage *) palloc(PG_NPAGEDESC * sizeof(VacPage));
2667                 vacpagelist->num_allocated_pages = PG_NPAGEDESC;
2668         }
2669         else if (vacpagelist->num_pages >= vacpagelist->num_allocated_pages)
2670         {
2671                 vacpagelist->num_allocated_pages *= 2;
2672                 vacpagelist->pagedesc = (VacPage *) repalloc(vacpagelist->pagedesc, vacpagelist->num_allocated_pages * sizeof(VacPage));
2673         }
2674         vacpagelist->pagedesc[vacpagelist->num_pages] = vpnew;
2675         (vacpagelist->num_pages)++;
2676 }
2677
2678 /*
2679  * vac_bsearch: just like standard C library routine bsearch(),
2680  * except that we first test to see whether the target key is outside
2681  * the range of the table entries.      This case is handled relatively slowly
2682  * by the normal binary search algorithm (ie, no faster than any other key)
2683  * but it occurs often enough in VACUUM to be worth optimizing.
2684  */
2685 static void *
2686 vac_bsearch(const void *key, const void *base,
2687                         size_t nelem, size_t size,
2688                         int (*compar) (const void *, const void *))
2689 {
2690         int                     res;
2691         const void *last;
2692
2693         if (nelem == 0)
2694                 return NULL;
2695         res = compar(key, base);
2696         if (res < 0)
2697                 return NULL;
2698         if (res == 0)
2699                 return (void *) base;
2700         if (nelem > 1)
2701         {
2702                 last = (const void *) ((const char *) base + (nelem - 1) * size);
2703                 res = compar(key, last);
2704                 if (res > 0)
2705                         return NULL;
2706                 if (res == 0)
2707                         return (void *) last;
2708         }
2709         if (nelem <= 2)
2710                 return NULL;                    /* already checked 'em all */
2711         return bsearch(key, base, nelem, size, compar);
2712 }
2713
2714 /*
2715  * Comparator routines for use with qsort() and bsearch().
2716  */
2717 static int
2718 vac_cmp_blk(const void *left, const void *right)
2719 {
2720         BlockNumber lblk,
2721                                 rblk;
2722
2723         lblk = (*((VacPage *) left))->blkno;
2724         rblk = (*((VacPage *) right))->blkno;
2725
2726         if (lblk < rblk)
2727                 return -1;
2728         if (lblk == rblk)
2729                 return 0;
2730         return 1;
2731 }
2732
2733 static int
2734 vac_cmp_offno(const void *left, const void *right)
2735 {
2736         if (*(OffsetNumber *) left < *(OffsetNumber *) right)
2737                 return -1;
2738         if (*(OffsetNumber *) left == *(OffsetNumber *) right)
2739                 return 0;
2740         return 1;
2741 }
2742
2743 static int
2744 vac_cmp_vtlinks(const void *left, const void *right)
2745 {
2746         if (((VTupleLink) left)->new_tid.ip_blkid.bi_hi <
2747                 ((VTupleLink) right)->new_tid.ip_blkid.bi_hi)
2748                 return -1;
2749         if (((VTupleLink) left)->new_tid.ip_blkid.bi_hi >
2750                 ((VTupleLink) right)->new_tid.ip_blkid.bi_hi)
2751                 return 1;
2752         /* bi_hi-es are equal */
2753         if (((VTupleLink) left)->new_tid.ip_blkid.bi_lo <
2754                 ((VTupleLink) right)->new_tid.ip_blkid.bi_lo)
2755                 return -1;
2756         if (((VTupleLink) left)->new_tid.ip_blkid.bi_lo >
2757                 ((VTupleLink) right)->new_tid.ip_blkid.bi_lo)
2758                 return 1;
2759         /* bi_lo-es are equal */
2760         if (((VTupleLink) left)->new_tid.ip_posid <
2761                 ((VTupleLink) right)->new_tid.ip_posid)
2762                 return -1;
2763         if (((VTupleLink) left)->new_tid.ip_posid >
2764                 ((VTupleLink) right)->new_tid.ip_posid)
2765                 return 1;
2766         return 0;
2767 }
2768
2769
2770 void
2771 vac_open_indexes(Relation relation, int *nindexes, Relation **Irel)
2772 {
2773         List       *indexoidlist,
2774                            *indexoidscan;
2775         int                     i;
2776
2777         indexoidlist = RelationGetIndexList(relation);
2778
2779         *nindexes = length(indexoidlist);
2780
2781         if (*nindexes > 0)
2782                 *Irel = (Relation *) palloc(*nindexes * sizeof(Relation));
2783         else
2784                 *Irel = NULL;
2785
2786         i = 0;
2787         foreach(indexoidscan, indexoidlist)
2788         {
2789                 Oid                     indexoid = lfirsti(indexoidscan);
2790
2791                 (*Irel)[i] = index_open(indexoid);
2792                 i++;
2793         }
2794
2795         freeList(indexoidlist);
2796 }
2797
2798
2799 void
2800 vac_close_indexes(int nindexes, Relation *Irel)
2801 {
2802         if (Irel == (Relation *) NULL)
2803                 return;
2804
2805         while (nindexes--)
2806                 index_close(Irel[nindexes]);
2807         pfree(Irel);
2808 }
2809
2810
2811 /*
2812  * Is an index partial (ie, could it contain fewer tuples than the heap?)
2813  */
2814 bool
2815 vac_is_partial_index(Relation indrel)
2816 {
2817         /*
2818          * If the index's AM doesn't support nulls, it's partial for our
2819          * purposes
2820          */
2821         if (!indrel->rd_am->amindexnulls)
2822                 return true;
2823
2824         /* Otherwise, look to see if there's a partial-index predicate */
2825         return (VARSIZE(&indrel->rd_index->indpred) > VARHDRSZ);
2826 }
2827
2828
2829 static bool
2830 enough_space(VacPage vacpage, Size len)
2831 {
2832         len = MAXALIGN(len);
2833
2834         if (len > vacpage->free)
2835                 return false;
2836
2837         /* if there are free itemid(s) and len <= free_space... */
2838         if (vacpage->offsets_used < vacpage->offsets_free)
2839                 return true;
2840
2841         /* noff_used >= noff_free and so we'll have to allocate new itemid */
2842         if (len + sizeof(ItemIdData) <= vacpage->free)
2843                 return true;
2844
2845         return false;
2846 }
2847
2848
2849 /*
2850  * Initialize usage snapshot.
2851  */
2852 void
2853 vac_init_rusage(VacRUsage *ru0)
2854 {
2855         struct timezone tz;
2856
2857         getrusage(RUSAGE_SELF, &ru0->ru);
2858         gettimeofday(&ru0->tv, &tz);
2859 }
2860
2861 /*
2862  * Compute elapsed time since ru0 usage snapshot, and format into
2863  * a displayable string.  Result is in a static string, which is
2864  * tacky, but no one ever claimed that the Postgres backend is
2865  * threadable...
2866  */
2867 const char *
2868 vac_show_rusage(VacRUsage *ru0)
2869 {
2870         static char result[100];
2871         VacRUsage       ru1;
2872
2873         vac_init_rusage(&ru1);
2874
2875         if (ru1.tv.tv_usec < ru0->tv.tv_usec)
2876         {
2877                 ru1.tv.tv_sec--;
2878                 ru1.tv.tv_usec += 1000000;
2879         }
2880         if (ru1.ru.ru_stime.tv_usec < ru0->ru.ru_stime.tv_usec)
2881         {
2882                 ru1.ru.ru_stime.tv_sec--;
2883                 ru1.ru.ru_stime.tv_usec += 1000000;
2884         }
2885         if (ru1.ru.ru_utime.tv_usec < ru0->ru.ru_utime.tv_usec)
2886         {
2887                 ru1.ru.ru_utime.tv_sec--;
2888                 ru1.ru.ru_utime.tv_usec += 1000000;
2889         }
2890
2891         snprintf(result, sizeof(result),
2892                          "CPU %d.%02ds/%d.%02du sec elapsed %d.%02d sec.",
2893                          (int) (ru1.ru.ru_stime.tv_sec - ru0->ru.ru_stime.tv_sec),
2894           (int) (ru1.ru.ru_stime.tv_usec - ru0->ru.ru_stime.tv_usec) / 10000,
2895                          (int) (ru1.ru.ru_utime.tv_sec - ru0->ru.ru_utime.tv_sec),
2896           (int) (ru1.ru.ru_utime.tv_usec - ru0->ru.ru_utime.tv_usec) / 10000,
2897                          (int) (ru1.tv.tv_sec - ru0->tv.tv_sec),
2898                          (int) (ru1.tv.tv_usec - ru0->tv.tv_usec) / 10000);
2899
2900         return result;
2901 }