/*-------------------------------------------------------------------------
 *
 * vacuum.c
 *        The postgres vacuum cleaner.
 *
 * This file includes the "full" version of VACUUM, as well as control code
 * used by all three of full VACUUM, lazy VACUUM, and ANALYZE.  See
 * vacuumlazy.c and analyze.c for the rest of the code for the latter two.
 *
 *
 * Portions Copyright (c) 1996-2002, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *        $Header: /cvsroot/pgsql/src/backend/commands/vacuum.c,v 1.242 2002/10/19 20:15:09 tgl Exp $
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include <unistd.h>

#include "access/clog.h"
#include "access/genam.h"
#include "access/heapam.h"
#include "access/xlog.h"
#include "catalog/catalog.h"
#include "catalog/catname.h"
#include "catalog/namespace.h"
#include "catalog/pg_database.h"
#include "catalog/pg_index.h"
#include "commands/vacuum.h"
#include "executor/executor.h"
#include "miscadmin.h"
#include "storage/freespace.h"
#include "storage/sinval.h"
#include "storage/smgr.h"
#include "tcop/pquery.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/inval.h"
#include "utils/lsyscache.h"
#include "utils/relcache.h"
#include "utils/syscache.h"
#include "pgstat.h"

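/*
 * Bookkeeping structures used by VACUUM FULL.  A VacPage records the free
 * space and reusable line pointers of one heap page, and a VacPageList
 * collects such entries for the pages we must clean (vacuum_pages) and the
 * pages tuples can be moved into (fraged_pages).  VTupleLink records
 * update-chain links (this_tid -> new_tid), VTupleMove describes a planned
 * tuple move, and VRelStats accumulates per-relation statistics.
 */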
typedef struct VacPageData
{
        BlockNumber blkno;                      /* BlockNumber of this Page */
        Size            free;                   /* FreeSpace on this Page */
        uint16          offsets_used;   /* Number of OffNums used by vacuum */
        uint16          offsets_free;   /* Number of OffNums free or to be free */
        OffsetNumber offsets[1];        /* Array of free OffNums */
} VacPageData;

typedef VacPageData *VacPage;

typedef struct VacPageListData
{
        BlockNumber empty_end_pages;    /* Number of "empty" end-pages */
        int                     num_pages;              /* Number of pages in pagedesc */
        int                     num_allocated_pages;    /* Number of allocated pages in
                                                                                 * pagedesc */
        VacPage    *pagedesc;           /* Descriptions of pages */
} VacPageListData;

typedef VacPageListData *VacPageList;

typedef struct VTupleLinkData
{
        ItemPointerData new_tid;
        ItemPointerData this_tid;
} VTupleLinkData;

typedef VTupleLinkData *VTupleLink;

typedef struct VTupleMoveData
{
        ItemPointerData tid;            /* tuple ID */
        VacPage         vacpage;                /* where to move */
        bool            cleanVpd;               /* clean vacpage before using */
} VTupleMoveData;

typedef VTupleMoveData *VTupleMove;

typedef struct VRelStats
{
        BlockNumber rel_pages;
        double          rel_tuples;
        Size            min_tlen;
        Size            max_tlen;
        bool            hasindex;
        int                     num_vtlinks;
        VTupleLink      vtlinks;
} VRelStats;


static MemoryContext vac_context = NULL;

static int      elevel = -1;

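/*
 * Cutoff XIDs for the relation currently being vacuumed: tuples deleted by
 * transactions older than OldestXmin are dead to every open snapshot, and
 * committed tuples with xmin older than FreezeLimit have their xmin
 * replaced with FrozenTransactionId (see vacuum_set_xid_limits).
 */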
static TransactionId OldestXmin;
static TransactionId FreezeLimit;


/* non-export function prototypes */
static List *getrels(const RangeVar *vacrel, const char *stmttype);
static void vac_update_dbstats(Oid dbid,
                                   TransactionId vacuumXID,
                                   TransactionId frozenXID);
static void vac_truncate_clog(TransactionId vacuumXID,
                                  TransactionId frozenXID);
static bool vacuum_rel(Oid relid, VacuumStmt *vacstmt, char expected_relkind);
static void full_vacuum_rel(Relation onerel, VacuumStmt *vacstmt);
static void scan_heap(VRelStats *vacrelstats, Relation onerel,
                  VacPageList vacuum_pages, VacPageList fraged_pages);
static void repair_frag(VRelStats *vacrelstats, Relation onerel,
                        VacPageList vacuum_pages, VacPageList fraged_pages,
                        int nindexes, Relation *Irel);
static void vacuum_heap(VRelStats *vacrelstats, Relation onerel,
                        VacPageList vacpagelist);
static void vacuum_page(Relation onerel, Buffer buffer, VacPage vacpage);
static void vacuum_index(VacPageList vacpagelist, Relation indrel,
                         double num_tuples, int keep_tuples);
static void scan_index(Relation indrel, double num_tuples);
static bool tid_reaped(ItemPointer itemptr, void *state);
static bool dummy_tid_reaped(ItemPointer itemptr, void *state);
static void vac_update_fsm(Relation onerel, VacPageList fraged_pages,
                           BlockNumber rel_pages);
static VacPage copy_vac_page(VacPage vacpage);
static void vpage_insert(VacPageList vacpagelist, VacPage vpnew);
static void *vac_bsearch(const void *key, const void *base,
                        size_t nelem, size_t size,
                        int (*compar) (const void *, const void *));
static int      vac_cmp_blk(const void *left, const void *right);
static int      vac_cmp_offno(const void *left, const void *right);
static int      vac_cmp_vtlinks(const void *left, const void *right);
static bool enough_space(VacPage vacpage, Size len);


/****************************************************************************
 *                                                                          *
 *          Code common to all flavors of VACUUM and ANALYZE                *
 *                                                                          *
 ****************************************************************************
 */


/*
 * Primary entry point for VACUUM and ANALYZE commands.
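 *
 * Plain "VACUUM" (vacstmt->relation == NULL) covers every plain table in
 * the current database, while "VACUUM mytable" restricts it to one
 * relation.  ANALYZE and VACUUM ANALYZE come through here as well, with
 * vacstmt->analyze set.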
 */
void
vacuum(VacuumStmt *vacstmt)
{
        const char *stmttype = vacstmt->vacuum ? "VACUUM" : "ANALYZE";
        MemoryContext anl_context = NULL;
        TransactionId initialOldestXmin = InvalidTransactionId;
        TransactionId initialFreezeLimit = InvalidTransactionId;
        bool            all_rels;
        List       *vrl,
                           *cur;

        if (vacstmt->verbose)
                elevel = INFO;
        else
                elevel = DEBUG1;

        /*
         * We cannot run VACUUM inside a user transaction block; if we were
         * inside a transaction, then our commit- and
         * start-transaction-command calls would not have the intended effect!
         * Furthermore, the forced commit that occurs before truncating the
         * relation's file would have the effect of committing the rest of the
         * user's transaction too, which would certainly not be the desired
         * behavior.
         */
        if (vacstmt->vacuum && IsTransactionBlock())
                elog(ERROR, "%s cannot run inside a BEGIN/END block", stmttype);

        /* Running VACUUM from a function would free the function context */
        if (vacstmt->vacuum && !MemoryContextContains(QueryContext, vacstmt))
                elog(ERROR, "%s cannot be executed from a function", stmttype);

        /*
         * Send info about dead objects to the statistics collector
         */
        if (vacstmt->vacuum)
                pgstat_vacuum_tabstat();

        /*
         * Create special memory context for cross-transaction storage.
         *
         * Since it is a child of QueryContext, it will go away eventually even
         * if we suffer an error; there's no need for special abort cleanup
         * logic.
         */
        vac_context = AllocSetContextCreate(QueryContext,
                                                                                "Vacuum",
                                                                                ALLOCSET_DEFAULT_MINSIZE,
                                                                                ALLOCSET_DEFAULT_INITSIZE,
                                                                                ALLOCSET_DEFAULT_MAXSIZE);

        /*
         * If we are running only ANALYZE, we don't need per-table
         * transactions, but we still need a memory context with table
         * lifetime.
         */
        if (vacstmt->analyze && !vacstmt->vacuum)
                anl_context = AllocSetContextCreate(QueryContext,
                                                                                        "Analyze",
                                                                                        ALLOCSET_DEFAULT_MINSIZE,
                                                                                        ALLOCSET_DEFAULT_INITSIZE,
                                                                                        ALLOCSET_DEFAULT_MAXSIZE);

        /* Assume we are processing everything unless one table is mentioned */
        all_rels = (vacstmt->relation == NULL);

        /* Build list of relations to process (note this lives in vac_context) */
        vrl = getrels(vacstmt->relation, stmttype);

        /*
         * Formerly, there was code here to prevent more than one VACUUM from
         * executing concurrently in the same database.  However, there's no
         * good reason to prevent that, and manually removing lockfiles after
         * a vacuum crash was a pain for dbadmins.  So, forget about
         * lockfiles, and just rely on the locks we grab on each target table
         * to ensure that there aren't two VACUUMs running on the same table
         * at the same time.
         */

        /*
         * The strangeness with committing and starting transactions here is
         * due to wanting to run each table's VACUUM as a separate
         * transaction, so that we don't hold locks unnecessarily long.  Also,
         * if we are doing VACUUM ANALYZE, the ANALYZE part runs as a separate
         * transaction from the VACUUM to further reduce locking.
         *
         * vacuum_rel expects to be entered with no transaction active; it will
         * start and commit its own transaction.  But we are called by an SQL
         * command, and so we are executing inside a transaction already.  We
         * commit the transaction started in PostgresMain() here, and start
         * another one before exiting to match the commit waiting for us back
         * in PostgresMain().
         *
         * In the case of an ANALYZE statement (no vacuum, just analyze) it's
         * okay to run the whole thing in the outer transaction, and so we
         * skip transaction start/stop operations.
         */
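        /*
         * Illustrative timeline for "VACUUM ANALYZE" over tables t1 and t2,
         * where each bracketed step runs as its own transaction:
         *
         *   commit outer xact; [vacuum t1] [analyze t1] [vacuum t2]
         *   [analyze t2]; start fresh xact (committed back in PostgresMain)
         */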
        if (vacstmt->vacuum)
        {
                if (all_rels)
                {
                        /*
                         * It's a database-wide VACUUM.
                         *
                         * Compute the initially applicable OldestXmin and FreezeLimit
                         * XIDs, so that we can record these values at the end of the
                         * VACUUM. Note that individual tables may well be processed
                         * with newer values, but we can guarantee that no
                         * (non-shared) relations are processed with older ones.
                         *
                         * It is okay to record non-shared values in pg_database, even
                         * though we may vacuum shared relations with older cutoffs,
                         * because only the minimum of the values present in
                         * pg_database matters.  We can be sure that shared relations
                         * have at some time been vacuumed with cutoffs no worse than
                         * the global minimum; for, if there is a backend in some
                         * other DB with xmin = OLDXMIN that's determining the cutoff
                         * with which we vacuum shared relations, it is not possible
                         * for that database to have a cutoff newer than OLDXMIN
                         * recorded in pg_database.
                         */
                        vacuum_set_xid_limits(vacstmt, false,
                                                                  &initialOldestXmin,
                                                                  &initialFreezeLimit);
                }

                /* matches the StartTransaction in PostgresMain() */
                CommitTransactionCommand(true);
        }

        /*
         * Loop to process each selected relation.
         */
        foreach(cur, vrl)
        {
                Oid                     relid = (Oid) lfirsti(cur);

                if (vacstmt->vacuum)
                {
                        if (! vacuum_rel(relid, vacstmt, RELKIND_RELATION))
                                all_rels = false; /* forget about updating dbstats */
                }
                if (vacstmt->analyze)
                {
                        MemoryContext old_context = NULL;

                        /*
                         * If we vacuumed, use new transaction for analyze.
                         * Otherwise, we can use the outer transaction, but we still
                         * need to call analyze_rel in a memory context that will be
                         * cleaned up on return (else we leak memory while processing
                         * multiple tables).
                         */
                        if (vacstmt->vacuum)
                        {
                                StartTransactionCommand(true);
                                SetQuerySnapshot();     /* might be needed for functional index */
                        }
                        else
                                old_context = MemoryContextSwitchTo(anl_context);

                        analyze_rel(relid, vacstmt);

                        if (vacstmt->vacuum)
                                CommitTransactionCommand(true);
                        else
                        {
                                MemoryContextSwitchTo(old_context);
                                MemoryContextResetAndDeleteChildren(anl_context);
                        }
                }
        }

        /*
         * Finish up processing.
         */
        if (vacstmt->vacuum)
        {
                /* here, we are not in a transaction */

                /*
                 * This matches the CommitTransaction waiting for us in
                 * PostgresMain(). We tell xact.c not to chain the upcoming
                 * commit, so that a VACUUM doesn't start a transaction block,
                 * even when autocommit is off.
                 */
                StartTransactionCommand(true);

                /*
                 * If we completed a database-wide VACUUM without skipping any
                 * relations, update the database's pg_database row with info
                 * about the transaction IDs used, and try to truncate pg_clog.
                 */
                if (all_rels)
                {
                        vac_update_dbstats(MyDatabaseId,
                                                           initialOldestXmin, initialFreezeLimit);
                        vac_truncate_clog(initialOldestXmin, initialFreezeLimit);
                }
        }

        /*
         * Clean up working storage --- note we must do this after
         * StartTransactionCommand, else we might be trying to delete the
         * active context!
         */
        MemoryContextDelete(vac_context);
        vac_context = NULL;

        if (anl_context)
                MemoryContextDelete(anl_context);
}

/*
 * Build a list of Oids for each relation to be processed
 *
 * The list is built in vac_context so that it will survive across our
 * per-relation transactions.
 */
static List *
getrels(const RangeVar *vacrel, const char *stmttype)
{
        List       *vrl = NIL;
        MemoryContext oldcontext;

        if (vacrel)
        {
                /* Process specific relation */
                Oid                     relid;

                relid = RangeVarGetRelid(vacrel, false);

                /* Make a relation list entry for this guy */
                oldcontext = MemoryContextSwitchTo(vac_context);
                vrl = lappendi(vrl, relid);
                MemoryContextSwitchTo(oldcontext);
        }
        else
        {
                /* Process all plain relations listed in pg_class */
                Relation        pgclass;
                HeapScanDesc scan;
                HeapTuple       tuple;
                ScanKeyData key;

                ScanKeyEntryInitialize(&key, 0x0,
                                                           Anum_pg_class_relkind,
                                                           F_CHAREQ,
                                                           CharGetDatum(RELKIND_RELATION));

                pgclass = heap_openr(RelationRelationName, AccessShareLock);

                scan = heap_beginscan(pgclass, SnapshotNow, 1, &key);

                while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
                {
                        /* Make a relation list entry for this guy */
                        oldcontext = MemoryContextSwitchTo(vac_context);
                        vrl = lappendi(vrl, HeapTupleGetOid(tuple));
                        MemoryContextSwitchTo(oldcontext);
                }

                heap_endscan(scan);
                heap_close(pgclass, AccessShareLock);
        }

        return vrl;
}

/*
 * vacuum_set_xid_limits() -- compute oldest-Xmin and freeze cutoff points
 */
void
vacuum_set_xid_limits(VacuumStmt *vacstmt, bool sharedRel,
                                          TransactionId *oldestXmin,
                                          TransactionId *freezeLimit)
{
        TransactionId limit;

        *oldestXmin = GetOldestXmin(sharedRel);

        Assert(TransactionIdIsNormal(*oldestXmin));

        if (vacstmt->freeze)
        {
                /* FREEZE option: use oldest Xmin as freeze cutoff too */
                limit = *oldestXmin;
        }
        else
        {
                /*
                 * Normal case: freeze cutoff is well in the past, to wit, about
                 * halfway to the wrap horizon
                 */
                limit = GetCurrentTransactionId() - (MaxTransactionId >> 2);
        }
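        /*
         * Worked example: XID comparisons are circular, and
         * TransactionIdPrecedes treats values more than 2^31 apart as having
         * wrapped, so the usable horizon behind the current XID is about two
         * billion transactions; MaxTransactionId >> 2 (about one billion) is
         * therefore roughly halfway to that horizon, per the comment above.
         */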

        /*
         * Be careful not to generate a "permanent" XID
         */
        if (!TransactionIdIsNormal(limit))
                limit = FirstNormalTransactionId;

        /*
         * Ensure sane relationship of limits
         */
        if (TransactionIdFollows(limit, *oldestXmin))
        {
                elog(WARNING, "oldest Xmin is far in the past --- close open transactions soon to avoid wraparound problems");
                limit = *oldestXmin;
        }

        *freezeLimit = limit;
}


/*
 *      vac_update_relstats() -- update statistics for one relation
 *
 *              Update the whole-relation statistics that are kept in its pg_class
 *              row.  There are additional stats that will be updated if we are
 *              doing ANALYZE, but we always update these stats.  This routine works
 *              for both index and heap relation entries in pg_class.
 *
 *              We violate no-overwrite semantics here by storing new values for the
 *              statistics columns directly into the pg_class tuple that's already on
 *              the page.  The reason for this is that if we updated these tuples in
 *              the usual way, vacuuming pg_class itself wouldn't work very well ---
 *              by the time we got done with a vacuum cycle, most of the tuples in
 *              pg_class would've been obsoleted.  Of course, this only works for
 *              fixed-size never-null columns, but these are.
 *
 *              This routine is shared by full VACUUM, lazy VACUUM, and stand-alone
 *              ANALYZE.
 */
void
vac_update_relstats(Oid relid, BlockNumber num_pages, double num_tuples,
                                        bool hasindex)
{
        Relation        rd;
        HeapTupleData rtup;
        HeapTuple       ctup;
        Form_pg_class pgcform;
        Buffer          buffer;

        /*
         * update number of tuples and number of pages in pg_class
         */
        rd = heap_openr(RelationRelationName, RowExclusiveLock);

        ctup = SearchSysCache(RELOID,
                                                  ObjectIdGetDatum(relid),
                                                  0, 0, 0);
        if (!HeapTupleIsValid(ctup))
                elog(ERROR, "pg_class entry for relid %u vanished during vacuuming",
                         relid);

        /* get the buffer cache tuple */
        rtup.t_self = ctup->t_self;
        ReleaseSysCache(ctup);
        if (!heap_fetch(rd, SnapshotNow, &rtup, &buffer, false, NULL))
                elog(ERROR, "pg_class entry for relid %u vanished during vacuuming",
                         relid);

        /* overwrite the existing statistics in the tuple */
        pgcform = (Form_pg_class) GETSTRUCT(&rtup);
        pgcform->relpages = (int32) num_pages;
        pgcform->reltuples = num_tuples;
        pgcform->relhasindex = hasindex;

        /*
         * If we have discovered that there are no indexes, then there's no
         * primary key either.  This could be done more thoroughly...
         */
        if (!hasindex)
                pgcform->relhaspkey = false;

        /*
         * Invalidate the tuple in the catcaches; this also arranges to flush
         * the relation's relcache entry.  (If we fail to commit for some
         * reason, no flush will occur, but no great harm is done since there
         * are no noncritical state updates here.)
         */
        CacheInvalidateHeapTuple(rd, &rtup);

        /* Write the buffer */
        WriteBuffer(buffer);

        heap_close(rd, RowExclusiveLock);
}


/*
 *      vac_update_dbstats() -- update statistics for one database
 *
 *              Update the whole-database statistics that are kept in its pg_database
 *              row.
 *
 *              We violate no-overwrite semantics here by storing new values for the
 *              statistics columns directly into the tuple that's already on the page.
 *              As with vac_update_relstats, this avoids leaving dead tuples behind
 *              after a VACUUM, which is good since GetRawDatabaseInfo
 *              can get confused by finding dead tuples in pg_database.
 *
 *              This routine is shared by full and lazy VACUUM.  Note that it is only
 *              applied after a database-wide VACUUM operation.
 */
static void
vac_update_dbstats(Oid dbid,
                                   TransactionId vacuumXID,
                                   TransactionId frozenXID)
{
        Relation        relation;
        ScanKeyData entry[1];
        HeapScanDesc scan;
        HeapTuple       tuple;
        Form_pg_database dbform;

        relation = heap_openr(DatabaseRelationName, RowExclusiveLock);

        /* Must use a heap scan, since there's no syscache for pg_database */
        ScanKeyEntryInitialize(&entry[0], 0x0,
                                                   ObjectIdAttributeNumber, F_OIDEQ,
                                                   ObjectIdGetDatum(dbid));

        scan = heap_beginscan(relation, SnapshotNow, 1, entry);

        tuple = heap_getnext(scan, ForwardScanDirection);

        if (!HeapTupleIsValid(tuple))
                elog(ERROR, "database %u does not exist", dbid);

        dbform = (Form_pg_database) GETSTRUCT(tuple);

        /* overwrite the existing statistics in the tuple */
        dbform->datvacuumxid = vacuumXID;
        dbform->datfrozenxid = frozenXID;

        /* invalidate the tuple in the cache and write the buffer */
        CacheInvalidateHeapTuple(relation, tuple);
        WriteNoReleaseBuffer(scan->rs_cbuf);

        heap_endscan(scan);

        heap_close(relation, RowExclusiveLock);
}


/*
 *      vac_truncate_clog() -- attempt to truncate the commit log
 *
 *              Scan pg_database to determine the system-wide oldest datvacuumxid,
 *              and use it to truncate the transaction commit log (pg_clog).
 *              Also generate a warning if the system-wide oldest datfrozenxid
 *              seems to be in danger of wrapping around.
 *
 *              The passed XIDs are simply the ones I just wrote into my pg_database
 *              entry.  They're used to initialize the "min" calculations.
 *
 *              This routine is shared by full and lazy VACUUM.  Note that it is only
 *              applied after a database-wide VACUUM operation.
 */
static void
vac_truncate_clog(TransactionId vacuumXID, TransactionId frozenXID)
{
        TransactionId myXID;
        Relation        relation;
        HeapScanDesc scan;
        HeapTuple       tuple;
        int32           age;
        bool            vacuumAlreadyWrapped = false;
        bool            frozenAlreadyWrapped = false;

        myXID = GetCurrentTransactionId();

        relation = heap_openr(DatabaseRelationName, AccessShareLock);

        scan = heap_beginscan(relation, SnapshotNow, 0, NULL);

        while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
        {
                Form_pg_database dbform = (Form_pg_database) GETSTRUCT(tuple);

                /* Ignore non-connectable databases (eg, template0) */
                /* It's assumed that these have been frozen correctly */
                if (!dbform->datallowconn)
                        continue;

                if (TransactionIdIsNormal(dbform->datvacuumxid))
                {
                        if (TransactionIdPrecedes(myXID, dbform->datvacuumxid))
                                vacuumAlreadyWrapped = true;
                        else if (TransactionIdPrecedes(dbform->datvacuumxid, vacuumXID))
                                vacuumXID = dbform->datvacuumxid;
                }
                if (TransactionIdIsNormal(dbform->datfrozenxid))
                {
                        if (TransactionIdPrecedes(myXID, dbform->datfrozenxid))
                                frozenAlreadyWrapped = true;
                        else if (TransactionIdPrecedes(dbform->datfrozenxid, frozenXID))
                                frozenXID = dbform->datfrozenxid;
                }
        }

        heap_endscan(scan);

        heap_close(relation, AccessShareLock);

        /*
         * Do not truncate CLOG if we seem to have suffered wraparound
         * already; the computed minimum XID might be bogus.
         */
        if (vacuumAlreadyWrapped)
        {
                elog(WARNING, "Some databases have not been vacuumed in over 2 billion transactions."
                         "\n\tYou may have already suffered transaction-wraparound data loss.");
                return;
        }

        /* Truncate CLOG to the oldest vacuumxid */
        TruncateCLOG(vacuumXID);

        /* Give warning about impending wraparound problems */
        if (frozenAlreadyWrapped)
        {
                elog(WARNING, "Some databases have not been vacuumed in over 1 billion transactions."
                         "\n\tBetter vacuum them soon, or you may have a wraparound failure.");
        }
        else
        {
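                /*
                 * For reference: with 32-bit XIDs, (MaxTransactionId >> 3) * 3
                 * is about 1.6 billion, while MaxTransactionId >> 1 (about 2.1
                 * billion, i.e. 2^31) is where circular XID comparison breaks
                 * down; so this warning fires with roughly half a billion
                 * transactions of headroom left.
                 */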
                age = (int32) (myXID - frozenXID);
                if (age > (int32) ((MaxTransactionId >> 3) * 3))
                        elog(WARNING, "Some databases have not been vacuumed in %d transactions."
                                 "\n\tBetter vacuum them within %d transactions,"
                                 "\n\tor you may have a wraparound failure.",
                                 age, (int32) (MaxTransactionId >> 1) - age);
        }
}


/****************************************************************************
 *                                                                          *
 *          Code common to both flavors of VACUUM                           *
 *                                                                          *
 ****************************************************************************
 */


/*
 *      vacuum_rel() -- vacuum one heap relation
 *
 *              Returns TRUE if we actually processed the relation (or can ignore it
 *              for some reason), FALSE if we failed to process it due to permissions
 *              or other reasons.  (A FALSE result really means that some data
 *              may have been left unvacuumed, so we can't update XID stats.)
 *
 *              Doing one heap at a time incurs extra overhead, since we need to
 *              check that the heap exists again just before we vacuum it.  The
 *              reason that we do this is so that vacuuming can be spread across
 *              many small transactions.  Otherwise, two-phase locking would require
 *              us to lock the entire database during one pass of the vacuum cleaner.
 *
 *              At entry and exit, we are not inside a transaction.
 */
static bool
vacuum_rel(Oid relid, VacuumStmt *vacstmt, char expected_relkind)
{
        LOCKMODE        lmode;
        Relation        onerel;
        LockRelId       onerelid;
        Oid                     toast_relid;
        bool            result;

        /* Begin a transaction for vacuuming this relation */
        StartTransactionCommand(true);
        SetQuerySnapshot();                     /* might be needed for functional index */

        /*
         * Check for user-requested abort.  Note we want this to be inside a
         * transaction, so xact.c doesn't issue useless WARNING.
         */
        CHECK_FOR_INTERRUPTS();

        /*
         * Race condition -- if the pg_class tuple has gone away since the
         * last time we saw it, we don't need to vacuum it.
         */
        if (!SearchSysCacheExists(RELOID,
                                                          ObjectIdGetDatum(relid),
                                                          0, 0, 0))
        {
                CommitTransactionCommand(true);
                return true;                    /* okay 'cause no data there */
        }

        /*
         * Determine the type of lock we want --- hard exclusive lock for a
         * FULL vacuum, but just ShareUpdateExclusiveLock for concurrent
         * vacuum.  Either way, we can be sure that no other backend is
         * vacuuming the same table.
         */
        lmode = vacstmt->full ? AccessExclusiveLock : ShareUpdateExclusiveLock;

        /*
         * Open the class, get an appropriate lock on it, and check
         * permissions.
         *
         * We allow the user to vacuum a table if he is superuser, the table
         * owner, or the database owner (but in the latter case, only if it's
         * not a shared relation).  pg_class_ownercheck includes the superuser
         * case.
         *
         * Note we choose to treat permissions failure as a WARNING and keep
         * trying to vacuum the rest of the DB --- is this appropriate?
         */
        onerel = relation_open(relid, lmode);

        if (!(pg_class_ownercheck(RelationGetRelid(onerel), GetUserId()) ||
                  (is_dbadmin(MyDatabaseId) && !onerel->rd_rel->relisshared)))
        {
                elog(WARNING, "Skipping \"%s\" --- only table or database owner can VACUUM it",
                         RelationGetRelationName(onerel));
                relation_close(onerel, lmode);
                CommitTransactionCommand(true);
                return false;
        }

        /*
         * Check that it's a plain table; we used to do this in getrels() but
         * seems safer to check after we've locked the relation.
         */
        if (onerel->rd_rel->relkind != expected_relkind)
        {
                elog(WARNING, "Skipping \"%s\" --- cannot process indexes, views or special system tables",
                         RelationGetRelationName(onerel));
                relation_close(onerel, lmode);
                CommitTransactionCommand(true);
                return false;
        }

        /*
         * Silently ignore tables that are temp tables of other backends ---
         * trying to vacuum these will lead to great unhappiness, since their
         * contents are probably not up-to-date on disk.  (We don't throw a
         * warning here; it would just lead to chatter during a database-wide
         * VACUUM.)
         */
        if (isOtherTempNamespace(RelationGetNamespace(onerel)))
        {
                relation_close(onerel, lmode);
                CommitTransactionCommand(true);
                return true;                    /* assume no long-lived data in temp tables */
        }

        /*
         * Get a session-level lock too. This will protect our access to the
         * relation across multiple transactions, so that we can vacuum the
         * relation's TOAST table (if any) secure in the knowledge that no one
         * is deleting the parent relation.
         *
         * NOTE: this cannot block, even if someone else is waiting for access,
         * because the lock manager knows that both lock requests are from the
         * same process.
         */
        onerelid = onerel->rd_lockInfo.lockRelId;
        LockRelationForSession(&onerelid, lmode);

        /*
         * Remember the relation's TOAST relation for later
         */
        toast_relid = onerel->rd_rel->reltoastrelid;

        /*
         * Do the actual work --- either FULL or "lazy" vacuum
         */
        if (vacstmt->full)
                full_vacuum_rel(onerel, vacstmt);
        else
                lazy_vacuum_rel(onerel, vacstmt);

        result = true;                          /* did the vacuum */

        /* all done with this class, but hold lock until commit */
        relation_close(onerel, NoLock);

        /*
         * Complete the transaction and free all temporary memory used.
         */
        CommitTransactionCommand(true);

        /*
         * If the relation has a secondary toast rel, vacuum that too while we
         * still hold the session lock on the master table.  Note however that
         * "analyze" will not get done on the toast table.  This is good,
         * because the toaster always uses hardcoded index access and
         * statistics are totally unimportant for toast relations.
         */
        if (toast_relid != InvalidOid)
        {
                if (! vacuum_rel(toast_relid, vacstmt, RELKIND_TOASTVALUE))
                        result = false;         /* failed to vacuum the TOAST table? */
        }

        /*
         * Now release the session-level lock on the master table.
         */
        UnlockRelationForSession(&onerelid, lmode);

        return result;
}


/****************************************************************************
 *                                                                          *
 *          Code for VACUUM FULL (only)                                     *
 *                                                                          *
 ****************************************************************************
 */


/*
 *      full_vacuum_rel() -- perform FULL VACUUM for one heap relation
 *
 *              This routine vacuums a single heap, cleans out its indexes, and
 *              updates its num_pages and num_tuples statistics.
 *
 *              At entry, we have already established a transaction and opened
 *              and locked the relation.
 */
static void
full_vacuum_rel(Relation onerel, VacuumStmt *vacstmt)
{
        VacPageListData vacuum_pages;           /* List of pages to vacuum and/or
                                                                                 * clean indexes */
        VacPageListData fraged_pages;           /* List of pages with space enough
                                                                                 * for re-using */
        Relation   *Irel;
        int                     nindexes,
                                i;
        VRelStats  *vacrelstats;
        bool            reindex = false;

        if (IsIgnoringSystemIndexes() &&
                IsSystemRelation(onerel))
                reindex = true;

        vacuum_set_xid_limits(vacstmt, onerel->rd_rel->relisshared,
                                                  &OldestXmin, &FreezeLimit);

        /*
         * Set up statistics-gathering machinery.
         */
        vacrelstats = (VRelStats *) palloc(sizeof(VRelStats));
        vacrelstats->rel_pages = 0;
        vacrelstats->rel_tuples = 0;
        vacrelstats->hasindex = false;

        /* scan the heap */
        vacuum_pages.num_pages = fraged_pages.num_pages = 0;
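        /*
         * Only num_pages needs resetting here; vpage_insert takes care of
         * allocating and growing the pagedesc arrays as entries are added.
         */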
        scan_heap(vacrelstats, onerel, &vacuum_pages, &fraged_pages);

        /* Now open all indexes of the relation */
        vac_open_indexes(onerel, &nindexes, &Irel);
        if (!Irel)
                reindex = false;
        else if (!RelationGetForm(onerel)->relhasindex)
                reindex = true;
        if (nindexes > 0)
                vacrelstats->hasindex = true;

#ifdef NOT_USED

        /*
         * reindex in VACUUM is dangerous under WAL. ifdef out until it
         * becomes safe.
         */
        if (reindex)
        {
                vac_close_indexes(nindexes, Irel);
                Irel = (Relation *) NULL;
                activate_indexes_of_a_table(onerel, false);
        }
#endif   /* NOT_USED */

        /* Clean/scan index relation(s) */
        if (Irel != (Relation *) NULL)
        {
                if (vacuum_pages.num_pages > 0)
                {
                        for (i = 0; i < nindexes; i++)
                                vacuum_index(&vacuum_pages, Irel[i],
                                                         vacrelstats->rel_tuples, 0);
                }
                else
                {
                        /* just scan indexes to update statistics */
                        for (i = 0; i < nindexes; i++)
                                scan_index(Irel[i], vacrelstats->rel_tuples);
                }
        }

        if (fraged_pages.num_pages > 0)
        {
                /* Try to shrink heap */
                repair_frag(vacrelstats, onerel, &vacuum_pages, &fraged_pages,
                                        nindexes, Irel);
                vac_close_indexes(nindexes, Irel);
        }
        else
        {
                vac_close_indexes(nindexes, Irel);
                if (vacuum_pages.num_pages > 0)
                {
                        /* Clean pages from vacuum_pages list */
                        vacuum_heap(vacrelstats, onerel, &vacuum_pages);
                }
                else
                {
                        /*
                         * Flush dirty pages out to disk.  We must do this even if we
                         * didn't do anything else, because we want to ensure that all
                         * tuples have correct on-row commit status on disk (see
                         * bufmgr.c's comments for FlushRelationBuffers()).
                         */
                        i = FlushRelationBuffers(onerel, vacrelstats->rel_pages);
                        if (i < 0)
                                elog(ERROR, "VACUUM (full_vacuum_rel): FlushRelationBuffers returned %d",
                                         i);
                }
        }

#ifdef NOT_USED
        if (reindex)
                activate_indexes_of_a_table(onerel, true);
#endif   /* NOT_USED */

        /* update shared free space map with final free space info */
        vac_update_fsm(onerel, &fraged_pages, vacrelstats->rel_pages);

        /* update statistics in pg_class */
        vac_update_relstats(RelationGetRelid(onerel), vacrelstats->rel_pages,
                                                vacrelstats->rel_tuples, vacrelstats->hasindex);
}


/*
 *      scan_heap() -- scan an open heap relation
 *
 *              This routine sets commit status bits, constructs vacuum_pages (list
 *              of pages we need to compact free space on and/or clean indexes of
 *              deleted tuples), constructs fraged_pages (list of pages with free
 *              space that tuples could be moved into), and calculates statistics
 *              on the number of live tuples in the heap.
 */
static void
scan_heap(VRelStats *vacrelstats, Relation onerel,
                  VacPageList vacuum_pages, VacPageList fraged_pages)
{
        BlockNumber nblocks,
                                blkno;
        ItemId          itemid;
        Buffer          buf;
        HeapTupleData tuple;
        OffsetNumber offnum,
                                maxoff;
        bool            pgchanged,
                                tupgone,
                                notup;
        char       *relname;
        VacPage         vacpage,
                                vacpagecopy;
        BlockNumber empty_pages,
                                new_pages,
                                changed_pages,
                                empty_end_pages;
        double          num_tuples,
                                tups_vacuumed,
                                nkeep,
                                nunused;
        double          free_size,
                                usable_free_size;
        Size            min_tlen = MaxTupleSize;
        Size            max_tlen = 0;
        int                     i;
        bool            do_shrinking = true;
        VTupleLink      vtlinks = (VTupleLink) palloc(100 * sizeof(VTupleLinkData));
        int                     num_vtlinks = 0;
        int                     free_vtlinks = 100;
        VacRUsage       ru0;

        vac_init_rusage(&ru0);

        relname = RelationGetRelationName(onerel);
        elog(elevel, "--Relation %s.%s--",
                 get_namespace_name(RelationGetNamespace(onerel)),
                 relname);

        empty_pages = new_pages = changed_pages = empty_end_pages = 0;
        num_tuples = tups_vacuumed = nkeep = nunused = 0;
        free_size = 0;

        nblocks = RelationGetNumberOfBlocks(onerel);

        /*
         * We initially create each VacPage item in a maximal-sized workspace,
         * then copy the workspace into a just-large-enough copy.
         */
        vacpage = (VacPage) palloc(sizeof(VacPageData) + MaxOffsetNumber * sizeof(OffsetNumber));
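        /*
         * (sizeof(VacPageData) already includes one OffsetNumber, so this
         * leaves room for MaxOffsetNumber + 1 entries -- one more than any
         * page can actually hold.)
         */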

        for (blkno = 0; blkno < nblocks; blkno++)
        {
                Page            page,
                                        tempPage = NULL;
                bool            do_reap,
                                        do_frag;

                CHECK_FOR_INTERRUPTS();

                buf = ReadBuffer(onerel, blkno);
                page = BufferGetPage(buf);

                vacpage->blkno = blkno;
                vacpage->offsets_used = 0;
                vacpage->offsets_free = 0;

                if (PageIsNew(page))
                {
                        elog(WARNING, "Rel %s: Uninitialized page %u - fixing",
                                 relname, blkno);
                        PageInit(page, BufferGetPageSize(buf), 0);
                        vacpage->free = ((PageHeader) page)->pd_upper - ((PageHeader) page)->pd_lower;
                        free_size += (vacpage->free - sizeof(ItemIdData));
                        new_pages++;
                        empty_end_pages++;
                        vacpagecopy = copy_vac_page(vacpage);
                        vpage_insert(vacuum_pages, vacpagecopy);
                        vpage_insert(fraged_pages, vacpagecopy);
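                        /* PageInit dirtied the page, so write it out; contrast
                         * the PageIsEmpty case below, which only needs
                         * ReleaseBuffer */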
                        WriteBuffer(buf);
                        continue;
                }

                if (PageIsEmpty(page))
                {
                        vacpage->free = ((PageHeader) page)->pd_upper - ((PageHeader) page)->pd_lower;
                        free_size += (vacpage->free - sizeof(ItemIdData));
                        empty_pages++;
                        empty_end_pages++;
                        vacpagecopy = copy_vac_page(vacpage);
                        vpage_insert(vacuum_pages, vacpagecopy);
                        vpage_insert(fraged_pages, vacpagecopy);
                        ReleaseBuffer(buf);
                        continue;
                }

                pgchanged = false;
                notup = true;
                maxoff = PageGetMaxOffsetNumber(page);
                for (offnum = FirstOffsetNumber;
                         offnum <= maxoff;
                         offnum = OffsetNumberNext(offnum))
                {
                        uint16          sv_infomask;

                        itemid = PageGetItemId(page, offnum);

                        /*
                         * Collect unused items too -- it's possible to have index
                         * entries pointing here after a crash.
                         */
                        if (!ItemIdIsUsed(itemid))
                        {
                                vacpage->offsets[vacpage->offsets_free++] = offnum;
                                nunused += 1;
                                continue;
                        }

                        tuple.t_datamcxt = NULL;
                        tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
                        tuple.t_len = ItemIdGetLength(itemid);
                        ItemPointerSet(&(tuple.t_self), blkno, offnum);

                        tupgone = false;
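                        /* remember the pre-check infomask so we can notice
                         * hint-bit updates made by HeapTupleSatisfiesVacuum
                         * (tested after the switch below) */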
                        sv_infomask = tuple.t_data->t_infomask;

                        switch (HeapTupleSatisfiesVacuum(tuple.t_data, OldestXmin))
                        {
                                case HEAPTUPLE_DEAD:
                                        tupgone = true;         /* we can delete the tuple */
                                        break;
                                case HEAPTUPLE_LIVE:

                                        /*
                                         * Tuple is good.  Consider whether to replace its
                                         * xmin value with FrozenTransactionId.
                                         */
                                        if (TransactionIdIsNormal(HeapTupleHeaderGetXmin(tuple.t_data)) &&
                                                TransactionIdPrecedes(HeapTupleHeaderGetXmin(tuple.t_data),
                                                                                          FreezeLimit))
                                        {
                                                HeapTupleHeaderSetXmin(tuple.t_data, FrozenTransactionId);
                                                /* infomask should be okay already */
                                                Assert(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED);
                                                pgchanged = true;
                                        }
                                        break;
                                case HEAPTUPLE_RECENTLY_DEAD:

                                        /*
                                         * If tuple is recently deleted then we must not
                                         * remove it from relation.
                                         */
                                        nkeep += 1;

                                        /*
                                         * If we are shrinking and this tuple has been
                                         * updated, remember it so we can construct the
                                         * updated-tuple dependency chains.
                                         */
1173                                         if (do_shrinking &&
1174                                                 !(ItemPointerEquals(&(tuple.t_self),
1175                                                                                         &(tuple.t_data->t_ctid))))
1176                                         {
1177                                                 if (free_vtlinks == 0)
1178                                                 {
1179                                                         free_vtlinks = 1000;
1180                                                         vtlinks = (VTupleLink) repalloc(vtlinks,
1181                                                                                    (free_vtlinks + num_vtlinks) *
1182                                                                                                  sizeof(VTupleLinkData));
1183                                                 }
1184                                                 vtlinks[num_vtlinks].new_tid = tuple.t_data->t_ctid;
1185                                                 vtlinks[num_vtlinks].this_tid = tuple.t_self;
1186                                                 free_vtlinks--;
1187                                                 num_vtlinks++;
1188                                         }
1189                                         break;
1190                                 case HEAPTUPLE_INSERT_IN_PROGRESS:
1191
1192                                         /*
1193                                          * This should not happen, since we hold exclusive
1194                                          * lock on the relation; shouldn't we raise an error?
1195                                          */
1196                                         elog(WARNING, "Rel %s: TID %u/%u: InsertTransactionInProgress %u - can't shrink relation",
1197                                                  relname, blkno, offnum, HeapTupleHeaderGetXmin(tuple.t_data));
1198                                         do_shrinking = false;
1199                                         break;
1200                                 case HEAPTUPLE_DELETE_IN_PROGRESS:
1201
1202                                         /*
1203                                          * This should not happen, since we hold exclusive
1204                                          * lock on the relation; shouldn't we raise an error?
1205                                          */
1206                                         elog(WARNING, "Rel %s: TID %u/%u: DeleteTransactionInProgress %u - can't shrink relation",
1207                                                  relname, blkno, offnum, HeapTupleHeaderGetXmax(tuple.t_data));
1208                                         do_shrinking = false;
1209                                         break;
1210                                 default:
1211                                         elog(ERROR, "Unexpected HeapTupleSatisfiesVacuum result");
1212                                         break;
1213                         }
1214
1215                         /* check for hint-bit update by HeapTupleSatisfiesVacuum */
1216                         if (sv_infomask != tuple.t_data->t_infomask)
1217                                 pgchanged = true;
1218
1219                         /*
1220                          * Other checks...
1221                          */
1222                         if (onerel->rd_rel->relhasoids &&
1223                                 !OidIsValid(HeapTupleGetOid(&tuple)))
1224                                 elog(WARNING, "Rel %s: TID %u/%u: OID IS INVALID. TUPGONE %d.",
1225                                          relname, blkno, offnum, (int) tupgone);
1226
1227                         if (tupgone)
1228                         {
1229                                 ItemId          lpp;
1230
1231                                 /*
1232                                  * Here we are building a temporary copy of the page with
1233                                  * dead tuples removed.  Below we will apply
1234                                  * PageRepairFragmentation to the copy, so that we can
1235                                  * determine how much space will be available after
1236                                  * removal of dead tuples.      But note we are NOT changing
1237                                  * the real page yet...
1238                                  */
1239                                 if (tempPage == (Page) NULL)
1240                                 {
1241                                         Size            pageSize;
1242
1243                                         pageSize = PageGetPageSize(page);
1244                                         tempPage = (Page) palloc(pageSize);
1245                                         memcpy(tempPage, page, pageSize);
1246                                 }
1247
1248                                 /* mark it unused on the temp page */
1249                                 lpp = PageGetItemId(tempPage, offnum);
1250                                 lpp->lp_flags &= ~LP_USED;
1251
1252                                 vacpage->offsets[vacpage->offsets_free++] = offnum;
1253                                 tups_vacuumed += 1;
1254                         }
1255                         else
1256                         {
1257                                 num_tuples += 1;
1258                                 notup = false;
1259                                 if (tuple.t_len < min_tlen)
1260                                         min_tlen = tuple.t_len;
1261                                 if (tuple.t_len > max_tlen)
1262                                         max_tlen = tuple.t_len;
1263                         }
1264                 }                                               /* scan along page */
1265
1266                 if (tempPage != (Page) NULL)
1267                 {
1268                         /* Some tuples are removable; figure free space after removal */
1269                         PageRepairFragmentation(tempPage, NULL);
1270                         vacpage->free = ((PageHeader) tempPage)->pd_upper - ((PageHeader) tempPage)->pd_lower;
1271                         pfree(tempPage);
1272                         do_reap = true;
1273                 }
1274                 else
1275                 {
1276                         /* Just use current available space */
1277                         vacpage->free = ((PageHeader) page)->pd_upper - ((PageHeader) page)->pd_lower;
1278                         /* Need to reap the page if it has any ~LP_USED (unused) line pointers */
1279                         do_reap = (vacpage->offsets_free > 0);
1280                 }
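                /*
                 * In either case, pd_upper - pd_lower is the hole between the
                 * line pointer array and the tuple data, i.e. the page's
                 * contiguous free space.
                 */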
1281
1282                 free_size += vacpage->free;
1283
1284                 /*
1285                  * Add the page to fraged_pages if it has a useful amount of free
1286                  * space.  "Useful" means enough for a minimal-sized tuple. But we
1287                  * don't know that accurately near the start of the relation, so
1288                  * add pages unconditionally if they have >= BLCKSZ/10 free space.
1289                  */
1290                 do_frag = (vacpage->free >= min_tlen || vacpage->free >= BLCKSZ / 10);
1291
1292                 if (do_reap || do_frag)
1293                 {
1294                         vacpagecopy = copy_vac_page(vacpage);
1295                         if (do_reap)
1296                                 vpage_insert(vacuum_pages, vacpagecopy);
1297                         if (do_frag)
1298                                 vpage_insert(fraged_pages, vacpagecopy);
1299                 }
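                /*
                 * Note that the same copied VacPage may be linked into both
                 * lists; each list holds only a pointer to it.
                 */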
1300
1301                 if (notup)
1302                         empty_end_pages++;
1303                 else
1304                         empty_end_pages = 0;
1305
1306                 if (pgchanged)
1307                 {
1308                         WriteBuffer(buf);
1309                         changed_pages++;
1310                 }
1311                 else
1312                         ReleaseBuffer(buf);
1313         }
1314
1315         pfree(vacpage);
1316
1317         /* save stats in the rel list for use later */
1318         vacrelstats->rel_tuples = num_tuples;
1319         vacrelstats->rel_pages = nblocks;
1320         if (num_tuples == 0)
1321                 min_tlen = max_tlen = 0;
1322         vacrelstats->min_tlen = min_tlen;
1323         vacrelstats->max_tlen = max_tlen;
1324
1325         vacuum_pages->empty_end_pages = empty_end_pages;
1326         fraged_pages->empty_end_pages = empty_end_pages;
1327
1328         /*
1329          * Clear the fraged_pages list if we found we couldn't shrink. Else,
1330          * remove any "empty" end-pages from the list, and compute usable free
1331          * space = free space in remaining pages.
1332          */
1333         if (do_shrinking)
1334         {
1335                 Assert((BlockNumber) fraged_pages->num_pages >= empty_end_pages);
1336                 fraged_pages->num_pages -= empty_end_pages;
1337                 usable_free_size = 0;
1338                 for (i = 0; i < fraged_pages->num_pages; i++)
1339                         usable_free_size += fraged_pages->pagedesc[i]->free;
1340         }
1341         else
1342         {
1343                 fraged_pages->num_pages = 0;
1344                 usable_free_size = 0;
1345         }
1346
1347         /* don't bother to save vtlinks if we will not call repair_frag */
1348         if (fraged_pages->num_pages > 0 && num_vtlinks > 0)
1349         {
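                /*
                 * Sort by successor (new_tid) so that repair_frag can locate
                 * the parent of a given tuple with a binary search
                 * (vac_bsearch).
                 */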
1350                 qsort((char *) vtlinks, num_vtlinks, sizeof(VTupleLinkData),
1351                           vac_cmp_vtlinks);
1352                 vacrelstats->vtlinks = vtlinks;
1353                 vacrelstats->num_vtlinks = num_vtlinks;
1354         }
1355         else
1356         {
1357                 vacrelstats->vtlinks = NULL;
1358                 vacrelstats->num_vtlinks = 0;
1359                 pfree(vtlinks);
1360         }
1361
1362         elog(elevel, "Pages %u: Changed %u, reaped %u, Empty %u, New %u; "
1363                  "Tup %.0f: Vac %.0f, Keep/VTL %.0f/%u, UnUsed %.0f, MinLen %lu, "
1364                  "MaxLen %lu; Re-using: Free/Avail. Space %.0f/%.0f; "
1365                  "EndEmpty/Avail. Pages %u/%u.\n\t%s",
1366                  nblocks, changed_pages, vacuum_pages->num_pages, empty_pages,
1367                  new_pages, num_tuples, tups_vacuumed,
1368                  nkeep, vacrelstats->num_vtlinks,
1369                  nunused, (unsigned long) min_tlen, (unsigned long) max_tlen,
1370                  free_size, usable_free_size,
1371                  empty_end_pages, fraged_pages->num_pages,
1372                  vac_show_rusage(&ru0));
1373 }
1374
1375
1376 /*
1377  *      repair_frag() -- try to repair relation's fragmentation
1378  *
1379  *              This routine marks dead tuples as unused and tries to re-use dead
1380  *              space by moving tuples (and inserting index entries as needed). It
1381  *              constructs Nvacpagelist, a list of freed pages (tuples moved off),
1382  *              and cleans their index entries after committing the current
1383  *              transaction (in hack-manner - without losing locks or freeing
1384  *              memory!). It truncates the relation if some end-blocks become empty.
1385  */
1386 static void
1387 repair_frag(VRelStats *vacrelstats, Relation onerel,
1388                         VacPageList vacuum_pages, VacPageList fraged_pages,
1389                         int nindexes, Relation *Irel)
1390 {
1391         TransactionId myXID;
1392         CommandId       myCID;
1393         Buffer          buf,
1394                                 cur_buffer;
1395         BlockNumber nblocks,
1396                                 blkno;
1397         BlockNumber last_move_dest_block = 0,
1398                                 last_vacuum_block;
1399         Page            page,
1400                                 ToPage = NULL;
1401         OffsetNumber offnum,
1402                                 maxoff,
1403                                 newoff,
1404                                 max_offset;
1405         ItemId          itemid,
1406                                 newitemid;
1407         HeapTupleData tuple,
1408                                 newtup;
1409         TupleDesc       tupdesc;
1410         ResultRelInfo *resultRelInfo;
1411         EState     *estate;
1412         TupleTable      tupleTable;
1413         TupleTableSlot *slot;
1414         VacPageListData Nvacpagelist;
1415         VacPage         cur_page = NULL,
1416                                 last_vacuum_page,
1417                                 vacpage,
1418                            *curpage;
1419         int                     cur_item = 0;
1420         int                     i;
1421         Size            tuple_len;
1422         int                     num_moved,
1423                                 num_fraged_pages,
1424                                 vacuumed_pages;
1425         int                     checked_moved,
1426                                 num_tuples,
1427                                 keep_tuples = 0;
1428         bool            isempty,
1429                                 dowrite,
1430                                 chain_tuple_moved;
1431         VacRUsage       ru0;
1432
1433         vac_init_rusage(&ru0);
1434
1435         myXID = GetCurrentTransactionId();
1436         myCID = GetCurrentCommandId();
1437
1438         tupdesc = RelationGetDescr(onerel);
1439
1440         /*
1441          * We need a ResultRelInfo and an EState so we can use the regular
1442          * executor's index-entry-making machinery.
1443          */
1444         resultRelInfo = makeNode(ResultRelInfo);
1445         resultRelInfo->ri_RangeTableIndex = 1;          /* dummy */
1446         resultRelInfo->ri_RelationDesc = onerel;
1447         resultRelInfo->ri_TrigDesc = NULL;      /* we don't fire triggers */
1448
1449         ExecOpenIndices(resultRelInfo);
1450
1451         estate = CreateExecutorState();
1452         estate->es_result_relations = resultRelInfo;
1453         estate->es_num_result_relations = 1;
1454         estate->es_result_relation_info = resultRelInfo;
1455
1456         /* Set up a dummy tuple table too */
1457         tupleTable = ExecCreateTupleTable(1);
1458         slot = ExecAllocTableSlot(tupleTable);
1459         ExecSetSlotDescriptor(slot, tupdesc, false);
1460
1461         Nvacpagelist.num_pages = 0;
1462         num_fraged_pages = fraged_pages->num_pages;
1463         Assert((BlockNumber) vacuum_pages->num_pages >= vacuum_pages->empty_end_pages);
1464         vacuumed_pages = vacuum_pages->num_pages - vacuum_pages->empty_end_pages;
1465         if (vacuumed_pages > 0)
1466         {
1467                 /* get last reaped page from vacuum_pages */
1468                 last_vacuum_page = vacuum_pages->pagedesc[vacuumed_pages - 1];
1469                 last_vacuum_block = last_vacuum_page->blkno;
1470         }
1471         else
1472         {
1473                 last_vacuum_page = NULL;
1474                 last_vacuum_block = InvalidBlockNumber;
1475         }
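        /*
         * Both page lists are ordered by blkno (see the NB below), so we can
         * consume vacuum_pages from the tail in step with the backwards scan.
         */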
1476         cur_buffer = InvalidBuffer;
1477         num_moved = 0;
1478
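        /* scratch VacPage, sized for the worst case of MaxOffsetNumber entries */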
1479         vacpage = (VacPage) palloc(sizeof(VacPageData) + MaxOffsetNumber * sizeof(OffsetNumber));
1480         vacpage->offsets_used = vacpage->offsets_free = 0;
1481
1482         /*
1483          * Scan pages backwards from the last nonempty page, trying to move
1484          * tuples down to lower pages.  Quit when we reach a page that we have
1485          * moved any tuples onto, or the first page if we haven't moved
1486          * anything, or when we find a page we cannot completely empty (this
1487          * last condition is handled by "break" statements within the loop).
1488          *
1489          * NB: this code depends on the vacuum_pages and fraged_pages lists being
1490          * in order by blkno.
1491          */
1492         nblocks = vacrelstats->rel_pages;
1493         for (blkno = nblocks - vacuum_pages->empty_end_pages - 1;
1494                  blkno > last_move_dest_block;
1495                  blkno--)
1496         {
1497                 CHECK_FOR_INTERRUPTS();
1498
1499                 /*
1500                  * Forget fraged_pages pages at or after this one; they're no
1501                  * longer useful as move targets, since we only want to move down.
1502                  * Note that since we stop the outer loop at last_move_dest_block,
1503                  * pages removed here cannot have had anything moved onto them
1504                  * already.
1505                  *
1506                  * Also note that we don't change the stored fraged_pages list, only
1507                  * our local variable num_fraged_pages; so the forgotten pages are
1508                  * still available to be loaded into the free space map later.
1509                  */
1510                 while (num_fraged_pages > 0 &&
1511                         fraged_pages->pagedesc[num_fraged_pages - 1]->blkno >= blkno)
1512                 {
1513                         Assert(fraged_pages->pagedesc[num_fraged_pages - 1]->offsets_used == 0);
1514                         --num_fraged_pages;
1515                 }
1516
1517                 /*
1518                  * Process this page of relation.
1519                  */
1520                 buf = ReadBuffer(onerel, blkno);
1521                 page = BufferGetPage(buf);
1522
1523                 vacpage->offsets_free = 0;
1524
1525                 isempty = PageIsEmpty(page);
1526
1527                 dowrite = false;
1528
1529                 /* Is the page in the vacuum_pages list? */
1530                 if (blkno == last_vacuum_block)
1531                 {
1532                         if (last_vacuum_page->offsets_free > 0)
1533                         {
1534                                 /* there are dead tuples on this page - clean them */
1535                                 Assert(!isempty);
1536                                 LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
1537                                 vacuum_page(onerel, buf, last_vacuum_page);
1538                                 LockBuffer(buf, BUFFER_LOCK_UNLOCK);
1539                                 dowrite = true;
1540                         }
1541                         else
1542                                 Assert(isempty);
1543                         --vacuumed_pages;
1544                         if (vacuumed_pages > 0)
1545                         {
1546                                 /* get prev reaped page from vacuum_pages */
1547                                 last_vacuum_page = vacuum_pages->pagedesc[vacuumed_pages - 1];
1548                                 last_vacuum_block = last_vacuum_page->blkno;
1549                         }
1550                         else
1551                         {
1552                                 last_vacuum_page = NULL;
1553                                 last_vacuum_block = InvalidBlockNumber;
1554                         }
1555                         if (isempty)
1556                         {
1557                                 ReleaseBuffer(buf);
1558                                 continue;
1559                         }
1560                 }
1561                 else
1562                         Assert(!isempty);
1563
1564                 chain_tuple_moved = false;              /* no chain tuples have been
1565                                                                                  * moved off this page yet */
1566                 vacpage->blkno = blkno;
1567                 maxoff = PageGetMaxOffsetNumber(page);
1568                 for (offnum = FirstOffsetNumber;
1569                          offnum <= maxoff;
1570                          offnum = OffsetNumberNext(offnum))
1571                 {
1572                         itemid = PageGetItemId(page, offnum);
1573
1574                         if (!ItemIdIsUsed(itemid))
1575                                 continue;
1576
1577                         tuple.t_datamcxt = NULL;
1578                         tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
1579                         tuple_len = tuple.t_len = ItemIdGetLength(itemid);
1580                         ItemPointerSet(&(tuple.t_self), blkno, offnum);
1581
1582                         if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
1583                         {
1584                                 if (tuple.t_data->t_infomask & HEAP_MOVED_IN)
1585                                         elog(ERROR, "HEAP_MOVED_IN was not expected");
1586
1587                                 /*
1588                                  * If this (chain) tuple was already moved by me, I have to
1589                                  * check whether it is in vacpage or not - i.e. whether it
1590                                  * was moved while cleaning this page or some previous one.
1591                                  */
1592                                 if (tuple.t_data->t_infomask & HEAP_MOVED_OFF)
1593                                 {
1594                                         if (HeapTupleHeaderGetXvac(tuple.t_data) != myXID)
1595                                                 elog(ERROR, "Invalid XVAC in tuple header");
1596                                         if (keep_tuples == 0)
1597                                                 continue;
1598                                         /* some chains were moved while cleaning this page */
1599                                         if (chain_tuple_moved)
1600                                         {
1601                                                 Assert(vacpage->offsets_free > 0);
1602                                                 for (i = 0; i < vacpage->offsets_free; i++)
1603                                                 {
1604                                                         if (vacpage->offsets[i] == offnum)
1605                                                                 break;
1606                                                 }
1607                                                 if (i >= vacpage->offsets_free) /* not found */
1608                                                 {
1609                                                         vacpage->offsets[vacpage->offsets_free++] = offnum;
1610                                                         keep_tuples--;
1611                                                 }
1612                                         }
1613                                         else
1614                                         {
1615                                                 vacpage->offsets[vacpage->offsets_free++] = offnum;
1616                                                 keep_tuples--;
1617                                         }
1618                                         continue;
1619                                 }
1620                                 elog(ERROR, "HEAP_MOVED_OFF was expected");
1621                         }
1622
1623                         /*
1624                          * If this tuple is in a chain of tuples created by updates
1625                          * from "recent" transactions, then we have to move the whole
1626                          * chain of tuples to other places.
1627                          *
1628                          * NOTE: this test is not 100% accurate: it is possible for a
1629                          * tuple to be an updated one with recent xmin, and yet not
1630                          * have a corresponding tuple in the vtlinks list.      Presumably
1631                          * there was once a parent tuple with xmax matching the xmin,
1632                          * but it's possible that that tuple has been removed --- for
1633                          * example, if it had xmin = xmax then
1634                          * HeapTupleSatisfiesVacuum would deem it removable as soon as
1635                          * the xmin xact completes.
1636                          *
1637                          * To be on the safe side, we abandon the repair_frag process if
1638                          * we cannot find the parent tuple in vtlinks.  This may be
1639                          * overly conservative; AFAICS it would be safe to move the
1640                          * chain.
1641                          */
1642                         if (((tuple.t_data->t_infomask & HEAP_UPDATED) &&
1643                          !TransactionIdPrecedes(HeapTupleHeaderGetXmin(tuple.t_data),
1644                                                                         OldestXmin)) ||
1645                                 (!(tuple.t_data->t_infomask & (HEAP_XMAX_INVALID |
1646                                                                                            HEAP_MARKED_FOR_UPDATE)) &&
1647                                  !(ItemPointerEquals(&(tuple.t_self),
1648                                                                          &(tuple.t_data->t_ctid)))))
1649                         {
1650                                 Buffer          Cbuf = buf;
1651                                 bool            freeCbuf = false;
1652                                 bool            chain_move_failed = false;
1653                                 Page            Cpage;
1654                                 ItemId          Citemid;
1655                                 ItemPointerData Ctid;
1656                                 HeapTupleData tp = tuple;
1657                                 Size            tlen = tuple_len;
1658                                 VTupleMove      vtmove;
1659                                 int                     num_vtmove;
1660                                 int                     free_vtmove;
1661                                 VacPage         to_vacpage = NULL;
1662                                 int                     to_item = 0;
1663                                 int                     ti;
1664
1665                                 if (cur_buffer != InvalidBuffer)
1666                                 {
1667                                         WriteBuffer(cur_buffer);
1668                                         cur_buffer = InvalidBuffer;
1669                                 }
1670
1671                                 /* Quick exit if we have no vtlinks to search in */
1672                                 if (vacrelstats->vtlinks == NULL)
1673                                 {
1674                                         elog(WARNING, "Parent item in update-chain not found - can't continue repair_frag");
1675                                         break;          /* out of walk-along-page loop */
1676                                 }
1677
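                                /* initially room for 100 chain members; grown in 1000-entry chunks below */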
1678                                 vtmove = (VTupleMove) palloc(100 * sizeof(VTupleMoveData));
1679                                 num_vtmove = 0;
1680                                 free_vtmove = 100;
1681
1682                                 /*
1683                                  * If this tuple is at the beginning or middle of the chain,
1684                                  * then we have to walk to the end of the chain first.
1685                                  */
1686                                 while (!(tp.t_data->t_infomask & (HEAP_XMAX_INVALID |
1687                                                                                           HEAP_MARKED_FOR_UPDATE)) &&
1688                                            !(ItemPointerEquals(&(tp.t_self),
1689                                                                                    &(tp.t_data->t_ctid))))
1690                                 {
1691                                         Ctid = tp.t_data->t_ctid;
1692                                         if (freeCbuf)
1693                                                 ReleaseBuffer(Cbuf);
1694                                         freeCbuf = true;
1695                                         Cbuf = ReadBuffer(onerel,
1696                                                                           ItemPointerGetBlockNumber(&Ctid));
1697                                         Cpage = BufferGetPage(Cbuf);
1698                                         Citemid = PageGetItemId(Cpage,
1699                                                                           ItemPointerGetOffsetNumber(&Ctid));
1700                                         if (!ItemIdIsUsed(Citemid))
1701                                         {
1702                                                  * This means that in the middle of the chain there
1703                                                  * was a tuple updated by a transaction older than
1704                                                  * OldestXmin, and that tuple has already been
1705                                                  * deleted by me.  Actually, the upper part of the
1706                                                  * chain should be removed; it seems that should be
1707                                                  * handled in scan_heap(), but it's not implemented
1708                                                  * at the moment, so we just stop shrinking here.
1709                                                  * moment and so we just stop shrinking here.
1710                                                  */
1711                                                 elog(WARNING, "Child itemid in update-chain marked as unused - can't continue repair_frag");
1712                                                 chain_move_failed = true;
1713                                                 break;  /* out of loop to move to chain end */
1714                                         }
1715                                         tp.t_datamcxt = NULL;
1716                                         tp.t_data = (HeapTupleHeader) PageGetItem(Cpage, Citemid);
1717                                         tp.t_self = Ctid;
1718                                         tlen = tp.t_len = ItemIdGetLength(Citemid);
1719                                 }
1720                                 if (chain_move_failed)
1721                                 {
1722                                         if (freeCbuf)
1723                                                 ReleaseBuffer(Cbuf);
1724                                         pfree(vtmove);
1725                                         break;          /* out of walk-along-page loop */
1726                                 }
1727
1728                                 /*
1729                                  * Check if all items in chain can be moved
1730                                  */
1731                                 for (;;)
1732                                 {
1733                                         Buffer          Pbuf;
1734                                         Page            Ppage;
1735                                         ItemId          Pitemid;
1736                                         HeapTupleData Ptp;
1737                                         VTupleLinkData vtld,
1738                                                            *vtlp;
1739
1740                                         if (to_vacpage == NULL ||
1741                                                 !enough_space(to_vacpage, tlen))
1742                                         {
1743                                                 for (i = 0; i < num_fraged_pages; i++)
1744                                                 {
1745                                                         if (enough_space(fraged_pages->pagedesc[i], tlen))
1746                                                                 break;
1747                                                 }
1748
1749                                                 if (i == num_fraged_pages)
1750                                                 {
1751                                                         /* can't move item anywhere */
1752                                                         chain_move_failed = true;
1753                                                         break;          /* out of check-all-items loop */
1754                                                 }
1755                                                 to_item = i;
1756                                                 to_vacpage = fraged_pages->pagedesc[to_item];
1757                                         }
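                                        /*
                                         * Reserve space on the target page: the tuple itself,
                                         * plus a new line pointer once all freed ones there
                                         * are spoken for.
                                         */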
1758                                         to_vacpage->free -= MAXALIGN(tlen);
1759                                         if (to_vacpage->offsets_used >= to_vacpage->offsets_free)
1760                                                 to_vacpage->free -= MAXALIGN(sizeof(ItemIdData));
1761                                         (to_vacpage->offsets_used)++;
1762                                         if (free_vtmove == 0)
1763                                         {
1764                                                 free_vtmove = 1000;
1765                                                 vtmove = (VTupleMove)
1766                                                         repalloc(vtmove,
1767                                                                          (free_vtmove + num_vtmove) *
1768                                                                          sizeof(VTupleMoveData));
1769                                         }
1770                                         vtmove[num_vtmove].tid = tp.t_self;
1771                                         vtmove[num_vtmove].vacpage = to_vacpage;
1772                                         if (to_vacpage->offsets_used == 1)
1773                                                 vtmove[num_vtmove].cleanVpd = true;
1774                                         else
1775                                                 vtmove[num_vtmove].cleanVpd = false;
1776                                         free_vtmove--;
1777                                         num_vtmove++;
1778
1779                                         /* At beginning of chain? */
1780                                         if (!(tp.t_data->t_infomask & HEAP_UPDATED) ||
1781                                                 TransactionIdPrecedes(HeapTupleHeaderGetXmin(tp.t_data),
1782                                                                                           OldestXmin))
1783                                                 break;
1784
1785                                         /* No, move to tuple with prior row version */
1786                                         vtld.new_tid = tp.t_self;
1787                                         vtlp = (VTupleLink)
1788                                                 vac_bsearch((void *) &vtld,
1789                                                                         (void *) (vacrelstats->vtlinks),
1790                                                                         vacrelstats->num_vtlinks,
1791                                                                         sizeof(VTupleLinkData),
1792                                                                         vac_cmp_vtlinks);
1793                                         if (vtlp == NULL)
1794                                         {
1795                                                 /* see discussion above */
1796                                                 elog(WARNING, "Parent item in update-chain not found - can't continue repair_frag");
1797                                                 chain_move_failed = true;
1798                                                 break;  /* out of check-all-items loop */
1799                                         }
1800                                         tp.t_self = vtlp->this_tid;
1801                                         Pbuf = ReadBuffer(onerel,
1802                                                                 ItemPointerGetBlockNumber(&(tp.t_self)));
1803                                         Ppage = BufferGetPage(Pbuf);
1804                                         Pitemid = PageGetItemId(Ppage,
1805                                                            ItemPointerGetOffsetNumber(&(tp.t_self)));
1806                                         /* this can't happen since we saw tuple earlier: */
1807                                         if (!ItemIdIsUsed(Pitemid))
1808                                                 elog(ERROR, "Parent itemid marked as unused");
1809                                         Ptp.t_datamcxt = NULL;
1810                                         Ptp.t_data = (HeapTupleHeader) PageGetItem(Ppage, Pitemid);
1811
1812                                         /* ctid should not have changed since we saved it */
1813                                         Assert(ItemPointerEquals(&(vtld.new_tid),
1814                                                                                          &(Ptp.t_data->t_ctid)));
1815
1816                                         /*
1817                                          * Read above about the cases when !ItemIdIsUsed(Citemid)
1818                                          * (the child item was removed)... Since at the moment
1819                                          * we don't remove the useless part of an update-chain,
1820                                          * it's possible to reach a too-old parent row here.
1821                                          * As in the case that caused this problem, we just
1822                                          * stop shrinking here.  I could try to find the real
1823                                          * parent row, but don't want to, because the real
1824                                          * solution will be implemented later anyway, and we
1825                                          * are too close to the 6.5 release. - vadim
1826                                          * 06/11/99
1827                                          */
1828                                         if (!(TransactionIdEquals(HeapTupleHeaderGetXmax(Ptp.t_data),
1829                                                                          HeapTupleHeaderGetXmin(tp.t_data))))
1830                                         {
1831                                                 ReleaseBuffer(Pbuf);
1832                                                 elog(WARNING, "Too old parent tuple found - can't continue repair_frag");
1833                                                 chain_move_failed = true;
1834                                                 break;  /* out of check-all-items loop */
1835                                         }
1836                                         tp.t_datamcxt = Ptp.t_datamcxt;
1837                                         tp.t_data = Ptp.t_data;
1838                                         tlen = tp.t_len = ItemIdGetLength(Pitemid);
1839                                         if (freeCbuf)
1840                                                 ReleaseBuffer(Cbuf);
1841                                         Cbuf = Pbuf;
1842                                         freeCbuf = true;
1843                                 }                               /* end of check-all-items loop */
1844
1845                                 if (freeCbuf)
1846                                         ReleaseBuffer(Cbuf);
1847                                 freeCbuf = false;
1848
1849                                 if (chain_move_failed)
1850                                 {
1851                                         /*
1852                                          * Undo changes to offsets_used state.  We don't
1853                                          * bother cleaning up the amount-free state, since
1854                                          * we're not going to do any further tuple motion.
1855                                          */
1856                                         for (i = 0; i < num_vtmove; i++)
1857                                         {
1858                                                 Assert(vtmove[i].vacpage->offsets_used > 0);
1859                                                 (vtmove[i].vacpage->offsets_used)--;
1860                                         }
1861                                         pfree(vtmove);
1862                                         break;          /* out of walk-along-page loop */
1863                                 }
1864
1865                                 /*
1866                                  * Okay, move the whole tuple chain
1867                                  */
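                                /*
                                 * vtmove[] was filled from the chain's end backwards, so
                                 * each tuple moved below can point its t_ctid at the
                                 * already-moved successor, which we track in Ctid; the
                                 * chain's last tuple points at itself.
                                 */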
1868                                 ItemPointerSetInvalid(&Ctid);
1869                                 for (ti = 0; ti < num_vtmove; ti++)
1870                                 {
1871                                         VacPage         destvacpage = vtmove[ti].vacpage;
1872
1873                                         /* Get page to move from */
1874                                         tuple.t_self = vtmove[ti].tid;
1875                                         Cbuf = ReadBuffer(onerel,
1876                                                          ItemPointerGetBlockNumber(&(tuple.t_self)));
1877
1878                                         /* Get page to move to */
1879                                         cur_buffer = ReadBuffer(onerel, destvacpage->blkno);
1880
1881                                         LockBuffer(cur_buffer, BUFFER_LOCK_EXCLUSIVE);
1882                                         if (cur_buffer != Cbuf)
1883                                                 LockBuffer(Cbuf, BUFFER_LOCK_EXCLUSIVE);
1884
1885                                         ToPage = BufferGetPage(cur_buffer);
1886                                         Cpage = BufferGetPage(Cbuf);
1887
1888                                         Citemid = PageGetItemId(Cpage,
1889                                                         ItemPointerGetOffsetNumber(&(tuple.t_self)));
1890                                         tuple.t_datamcxt = NULL;
1891                                         tuple.t_data = (HeapTupleHeader) PageGetItem(Cpage, Citemid);
1892                                         tuple_len = tuple.t_len = ItemIdGetLength(Citemid);
1893
1894                                         /*
1895                                          * make a copy of the source tuple, and then mark the
1896                                          * source tuple MOVED_OFF.
1897                                          */
1898                                         heap_copytuple_with_tuple(&tuple, &newtup);
1899
1900                                         /*
1901                                          * register invalidation of source tuple in catcaches.
1902                                          */
1903                                         CacheInvalidateHeapTuple(onerel, &tuple);
1904
1905                                         /* NO ELOG(ERROR) TILL CHANGES ARE LOGGED */
1906                                         START_CRIT_SECTION();
1907
1908                                         tuple.t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
1909                                                                                                   HEAP_XMIN_INVALID |
1910                                                                                                   HEAP_MOVED_IN);
1911                                         tuple.t_data->t_infomask |= HEAP_MOVED_OFF;
1912                                         HeapTupleHeaderSetXvac(tuple.t_data, myXID);
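                                        /*
                                         * HEAP_MOVED_OFF plus xvac = our XID lets later
                                         * visibility checks decide which copy is good: if this
                                         * VACUUM commits, the old copy is dead; if not, the
                                         * move never happened.
                                         */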
1913
1914                                         /*
1915                                          * If this page was not used before - clean it.
1916                                          *
1917                                          * NOTE: a nasty bug used to lurk here.  It is possible
1918                                          * for the source and destination pages to be the same
1919                                          * (since this tuple-chain member can be on a page
1920                                          * lower than the one we're currently processing in
1921                                          * the outer loop).  If that's true, then after
1922                                          * vacuum_page() the source tuple will have been
1923                                          * moved, and tuple.t_data will be pointing at
1924                                          * garbage.  Therefore we must do everything that uses
1925                                          * tuple.t_data BEFORE this step!!
1926                                          *
1927                                          * This path is different from the other callers of
1928                                          * vacuum_page, because we have already incremented
1929                                          * the vacpage's offsets_used field to account for the
1930                                          * tuple(s) we expect to move onto the page. Therefore
1931                                          * vacuum_page's check for offsets_used == 0 is wrong.
1932                                          * But since that's a good debugging check for all
1933                                          * other callers, we work around it here rather than
1934                                          * remove it.
1935                                          */
1936                                         if (!PageIsEmpty(ToPage) && vtmove[ti].cleanVpd)
1937                                         {
1938                                                 int                     sv_offsets_used = destvacpage->offsets_used;
1939
1940                                                 destvacpage->offsets_used = 0;
1941                                                 vacuum_page(onerel, cur_buffer, destvacpage);
1942                                                 destvacpage->offsets_used = sv_offsets_used;
1943                                         }
1944
1945                                         /*
1946                                          * Update the state of the copied tuple, and store it
1947                                          * on the destination page.
1948                                          */
1949                                         newtup.t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
1950                                                                                                    HEAP_XMIN_INVALID |
1951                                                                                                    HEAP_MOVED_OFF);
1952                                         newtup.t_data->t_infomask |= HEAP_MOVED_IN;
1953                                         HeapTupleHeaderSetXvac(newtup.t_data, myXID);
1954                                         newoff = PageAddItem(ToPage,
1955                                                                                  (Item) newtup.t_data,
1956                                                                                  tuple_len,
1957                                                                                  InvalidOffsetNumber,
1958                                                                                  LP_USED);
1959                                         if (newoff == InvalidOffsetNumber)
1960                                         {
1961                                                 elog(PANIC, "moving chain: failed to add item with len = %lu to page %u",
1962                                                   (unsigned long) tuple_len, destvacpage->blkno);
1963                                         }
1964                                         newitemid = PageGetItemId(ToPage, newoff);
1965                                         pfree(newtup.t_data);
1966                                         newtup.t_datamcxt = NULL;
1967                                         newtup.t_data = (HeapTupleHeader) PageGetItem(ToPage, newitemid);
1968                                         ItemPointerSet(&(newtup.t_self), destvacpage->blkno, newoff);
1969
1970                                         /* XLOG stuff */
1971                                         if (!onerel->rd_istemp)
1972                                         {
1973                                                 XLogRecPtr      recptr =
1974                                                 log_heap_move(onerel, Cbuf, tuple.t_self,
1975                                                                           cur_buffer, &newtup);
1976
1977                                                 if (Cbuf != cur_buffer)
1978                                                 {
1979                                                         PageSetLSN(Cpage, recptr);
1980                                                         PageSetSUI(Cpage, ThisStartUpID);
1981                                                 }
1982                                                 PageSetLSN(ToPage, recptr);
1983                                                 PageSetSUI(ToPage, ThisStartUpID);
1984                                         }
1985                                         else
1986                                         {
1987                                                 /*
1988                                                  * No XLOG record, but still need to flag that XID
1989                                                  * exists on disk
1990                                                  */
1991                                                 MyXactMadeTempRelUpdate = true;
1992                                         }
1993
1994                                         END_CRIT_SECTION();
1995
1996                                         if (destvacpage->blkno > last_move_dest_block)
1997                                                 last_move_dest_block = destvacpage->blkno;
1998
1999                                         /*
2000                                          * Set new tuple's t_ctid pointing to itself for last
2001                                          * tuple in chain, and to next tuple in chain
2002                                          * otherwise.
2003                                          */
2004                                         if (!ItemPointerIsValid(&Ctid))
2005                                                 newtup.t_data->t_ctid = newtup.t_self;
2006                                         else
2007                                                 newtup.t_data->t_ctid = Ctid;
2008                                         Ctid = newtup.t_self;
2009
2010                                         num_moved++;
2011
2012                                         /*
2013                                          * Remember that we moved tuple from the current page
2014                                          * (corresponding index tuple will be cleaned).
2015                                          */
2016                                         if (Cbuf == buf)
2017                                                 vacpage->offsets[vacpage->offsets_free++] =
2018                                                         ItemPointerGetOffsetNumber(&(tuple.t_self));
2019                                         else
2020                                                 keep_tuples++;
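                                        /*
                                         * When the old tuple lives on a different page, we
                                         * can't record its offset in this page's vacpage;
                                         * keep_tuples counts such still-unaccounted moved-off
                                         * tuples.
                                         */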
2021
2022                                         LockBuffer(cur_buffer, BUFFER_LOCK_UNLOCK);
2023                                         if (cur_buffer != Cbuf)
2024                                                 LockBuffer(Cbuf, BUFFER_LOCK_UNLOCK);
2025
2026                                         /* Create index entries for the moved tuple */
2027                                         if (resultRelInfo->ri_NumIndices > 0)
2028                                         {
2029                                                 ExecStoreTuple(&newtup, slot, InvalidBuffer, false);
2030                                                 ExecInsertIndexTuples(slot, &(newtup.t_self),
2031                                                                                           estate, true);
2032                                         }
2033
2034                                         WriteBuffer(cur_buffer);
2035                                         WriteBuffer(Cbuf);
2036                                 }                               /* end of move-the-tuple-chain loop */
2037
2038                                 cur_buffer = InvalidBuffer;
2039                                 pfree(vtmove);
2040                                 chain_tuple_moved = true;
2041
2042                                 /* advance to next tuple in walk-along-page loop */
2043                                 continue;
2044                         }                                       /* end of is-tuple-in-chain test */
2045
2046                         /* try to find new page for this tuple */
2047                         if (cur_buffer == InvalidBuffer ||
2048                                 !enough_space(cur_page, tuple_len))
2049                         {
2050                                 if (cur_buffer != InvalidBuffer)
2051                                 {
2052                                         WriteBuffer(cur_buffer);
2053                                         cur_buffer = InvalidBuffer;
2054                                 }
2055                                 for (i = 0; i < num_fraged_pages; i++)
2056                                 {
2057                                         if (enough_space(fraged_pages->pagedesc[i], tuple_len))
2058                                                 break;
2059                                 }
2060                                 if (i == num_fraged_pages)
2061                                         break;          /* can't move item anywhere */
2062                                 cur_item = i;
2063                                 cur_page = fraged_pages->pagedesc[cur_item];
2064                                 cur_buffer = ReadBuffer(onerel, cur_page->blkno);
2065                                 LockBuffer(cur_buffer, BUFFER_LOCK_EXCLUSIVE);
2066                                 ToPage = BufferGetPage(cur_buffer);
2067                                 /* if this page was not used before - clean it */
2068                                 if (!PageIsEmpty(ToPage) && cur_page->offsets_used == 0)
2069                                         vacuum_page(onerel, cur_buffer, cur_page);
2070                         }
2071                         else
2072                                 LockBuffer(cur_buffer, BUFFER_LOCK_EXCLUSIVE);
2073
2074                         LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
2075
2076                         /* copy tuple */
2077                         heap_copytuple_with_tuple(&tuple, &newtup);
2078
2079                         /*
2080                          * register invalidation of source tuple in catcaches.
2081                          *
2082                          * (Note: we do not need to register the copied tuple, because we
2083                          * are not changing the tuple contents and so there cannot be
2084                          * any need to flush negative catcache entries.)
2085                          */
2086                         CacheInvalidateHeapTuple(onerel, &tuple);
2087
2088                         /* NO ELOG(ERROR) TILL CHANGES ARE LOGGED */
2089                         START_CRIT_SECTION();
2090
2091                         /*
2092                          * Mark new tuple as MOVED_IN by me.
2093                          */
2094                         newtup.t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
2095                                                                                    HEAP_XMIN_INVALID |
2096                                                                                    HEAP_MOVED_OFF);
2097                         newtup.t_data->t_infomask |= HEAP_MOVED_IN;
2098                         HeapTupleHeaderSetXvac(newtup.t_data, myXID);
2099
2100                         /* add tuple to the page */
2101                         newoff = PageAddItem(ToPage, (Item) newtup.t_data, tuple_len,
2102                                                                  InvalidOffsetNumber, LP_USED);
2103                         if (newoff == InvalidOffsetNumber)
2104                         {
2105                                 elog(PANIC, "failed to add item with len = %lu to page %u (free space %lu, nused %u, noff %u)",
2106                                          (unsigned long) tuple_len,
2107                                          cur_page->blkno, (unsigned long) cur_page->free,
2108                                          cur_page->offsets_used, cur_page->offsets_free);
2109                         }
2110                         newitemid = PageGetItemId(ToPage, newoff);
2111                         pfree(newtup.t_data);
2112                         newtup.t_datamcxt = NULL;
2113                         newtup.t_data = (HeapTupleHeader) PageGetItem(ToPage, newitemid);
2114                         ItemPointerSet(&(newtup.t_data->t_ctid), cur_page->blkno, newoff);
2115                         newtup.t_self = newtup.t_data->t_ctid;
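                        /* a tuple moved singly (not as part of a chain) points its t_ctid at itself */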
2116
2117                         /*
2118                          * Mark old tuple as MOVED_OFF by me.
2119                          */
2120                         tuple.t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
2121                                                                                   HEAP_XMIN_INVALID |
2122                                                                                   HEAP_MOVED_IN);
2123                         tuple.t_data->t_infomask |= HEAP_MOVED_OFF;
2124                         HeapTupleHeaderSetXvac(tuple.t_data, myXID);
2125
2126                         /* XLOG stuff */
2127                         if (!onerel->rd_istemp)
2128                         {
2129                                 XLogRecPtr      recptr =
2130                                 log_heap_move(onerel, buf, tuple.t_self,
2131                                                           cur_buffer, &newtup);
2132
2133                                 PageSetLSN(page, recptr);
2134                                 PageSetSUI(page, ThisStartUpID);
2135                                 PageSetLSN(ToPage, recptr);
2136                                 PageSetSUI(ToPage, ThisStartUpID);
2137                         }
2138                         else
2139                         {
2140                                 /*
2141                                  * No XLOG record, but still need to flag that XID exists
2142                                  * on disk
2143                                  */
2144                                 MyXactMadeTempRelUpdate = true;
2145                         }
2146
2147                         END_CRIT_SECTION();
2148
2149                         cur_page->offsets_used++;
2150                         num_moved++;
2151                         cur_page->free = ((PageHeader) ToPage)->pd_upper - ((PageHeader) ToPage)->pd_lower;
2152                         if (cur_page->blkno > last_move_dest_block)
2153                                 last_move_dest_block = cur_page->blkno;
2154
2155                         vacpage->offsets[vacpage->offsets_free++] = offnum;
2156
2157                         LockBuffer(cur_buffer, BUFFER_LOCK_UNLOCK);
2158                         LockBuffer(buf, BUFFER_LOCK_UNLOCK);
2159
2160                         /* insert index tuples for the moved tuple, if needed */
2161                         if (resultRelInfo->ri_NumIndices > 0)
2162                         {
2163                                 ExecStoreTuple(&newtup, slot, InvalidBuffer, false);
2164                                 ExecInsertIndexTuples(slot, &(newtup.t_self), estate, true);
2165                         }
2166                 }                                               /* walk along page */
2167
2168                 /*
2169                  * If we broke out of the walk-along-page loop early (ie, still
2170                  * have offnum <= maxoff), then we failed to move some tuple off
2171                  * this page.  No point in shrinking any more, so clean up and
2172                  * exit the per-page loop.
2173                  */
2174                 if (offnum < maxoff && keep_tuples > 0)
2175                 {
2176                         OffsetNumber off;
2177
2178                         /*
2179                          * Fix vacpage state for any unvisited tuples remaining on
2180                          * page
2181                          */
2182                         for (off = OffsetNumberNext(offnum);
2183                                  off <= maxoff;
2184                                  off = OffsetNumberNext(off))
2185                         {
2186                                 itemid = PageGetItemId(page, off);
2187                                 if (!ItemIdIsUsed(itemid))
2188                                         continue;
2189                                 tuple.t_datamcxt = NULL;
2190                                 tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
2191                                 if (tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED)
2192                                         continue;
2193                                 if (tuple.t_data->t_infomask & HEAP_MOVED_IN)
2194                                         elog(ERROR, "HEAP_MOVED_IN was not expected (2)");
2195                                 if (tuple.t_data->t_infomask & HEAP_MOVED_OFF)
2196                                 {
2197                                         if (HeapTupleHeaderGetXvac(tuple.t_data) != myXID)
2198                                                 elog(ERROR, "Invalid XVAC in tuple header (4)");
2199                                         /* some chains were moved while cleaning this page */
2200                                         if (chain_tuple_moved)
2201                                         {
2202                                                 Assert(vacpage->offsets_free > 0);
2203                                                 for (i = 0; i < vacpage->offsets_free; i++)
2204                                                 {
2205                                                         if (vacpage->offsets[i] == off)
2206                                                                 break;
2207                                                 }
2208                                                 if (i >= vacpage->offsets_free) /* not found */
2209                                                 {
2210                                                         vacpage->offsets[vacpage->offsets_free++] = off;
2211                                                         Assert(keep_tuples > 0);
2212                                                         keep_tuples--;
2213                                                 }
2214                                         }
2215                                         else
2216                                         {
2217                                                 vacpage->offsets[vacpage->offsets_free++] = off;
2218                                                 Assert(keep_tuples > 0);
2219                                                 keep_tuples--;
2220                                         }
2221                                 }
2222                                 else
2223                                         elog(ERROR, "HEAP_MOVED_OFF was expected (2)");
2224                         }
2225                 }
2226
2227                 if (vacpage->offsets_free > 0)  /* some tuples were moved */
2228                 {
2229                         if (chain_tuple_moved)          /* else they are already in order */
2230                         {
2231                                 qsort((char *) (vacpage->offsets), vacpage->offsets_free,
2232                                           sizeof(OffsetNumber), vac_cmp_offno);
2233                         }
2234                         vpage_insert(&Nvacpagelist, copy_vac_page(vacpage));
2235                         WriteBuffer(buf);
2236                 }
2237                 else if (dowrite)
2238                         WriteBuffer(buf);
2239                 else
2240                         ReleaseBuffer(buf);
2241
2242                 if (offnum <= maxoff)
2243                         break;                          /* had to quit early, see above note */
2244
2245         }                                                       /* walk along relation */
2246
2247         blkno++;                                        /* new number of blocks */
2248
2249         if (cur_buffer != InvalidBuffer)
2250         {
2251                 Assert(num_moved > 0);
2252                 WriteBuffer(cur_buffer);
2253         }
2254
2255         if (num_moved > 0)
2256         {
2257                 /*
2258                  * We have to commit our tuple movings before we truncate the
2259                  * relation.  Ideally we should do Commit/StartTransactionCommand
2260                  * here, relying on the session-level table lock to protect our
2261                  * exclusive access to the relation.  However, that would require
2262                  * a lot of extra code to close and re-open the relation, indexes,
2263                  * etc.  For now, a quick hack: record status of current
2264                  * transaction as committed, and continue.
2265                  */
2266                 RecordTransactionCommit();
2267         }
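        /*
         * A minimal sketch of that "ideal" approach, for reference only
         * (hypothetical -- this is not what the code does): commit, start a
         * fresh transaction, and re-open everything, relying on the
         * session-level table lock mentioned above to keep out other
         * backends:
         *
         *              CommitTransactionCommand();
         *              StartTransactionCommand();
         *              onerel = relation_open(relid, NoLock);
         *              vac_open_indexes(onerel, &nindexes, &Irel);
         *
         * Rebuilding the rest of the local state (buffers, estate,
         * resultRelInfo, ...) is the "lot of extra code" referred to above.
         */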
2268
2269         /*
2270          * We are not going to move any more tuples across pages, but we still
2271          * need to apply vacuum_page to compact free space in the remaining
2272          * pages in vacuum_pages list.  Note that some of these pages may also
2273          * be in the fraged_pages list, and may have had tuples moved onto
2274          * them; if so, we already did vacuum_page and needn't do it again.
2275          */
2276         for (i = 0, curpage = vacuum_pages->pagedesc;
2277                  i < vacuumed_pages;
2278                  i++, curpage++)
2279         {
2280                 CHECK_FOR_INTERRUPTS();
2281                 Assert((*curpage)->blkno < blkno);
2282                 if ((*curpage)->offsets_used == 0)
2283                 {
2284                         /* this page was not used as a move target, so must clean it */
2285                         buf = ReadBuffer(onerel, (*curpage)->blkno);
2286                         LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
2287                         page = BufferGetPage(buf);
2288                         if (!PageIsEmpty(page))
2289                                 vacuum_page(onerel, buf, *curpage);
2290                         LockBuffer(buf, BUFFER_LOCK_UNLOCK);
2291                         WriteBuffer(buf);
2292                 }
2293         }
2294
2295         /*
2296          * Now scan all the pages that we moved tuples onto and update tuple
2297          * status bits.  This is not really necessary, but will save time for
2298          * future transactions examining these tuples.
2299          *
2300          * XXX WARNING that this code fails to clear HEAP_MOVED_OFF tuples from
2301          * pages that were move source pages but not move dest pages.  One
2302          * also wonders whether it wouldn't be better to skip this step and
2303          * let the tuple status updates happen someplace that's not holding an
2304          * exclusive lock on the relation.
2305          */
2306         checked_moved = 0;
2307         for (i = 0, curpage = fraged_pages->pagedesc;
2308                  i < num_fraged_pages;
2309                  i++, curpage++)
2310         {
2311                 CHECK_FOR_INTERRUPTS();
2312                 Assert((*curpage)->blkno < blkno);
2313                 if ((*curpage)->blkno > last_move_dest_block)
2314                         break;                          /* no need to scan any further */
2315                 if ((*curpage)->offsets_used == 0)
2316                         continue;                       /* this page was never used as a move dest */
2317                 buf = ReadBuffer(onerel, (*curpage)->blkno);
2318                 LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
2319                 page = BufferGetPage(buf);
2320                 num_tuples = 0;
2321                 max_offset = PageGetMaxOffsetNumber(page);
2322                 for (newoff = FirstOffsetNumber;
2323                          newoff <= max_offset;
2324                          newoff = OffsetNumberNext(newoff))
2325                 {
2326                         itemid = PageGetItemId(page, newoff);
2327                         if (!ItemIdIsUsed(itemid))
2328                                 continue;
2329                         tuple.t_datamcxt = NULL;
2330                         tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
2331                         if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
2332                         {
2333                                 if (!(tuple.t_data->t_infomask & HEAP_MOVED))
2334                                         elog(ERROR, "HEAP_MOVED_OFF/HEAP_MOVED_IN was expected");
2335                                 if (HeapTupleHeaderGetXvac(tuple.t_data) != myXID)
2336                                         elog(ERROR, "Invalid XVAC in tuple header (2)");
2337                                 if (tuple.t_data->t_infomask & HEAP_MOVED_IN)
2338                                 {
2339                                         tuple.t_data->t_infomask |= HEAP_XMIN_COMMITTED;
2340                                         tuple.t_data->t_infomask &= ~HEAP_MOVED;
2341                                         num_tuples++;
2342                                 }
2343                                 else
2344                                         tuple.t_data->t_infomask |= HEAP_XMIN_INVALID;
2345                         }
2346                 }
2347                 LockBuffer(buf, BUFFER_LOCK_UNLOCK);
2348                 WriteBuffer(buf);
2349                 Assert((*curpage)->offsets_used == num_tuples);
2350                 checked_moved += num_tuples;
2351         }
2352         Assert(num_moved == checked_moved);
2353
2354         elog(elevel, "Rel %s: Pages: %u --> %u; Tuple(s) moved: %u.\n\t%s",
2355                  RelationGetRelationName(onerel),
2356                  nblocks, blkno, num_moved,
2357                  vac_show_rusage(&ru0));
2358
2359         /*
2360          * Reflect the motion of system tuples in the catalog caches here.
2361          */
2362         CommandCounterIncrement();
2363
2364         if (Nvacpagelist.num_pages > 0)
2365         {
2366                 /* vacuum indexes again if needed */
2367                 if (Irel != (Relation *) NULL)
2368                 {
2369                         VacPage    *vpleft,
2370                                            *vpright,
2371                                                 vpsave;
2372
2373                         /* re-sort Nvacpagelist.pagedesc */
2374                         for (vpleft = Nvacpagelist.pagedesc,
2375                         vpright = Nvacpagelist.pagedesc + Nvacpagelist.num_pages - 1;
2376                                  vpleft < vpright; vpleft++, vpright--)
2377                         {
2378                                 vpsave = *vpleft;
2379                                 *vpleft = *vpright;
2380                                 *vpright = vpsave;
2381                         }
2382                         Assert(keep_tuples >= 0);
2383                         for (i = 0; i < nindexes; i++)
2384                                 vacuum_index(&Nvacpagelist, Irel[i],
2385                                                          vacrelstats->rel_tuples, keep_tuples);
2386                 }
2387
2388                 /* clean moved tuples from the last page in Nvacpagelist */
2389                 if (vacpage->blkno == (blkno - 1) &&
2390                         vacpage->offsets_free > 0)
2391                 {
2392                         OffsetNumber unbuf[BLCKSZ / sizeof(OffsetNumber)];
2393                         OffsetNumber *unused = unbuf;
2394                         int                     uncnt;
2395
2396                         buf = ReadBuffer(onerel, vacpage->blkno);
2397                         LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
2398                         page = BufferGetPage(buf);
2399                         num_tuples = 0;
2400                         maxoff = PageGetMaxOffsetNumber(page);
2401                         for (offnum = FirstOffsetNumber;
2402                                  offnum <= maxoff;
2403                                  offnum = OffsetNumberNext(offnum))
2404                         {
2405                                 itemid = PageGetItemId(page, offnum);
2406                                 if (!ItemIdIsUsed(itemid))
2407                                         continue;
2408                                 tuple.t_datamcxt = NULL;
2409                                 tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
2410
2411                                 if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
2412                                 {
2413                                         if (tuple.t_data->t_infomask & HEAP_MOVED_OFF)
2414                                         {
2415                                                 if (HeapTupleHeaderGetXvac(tuple.t_data) != myXID)
2416                                                         elog(ERROR, "Invalid XVAC in tuple header (3)");
2417                                                 itemid->lp_flags &= ~LP_USED;
2418                                                 num_tuples++;
2419                                         }
2420                                         else
2421                                                 elog(ERROR, "HEAP_MOVED_OFF was expected (3)");
2422                                 }
2423
2424                         }
2425                         Assert(vacpage->offsets_free == num_tuples);
2426
2427                         START_CRIT_SECTION();
2428
2429                         uncnt = PageRepairFragmentation(page, unused);
2430
2431                         /* XLOG stuff */
2432                         if (!onerel->rd_istemp)
2433                         {
2434                                 XLogRecPtr      recptr;
2435
2436                                 recptr = log_heap_clean(onerel, buf, (char *) unused,
2437                                                   (char *) (&(unused[uncnt])) - (char *) unused);
2438                                 PageSetLSN(page, recptr);
2439                                 PageSetSUI(page, ThisStartUpID);
2440                         }
2441                         else
2442                         {
2443                                 /*
2444                                  * No XLOG record, but still need to flag that XID exists
2445                                  * on disk
2446                                  */
2447                                 MyXactMadeTempRelUpdate = true;
2448                         }
2449
2450                         END_CRIT_SECTION();
2451
2452                         LockBuffer(buf, BUFFER_LOCK_UNLOCK);
2453                         WriteBuffer(buf);
2454                 }
2455
2456                 /* now free the new list of reaped pages */
2457                 curpage = Nvacpagelist.pagedesc;
2458                 for (i = 0; i < Nvacpagelist.num_pages; i++, curpage++)
2459                         pfree(*curpage);
2460                 pfree(Nvacpagelist.pagedesc);
2461         }
2462
2463         /*
2464          * Flush dirty pages out to disk.  We do this unconditionally, even if
2465          * we don't need to truncate, because we want to ensure that all
2466          * tuples have correct on-row commit status on disk (see bufmgr.c's
2467          * comments for FlushRelationBuffers()).
2468          */
2469         i = FlushRelationBuffers(onerel, blkno);
2470         if (i < 0)
2471                 elog(ERROR, "VACUUM (repair_frag): FlushRelationBuffers returned %d",
2472                          i);
2473
2474         /* truncate relation, if needed */
2475         if (blkno < nblocks)
2476         {
2477                 blkno = smgrtruncate(DEFAULT_SMGR, onerel, blkno);
2478                 onerel->rd_nblocks = blkno;             /* update relcache immediately */
2479                 onerel->rd_targblock = InvalidBlockNumber;
2480                 vacrelstats->rel_pages = blkno; /* set new number of blocks */
2481         }
2482
2483         /* clean up */
2484         pfree(vacpage);
2485         if (vacrelstats->vtlinks != NULL)
2486                 pfree(vacrelstats->vtlinks);
2487
2488         ExecDropTupleTable(tupleTable, true);
2489
2490         ExecCloseIndices(resultRelInfo);
2491 }
2492
2493 /*
2494  *      vacuum_heap() -- free dead tuples
2495  *
2496  *              This routine marks dead tuples as unused and truncates relation
2497  *              if there are "empty" end-blocks.
2498  */
2499 static void
2500 vacuum_heap(VRelStats *vacrelstats, Relation onerel, VacPageList vacuum_pages)
2501 {
2502         Buffer          buf;
2503         VacPage    *vacpage;
2504         BlockNumber relblocks;
2505         int                     nblocks;
2506         int                     i;
2507
2508         nblocks = vacuum_pages->num_pages;
2509         nblocks -= vacuum_pages->empty_end_pages;       /* those are handled by truncation below */
2510
2511         for (i = 0, vacpage = vacuum_pages->pagedesc; i < nblocks; i++, vacpage++)
2512         {
2513                 CHECK_FOR_INTERRUPTS();
2514                 if ((*vacpage)->offsets_free > 0)
2515                 {
2516                         buf = ReadBuffer(onerel, (*vacpage)->blkno);
2517                         LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
2518                         vacuum_page(onerel, buf, *vacpage);
2519                         LockBuffer(buf, BUFFER_LOCK_UNLOCK);
2520                         WriteBuffer(buf);
2521                 }
2522         }
2523
2524         /*
2525          * Flush dirty pages out to disk.  We do this unconditionally, even if
2526          * we don't need to truncate, because we want to ensure that all
2527          * tuples have correct on-row commit status on disk (see bufmgr.c's
2528          * comments for FlushRelationBuffers()).
2529          */
2530         Assert(vacrelstats->rel_pages >= vacuum_pages->empty_end_pages);
2531         relblocks = vacrelstats->rel_pages - vacuum_pages->empty_end_pages;
2532
2533         i = FlushRelationBuffers(onerel, relblocks);
2534         if (i < 0)
2535                 elog(ERROR, "VACUUM (vacuum_heap): FlushRelationBuffers returned %d",
2536                          i);
2537
2538         /* truncate relation if there are some empty end-pages */
2539         if (vacuum_pages->empty_end_pages > 0)
2540         {
2541                 elog(elevel, "Rel %s: Pages: %u --> %u.",
2542                          RelationGetRelationName(onerel),
2543                          vacrelstats->rel_pages, relblocks);
2544                 relblocks = smgrtruncate(DEFAULT_SMGR, onerel, relblocks);
2545                 onerel->rd_nblocks = relblocks; /* update relcache immediately */
2546                 onerel->rd_targblock = InvalidBlockNumber;
2547                 vacrelstats->rel_pages = relblocks;             /* set new number of
2548                                                                                                  * blocks */
2549         }
2550 }
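/*
 * Note that repair_frag() and vacuum_heap() both finish with the same
 * flush-then-truncate sequence.  A minimal sketch of the pattern (with
 * "rel" and the new block count "newblks" standing in for the locals
 * used above):
 *
 *              if (FlushRelationBuffers(rel, newblks) < 0)
 *                      elog(ERROR, "FlushRelationBuffers failed");
 *              newblks = smgrtruncate(DEFAULT_SMGR, rel, newblks);
 *              rel->rd_nblocks = newblks;
 *              rel->rd_targblock = InvalidBlockNumber;
 *
 * Flushing first ensures that no dirty buffer beyond the new end of the
 * relation can be written out after the file has been shortened.
 */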
2551
2552 /*
2553  *      vacuum_page() -- free dead tuples on a page
2554  *                                       and repair its fragmentation.
2555  */
2556 static void
2557 vacuum_page(Relation onerel, Buffer buffer, VacPage vacpage)
2558 {
2559         OffsetNumber unbuf[BLCKSZ / sizeof(OffsetNumber)];
2560         OffsetNumber *unused = unbuf;
2561         int                     uncnt;
2562         Page            page = BufferGetPage(buffer);
2563         ItemId          itemid;
2564         int                     i;
2565
2566         /* There shouldn't be any tuples moved onto the page yet! */
2567         Assert(vacpage->offsets_used == 0);
2568
2569         START_CRIT_SECTION();
2570
2571         for (i = 0; i < vacpage->offsets_free; i++)
2572         {
2573                 itemid = PageGetItemId(page, vacpage->offsets[i]);
2574                 itemid->lp_flags &= ~LP_USED;
2575         }
2576
2577         uncnt = PageRepairFragmentation(page, unused);
2578
2579         /* XLOG stuff */
2580         if (!onerel->rd_istemp)
2581         {
2582                 XLogRecPtr      recptr;
2583
2584                 recptr = log_heap_clean(onerel, buffer, (char *) unused,
2585                                                   (char *) (&(unused[uncnt])) - (char *) unused);
2586                 PageSetLSN(page, recptr);
2587                 PageSetSUI(page, ThisStartUpID);
2588         }
2589         else
2590         {
2591                 /* No XLOG record, but still need to flag that XID exists on disk */
2592                 MyXactMadeTempRelUpdate = true;
2593         }
2594
2595         END_CRIT_SECTION();
2596 }
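/*
 * vacuum_page() follows the same write-ahead-logging protocol as the
 * tuple-move and page-clean steps in repair_frag(); as a sketch, every
 * page modification has the shape:
 *
 *              START_CRIT_SECTION();
 *              ... scribble on the page ...
 *              if (!rel->rd_istemp)
 *              {
 *                      XLogRecPtr      recptr = log_heap_clean(...);
 *
 *                      PageSetLSN(page, recptr);
 *                      PageSetSUI(page, ThisStartUpID);
 *              }
 *              else
 *                      MyXactMadeTempRelUpdate = true;
 *              END_CRIT_SECTION();
 *
 * The critical section guarantees that an error between the page change
 * and the XLOG insert escalates to a PANIC instead of leaving an
 * unlogged change on disk.
 */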
2597
2598 /*
2599  *      scan_index() -- scan one index relation to update statistics.
2600  *
2601  * We use this when we have no deletions to do.
2602  */
2603 static void
2604 scan_index(Relation indrel, double num_tuples)
2605 {
2606         IndexBulkDeleteResult *stats;
2607         VacRUsage       ru0;
2608
2609         vac_init_rusage(&ru0);
2610
2611         /*
2612          * Even though we're not planning to delete anything, use the
2613          * ambulkdelete call, so that the scan happens within the index AM for
2614          * more speed.
2615          */
2616         stats = index_bulk_delete(indrel, dummy_tid_reaped, NULL);
2617
2618         if (!stats)
2619                 return;
2620
2621         /* now update statistics in pg_class */
2622         vac_update_relstats(RelationGetRelid(indrel),
2623                                                 stats->num_pages, stats->num_index_tuples,
2624                                                 false);
2625
2626         elog(elevel, "Index %s: Pages %u; Tuples %.0f.\n\t%s",
2627                  RelationGetRelationName(indrel),
2628                  stats->num_pages, stats->num_index_tuples,
2629                  vac_show_rusage(&ru0));
2630
2631         /*
2632          * Check for tuple count mismatch.  If the index is partial, then it's
2633          * OK for it to have fewer tuples than the heap; otherwise we have trouble.
2634          */
2635         if (stats->num_index_tuples != num_tuples)
2636         {
2637                 if (stats->num_index_tuples > num_tuples ||
2638                         !vac_is_partial_index(indrel))
2639                         elog(WARNING, "Index %s: number of index tuples (%.0f) does not match heap (%.0f)."
2640                                  "\n\tRecreate the index.",
2641                                  RelationGetRelationName(indrel),
2642                                  stats->num_index_tuples, num_tuples);
2643         }
2644
2645         pfree(stats);
2646 }
2647
2648 /*
2649  *      vacuum_index() -- vacuum one index relation.
2650  *
2651  *              Vpl is the VacPageList of the heap we're currently vacuuming.
2652  *              It's locked. Indrel is an index relation on the vacuumed heap.
2653  *
2654  *              We don't bother to set locks on the index relation here, since
2655  *              the parent table is exclusive-locked already.
2656  *
2657  *              Finally, we arrange to update the index relation's statistics in
2658  *              pg_class.
2659  */
2660 static void
2661 vacuum_index(VacPageList vacpagelist, Relation indrel,
2662                          double num_tuples, int keep_tuples)
2663 {
2664         IndexBulkDeleteResult *stats;
2665         VacRUsage       ru0;
2666
2667         vac_init_rusage(&ru0);
2668
2669         /* Do bulk deletion */
2670         stats = index_bulk_delete(indrel, tid_reaped, (void *) vacpagelist);
2671
2672         if (!stats)
2673                 return;
2674
2675         /* now update statistics in pg_class */
2676         vac_update_relstats(RelationGetRelid(indrel),
2677                                                 stats->num_pages, stats->num_index_tuples,
2678                                                 false);
2679
2680         elog(elevel, "Index %s: Pages %u; Tuples %.0f: Deleted %.0f.\n\t%s",
2681                  RelationGetRelationName(indrel), stats->num_pages,
2682                  stats->num_index_tuples - keep_tuples, stats->tuples_removed,
2683                  vac_show_rusage(&ru0));
2684
2685         /*
2686          * Check for tuple count mismatch.  If the index is partial, then it's
2687          * OK for it to have fewer tuples than the heap; otherwise we have trouble.
2688          */
2689         if (stats->num_index_tuples != num_tuples + keep_tuples)
2690         {
2691                 if (stats->num_index_tuples > num_tuples + keep_tuples ||
2692                         !vac_is_partial_index(indrel))
2693                         elog(WARNING, "Index %s: number of index tuples (%.0f) does not match heap (%.0f)."
2694                                  "\n\tRecreate the index.",
2695                                  RelationGetRelationName(indrel),
2696                                  stats->num_index_tuples, num_tuples);
2697         }
2698
2699         pfree(stats);
2700 }
2701
2702 /*
2703  *      tid_reaped() -- is a particular tid reaped?
2704  *
2705  *              This has the right signature to be an IndexBulkDeleteCallback.
2706  *
2707  *              vacpagelist->pagedesc is sorted in the right order.
2708  */
2709 static bool
2710 tid_reaped(ItemPointer itemptr, void *state)
2711 {
2712         VacPageList vacpagelist = (VacPageList) state;
2713         OffsetNumber ioffno;
2714         OffsetNumber *voff;
2715         VacPage         vp,
2716                            *vpp;
2717         VacPageData vacpage;
2718
2719         vacpage.blkno = ItemPointerGetBlockNumber(itemptr);
2720         ioffno = ItemPointerGetOffsetNumber(itemptr);
2721
2722         vp = &vacpage;
2723         vpp = (VacPage *) vac_bsearch((void *) &vp,
2724                                                                   (void *) (vacpagelist->pagedesc),
2725                                                                   vacpagelist->num_pages,
2726                                                                   sizeof(VacPage),
2727                                                                   vac_cmp_blk);
2728
2729         if (vpp == NULL)
2730                 return false;
2731
2732         /* ok - we are on a partially or fully reaped page */
2733         vp = *vpp;
2734
2735         if (vp->offsets_free == 0)
2736         {
2737                 /* this is an empty page, so claim all tuples on it are reaped */
2738                 return true;
2739         }
2740
2741         voff = (OffsetNumber *) vac_bsearch((void *) &ioffno,
2742                                                                                 (void *) (vp->offsets),
2743                                                                                 vp->offsets_free,
2744                                                                                 sizeof(OffsetNumber),
2745                                                                                 vac_cmp_offno);
2746
2747         if (voff == NULL)
2748                 return false;
2749
2750         /* tid is reaped */
2751         return true;
2752 }
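/*
 * Worked example for tid_reaped(): suppose pagedesc describes pages
 * {3, 7, 10}, and page 7 has offsets_free = 2 with offsets = {2, 5}.
 * Then:
 *
 *              tid (7,5):  page found, offset found       => reaped
 *              tid (7,4):  page found, offset not found   => not reaped
 *              tid (8,1):  page not found                 => not reaped
 *              tid (10,x): offsets_free == 0 (empty page) => reaped
 *
 * (The last line assumes page 10 is one of the wholly-empty pages;
 * such pages claim every tuple on them as reaped.)
 */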
2753
2754 /*
2755  * Dummy version for scan_index.
2756  */
2757 static bool
2758 dummy_tid_reaped(ItemPointer itemptr, void *state)
2759 {
2760         return false;
2761 }
2762
2763 /*
2764  * Update the shared Free Space Map with the info we now have about
2765  * free space in the relation, discarding any old info the map may have.
2766  */
2767 static void
2768 vac_update_fsm(Relation onerel, VacPageList fraged_pages,
2769                            BlockNumber rel_pages)
2770 {
2771         int                     nPages = fraged_pages->num_pages;
2772         int                     i;
2773         PageFreeSpaceInfo *pageSpaces;
2774
2775         /* +1 to avoid palloc(0) */
2776         pageSpaces = (PageFreeSpaceInfo *)
2777                 palloc((nPages + 1) * sizeof(PageFreeSpaceInfo));
2778
2779         for (i = 0; i < nPages; i++)
2780         {
2781                 pageSpaces[i].blkno = fraged_pages->pagedesc[i]->blkno;
2782                 pageSpaces[i].avail = fraged_pages->pagedesc[i]->free;
2783
2784                 /*
2785                  * fraged_pages may contain entries for pages that we later
2786                  * decided to truncate from the relation; don't enter them into
2787                  * the free space map!
2788                  */
2789                 if (pageSpaces[i].blkno >= rel_pages)
2790                 {
2791                         nPages = i;
2792                         break;
2793                 }
2794         }
2795
2796         MultiRecordFreeSpace(&onerel->rd_node, 0, nPages, pageSpaces);
2797
2798         pfree(pageSpaces);
2799 }
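/*
 * Worked example: if fraged_pages lists pages {2, 5, 9, 12} but the
 * relation has been truncated to rel_pages = 10, the loop copies the
 * entries for pages 2, 5 and 9, then clips nPages to 3 on reaching
 * page 12, so the FSM never hears about the truncated-away page.
 */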
2800
2801 /* Copy a VacPage structure */
2802 static VacPage
2803 copy_vac_page(VacPage vacpage)
2804 {
2805         VacPage         newvacpage;
2806
2807         /* allocate a VacPageData entry */
2808         newvacpage = (VacPage) palloc(sizeof(VacPageData) +
2809                                                    vacpage->offsets_free * sizeof(OffsetNumber));
2810
2811         /* fill it in */
2812         if (vacpage->offsets_free > 0)
2813                 memcpy(newvacpage->offsets, vacpage->offsets,
2814                            vacpage->offsets_free * sizeof(OffsetNumber));
2815         newvacpage->blkno = vacpage->blkno;
2816         newvacpage->free = vacpage->free;
2817         newvacpage->offsets_used = vacpage->offsets_used;
2818         newvacpage->offsets_free = vacpage->offsets_free;
2819
2820         return newvacpage;
2821 }
2822
2823 /*
2824  * Add a VacPage pointer to a VacPageList.
2825  *
2826  *              As a side effect of the way that scan_heap works,
2827  *              higher pages come after lower pages in the array
2828  *              (and highest tid on a page is last).
2829  */
2830 static void
2831 vpage_insert(VacPageList vacpagelist, VacPage vpnew)
2832 {
2833 #define PG_NPAGEDESC 1024
2834
2835         /* allocate a VacPage entry if needed */
2836         if (vacpagelist->num_pages == 0)
2837         {
2838                 vacpagelist->pagedesc = (VacPage *) palloc(PG_NPAGEDESC * sizeof(VacPage));
2839                 vacpagelist->num_allocated_pages = PG_NPAGEDESC;
2840         }
2841         else if (vacpagelist->num_pages >= vacpagelist->num_allocated_pages)
2842         {
2843                 vacpagelist->num_allocated_pages *= 2;
2844                 vacpagelist->pagedesc = (VacPage *) repalloc(vacpagelist->pagedesc, vacpagelist->num_allocated_pages * sizeof(VacPage));
2845         }
2846         vacpagelist->pagedesc[vacpagelist->num_pages] = vpnew;
2847         (vacpagelist->num_pages)++;
2848 }
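/*
 * Usage sketch: callers append a copy so the scratch VacPage can be
 * reused for the next page, as in repair_frag() above:
 *
 *              vpage_insert(&Nvacpagelist, copy_vac_page(vacpage));
 *
 * The array grows by doubling (1024, 2048, 4096, ... entries), so n
 * insertions cost only O(log n) repallocs.
 */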
2849
2850 /*
2851  * vac_bsearch: just like standard C library routine bsearch(),
2852  * except that we first test to see whether the target key is outside
2853  * the range of the table entries.  This case is handled relatively slowly
2854  * by the normal binary search algorithm (ie, no faster than any other key)
2855  * but it occurs often enough in VACUUM to be worth optimizing.
2856  */
2857 static void *
2858 vac_bsearch(const void *key, const void *base,
2859                         size_t nelem, size_t size,
2860                         int (*compar) (const void *, const void *))
2861 {
2862         int                     res;
2863         const void *last;
2864
2865         if (nelem == 0)
2866                 return NULL;
2867         res = compar(key, base);
2868         if (res < 0)
2869                 return NULL;
2870         if (res == 0)
2871                 return (void *) base;
2872         if (nelem > 1)
2873         {
2874                 last = (const void *) ((const char *) base + (nelem - 1) * size);
2875                 res = compar(key, last);
2876                 if (res > 0)
2877                         return NULL;
2878                 if (res == 0)
2879                         return (void *) last;
2880         }
2881         if (nelem <= 2)
2882                 return NULL;                    /* already checked 'em all */
2883         return bsearch(key, base, nelem, size, compar);
2884 }
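/*
 * Example of the endpoint short-circuit: probing a 1000-entry pagedesc
 * for a block number beyond the last vacuumed page costs two compar()
 * calls (first entry, then last) instead of ~10 binary-search probes,
 * and a key below the first entry costs just one.  Only keys strictly
 * inside the range fall through to the real bsearch().
 */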
2885
2886 /*
2887  * Comparator routines for use with qsort() and bsearch().
2888  */
2889 static int
2890 vac_cmp_blk(const void *left, const void *right)
2891 {
2892         BlockNumber lblk,
2893                                 rblk;
2894
2895         lblk = (*((VacPage *) left))->blkno;
2896         rblk = (*((VacPage *) right))->blkno;
2897
2898         if (lblk < rblk)
2899                 return -1;
2900         if (lblk == rblk)
2901                 return 0;
2902         return 1;
2903 }
2904
2905 static int
2906 vac_cmp_offno(const void *left, const void *right)
2907 {
2908         if (*(OffsetNumber *) left < *(OffsetNumber *) right)
2909                 return -1;
2910         if (*(OffsetNumber *) left == *(OffsetNumber *) right)
2911                 return 0;
2912         return 1;
2913 }
2914
2915 static int
2916 vac_cmp_vtlinks(const void *left, const void *right)
2917 {
2918         if (((VTupleLink) left)->new_tid.ip_blkid.bi_hi <
2919                 ((VTupleLink) right)->new_tid.ip_blkid.bi_hi)
2920                 return -1;
2921         if (((VTupleLink) left)->new_tid.ip_blkid.bi_hi >
2922                 ((VTupleLink) right)->new_tid.ip_blkid.bi_hi)
2923                 return 1;
2924         /* bi_hi fields are equal */
2925         if (((VTupleLink) left)->new_tid.ip_blkid.bi_lo <
2926                 ((VTupleLink) right)->new_tid.ip_blkid.bi_lo)
2927                 return -1;
2928         if (((VTupleLink) left)->new_tid.ip_blkid.bi_lo >
2929                 ((VTupleLink) right)->new_tid.ip_blkid.bi_lo)
2930                 return 1;
2931         /* bi_lo fields are equal */
2932         if (((VTupleLink) left)->new_tid.ip_posid <
2933                 ((VTupleLink) right)->new_tid.ip_posid)
2934                 return -1;
2935         if (((VTupleLink) left)->new_tid.ip_posid >
2936                 ((VTupleLink) right)->new_tid.ip_posid)
2937                 return 1;
2938         return 0;
2939 }
2940
2941
2942 void
2943 vac_open_indexes(Relation relation, int *nindexes, Relation **Irel)
2944 {
2945         List       *indexoidlist,
2946                            *indexoidscan;
2947         int                     i;
2948
2949         indexoidlist = RelationGetIndexList(relation);
2950
2951         *nindexes = length(indexoidlist);
2952
2953         if (*nindexes > 0)
2954                 *Irel = (Relation *) palloc(*nindexes * sizeof(Relation));
2955         else
2956                 *Irel = NULL;
2957
2958         i = 0;
2959         foreach(indexoidscan, indexoidlist)
2960         {
2961                 Oid                     indexoid = lfirsti(indexoidscan);
2962
2963                 (*Irel)[i] = index_open(indexoid);
2964                 i++;
2965         }
2966
2967         freeList(indexoidlist);
2968 }
2969
2970
2971 void
2972 vac_close_indexes(int nindexes, Relation *Irel)
2973 {
2974         if (Irel == (Relation *) NULL)
2975                 return;
2976
2977         while (nindexes--)
2978                 index_close(Irel[nindexes]);
2979         pfree(Irel);
2980 }
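/*
 * These two routines are used as a pair; a minimal sketch of the
 * calling pattern (as in full VACUUM above):
 *
 *              Relation   *Irel;
 *              int                     nindexes;
 *
 *              vac_open_indexes(onerel, &nindexes, &Irel);
 *              ... scan_index() or vacuum_index() on each Irel[i] ...
 *              vac_close_indexes(nindexes, Irel);
 *
 * No index-level locks are taken or released; the exclusive lock on
 * the parent table is assumed to protect the indexes as well.
 */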
2981
2982
2983 /*
2984  * Is an index partial (ie, could it contain fewer tuples than the heap?)
2985  */
2986 bool
2987 vac_is_partial_index(Relation indrel)
2988 {
2989         /*
2990          * If the index's AM doesn't support nulls, it's partial for our
2991          * purposes
2992          */
2993         if (!indrel->rd_am->amindexnulls)
2994                 return true;
2995
2996         /* Otherwise, look to see if there's a partial-index predicate */
2997         return (VARSIZE(&indrel->rd_index->indpred) > VARHDRSZ);
2998 }
2999
3000
3001 static bool
3002 enough_space(VacPage vacpage, Size len)
3003 {
3004         len = MAXALIGN(len);
3005
3006         if (len > vacpage->free)
3007                 return false;
3008
3009         /* if there are free itemid(s) and len <= free_space... */
3010         if (vacpage->offsets_used < vacpage->offsets_free)
3011                 return true;
3012
3013         /* noff_used >= noff_free, so we'll have to allocate a new itemid */
3014         if (len + sizeof(ItemIdData) <= vacpage->free)
3015                 return true;
3016
3017         return false;
3018 }
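/*
 * Worked example (assuming MAXALIGN rounds up to a multiple of 8 and
 * sizeof(ItemIdData) == 4): for len = 50 the aligned size is 56.  With
 * free = 60 and a spare itemid (offsets_used < offsets_free) the page
 * qualifies.  Without a spare itemid we need 56 + 4 = 60 bytes, so
 * free = 60 still qualifies but free = 58 does not.
 */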
3019
3020
3021 /*
3022  * Initialize usage snapshot.
3023  */
3024 void
3025 vac_init_rusage(VacRUsage *ru0)
3026 {
3027         struct timezone tz;
3028
3029         getrusage(RUSAGE_SELF, &ru0->ru);
3030         gettimeofday(&ru0->tv, &tz);
3031 }
3032
3033 /*
3034  * Compute elapsed time since ru0 usage snapshot, and format into
3035  * a displayable string.  Result is in a static string, which is
3036  * tacky, but no one ever claimed that the Postgres backend is
3037  * threadable...
3038  */
3039 const char *
3040 vac_show_rusage(VacRUsage *ru0)
3041 {
3042         static char result[100];
3043         VacRUsage       ru1;
3044
3045         vac_init_rusage(&ru1);
3046
3047         if (ru1.tv.tv_usec < ru0->tv.tv_usec)
3048         {
3049                 ru1.tv.tv_sec--;
3050                 ru1.tv.tv_usec += 1000000;
3051         }
3052         if (ru1.ru.ru_stime.tv_usec < ru0->ru.ru_stime.tv_usec)
3053         {
3054                 ru1.ru.ru_stime.tv_sec--;
3055                 ru1.ru.ru_stime.tv_usec += 1000000;
3056         }
3057         if (ru1.ru.ru_utime.tv_usec < ru0->ru.ru_utime.tv_usec)
3058         {
3059                 ru1.ru.ru_utime.tv_sec--;
3060                 ru1.ru.ru_utime.tv_usec += 1000000;
3061         }
3062
3063         snprintf(result, sizeof(result),
3064                          "CPU %d.%02ds/%d.%02du sec elapsed %d.%02d sec.",
3065                          (int) (ru1.ru.ru_stime.tv_sec - ru0->ru.ru_stime.tv_sec),
3066           (int) (ru1.ru.ru_stime.tv_usec - ru0->ru.ru_stime.tv_usec) / 10000,
3067                          (int) (ru1.ru.ru_utime.tv_sec - ru0->ru.ru_utime.tv_sec),
3068           (int) (ru1.ru.ru_utime.tv_usec - ru0->ru.ru_utime.tv_usec) / 10000,
3069                          (int) (ru1.tv.tv_sec - ru0->tv.tv_sec),
3070                          (int) (ru1.tv.tv_usec - ru0->tv.tv_usec) / 10000);
3071
3072         return result;
3073 }
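/*
 * The tv_usec adjustments above are ordinary base-1000000 borrows.
 * For example, elapsed time 5.200000s minus 3.700000s: borrow one
 * second (4s, 1200000us), then subtract to get 1s and 500000us, which
 * the format above prints as "elapsed 1.50 sec".
 */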