/*-------------------------------------------------------------------------
 *
 * vacuum.c
 *	  The postgres vacuum cleaner.
 *
 * This file includes the "full" version of VACUUM, as well as control code
 * used by all three of full VACUUM, lazy VACUUM, and ANALYZE.  See
 * vacuumlazy.c and analyze.c for the rest of the code for the latter two.
 *
 *
 * Portions Copyright (c) 1996-2003, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  $Header: /cvsroot/pgsql/src/backend/commands/vacuum.c,v 1.259 2003/08/04 02:39:58 momjian Exp $
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include <unistd.h>

#include "access/clog.h"
#include "access/genam.h"
#include "access/heapam.h"
#include "access/xlog.h"
#include "catalog/catalog.h"
#include "catalog/catname.h"
#include "catalog/namespace.h"
#include "catalog/pg_database.h"
#include "catalog/pg_index.h"
#include "commands/vacuum.h"
#include "executor/executor.h"
#include "miscadmin.h"
#include "storage/freespace.h"
#include "storage/sinval.h"
#include "storage/smgr.h"
#include "tcop/pquery.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/inval.h"
#include "utils/lsyscache.h"
#include "utils/relcache.h"
#include "utils/syscache.h"
#include "pgstat.h"


typedef struct VacPageData
{
	BlockNumber blkno;			/* BlockNumber of this Page */
	Size		free;			/* FreeSpace on this Page */
	uint16		offsets_used;	/* Number of OffNums used by vacuum */
	uint16		offsets_free;	/* Number of OffNums free or to be free */
	OffsetNumber offsets[1];	/* Array of free OffNums */
} VacPageData;

typedef VacPageData *VacPage;

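/*
 * Note the offsets[1] trick: VacPageData is a variable-length struct, so a
 * usable VacPage is never allocated at just sizeof(VacPageData).  A sketch
 * of the idiom (scan_heap below does exactly this with n = MaxOffsetNumber,
 * and copy_vac_page then makes a just-large-enough copy):
 *
 *		VacPage vp = (VacPage) palloc(sizeof(VacPageData) +
 *									  n * sizeof(OffsetNumber));
 */
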
typedef struct VacPageListData
{
	BlockNumber empty_end_pages;	/* Number of "empty" end-pages */
	int			num_pages;			/* Number of pages in pagedesc */
	int			num_allocated_pages;	/* Number of allocated pages in
										 * pagedesc */
	VacPage    *pagedesc;			/* Descriptions of pages */
} VacPageListData;

typedef VacPageListData *VacPageList;

typedef struct VTupleLinkData
{
	ItemPointerData new_tid;
	ItemPointerData this_tid;
} VTupleLinkData;

typedef VTupleLinkData *VTupleLink;

typedef struct VTupleMoveData
{
	ItemPointerData tid;		/* tuple ID */
	VacPage		vacpage;		/* where to move */
	bool		cleanVpd;		/* clean vacpage before using */
} VTupleMoveData;

typedef VTupleMoveData *VTupleMove;

typedef struct VRelStats
{
	BlockNumber rel_pages;
	double		rel_tuples;
	Size		min_tlen;
	Size		max_tlen;
	bool		hasindex;
	int			num_vtlinks;
	VTupleLink	vtlinks;
} VRelStats;


static MemoryContext vac_context = NULL;

static int	elevel = -1;

static TransactionId OldestXmin;
static TransactionId FreezeLimit;


/* non-export function prototypes */
static List *getrels(const RangeVar *vacrel, const char *stmttype);
static void vac_update_dbstats(Oid dbid,
				   TransactionId vacuumXID,
				   TransactionId frozenXID);
static void vac_truncate_clog(TransactionId vacuumXID,
				  TransactionId frozenXID);
static bool vacuum_rel(Oid relid, VacuumStmt *vacstmt, char expected_relkind);
static void full_vacuum_rel(Relation onerel, VacuumStmt *vacstmt);
static void scan_heap(VRelStats *vacrelstats, Relation onerel,
		  VacPageList vacuum_pages, VacPageList fraged_pages);
static void repair_frag(VRelStats *vacrelstats, Relation onerel,
			VacPageList vacuum_pages, VacPageList fraged_pages,
			int nindexes, Relation *Irel);
static void vacuum_heap(VRelStats *vacrelstats, Relation onerel,
			VacPageList vacpagelist);
static void vacuum_page(Relation onerel, Buffer buffer, VacPage vacpage);
static void vacuum_index(VacPageList vacpagelist, Relation indrel,
			 double num_tuples, int keep_tuples);
static void scan_index(Relation indrel, double num_tuples);
static bool tid_reaped(ItemPointer itemptr, void *state);
static bool dummy_tid_reaped(ItemPointer itemptr, void *state);
static void vac_update_fsm(Relation onerel, VacPageList fraged_pages,
			   BlockNumber rel_pages);
static VacPage copy_vac_page(VacPage vacpage);
static void vpage_insert(VacPageList vacpagelist, VacPage vpnew);
static void *vac_bsearch(const void *key, const void *base,
			size_t nelem, size_t size,
			int (*compar) (const void *, const void *));
static int	vac_cmp_blk(const void *left, const void *right);
static int	vac_cmp_offno(const void *left, const void *right);
static int	vac_cmp_vtlinks(const void *left, const void *right);
static bool enough_space(VacPage vacpage, Size len);


/****************************************************************************
 *																			*
 *			Code common to all flavors of VACUUM and ANALYZE				*
 *																			*
 ****************************************************************************
 */


/*
 * Primary entry point for VACUUM and ANALYZE commands.
 */
void
vacuum(VacuumStmt *vacstmt)
{
	const char *stmttype = vacstmt->vacuum ? "VACUUM" : "ANALYZE";
	MemoryContext anl_context = NULL;
	TransactionId initialOldestXmin = InvalidTransactionId;
	TransactionId initialFreezeLimit = InvalidTransactionId;
	bool		all_rels;
	List	   *vrl,
			   *cur;

	if (vacstmt->verbose)
		elevel = INFO;
	else
		elevel = DEBUG2;

	/*
	 * We cannot run VACUUM inside a user transaction block; if we were
	 * inside a transaction, then our commit- and
	 * start-transaction-command calls would not have the intended effect!
	 * Furthermore, the forced commit that occurs before truncating the
	 * relation's file would have the effect of committing the rest of the
	 * user's transaction too, which would certainly not be the desired
	 * behavior.
	 */
	if (vacstmt->vacuum)
		PreventTransactionChain((void *) vacstmt, stmttype);
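
	/*
	 * So, for example, a script that does
	 *
	 *		BEGIN;
	 *		VACUUM;
	 *
	 * fails at the VACUUM with an error to the effect of "VACUUM cannot
	 * run inside a transaction block" (the exact message text comes from
	 * PreventTransactionChain).
	 */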

	/*
	 * Send info about dead objects to the statistics collector
	 */
	if (vacstmt->vacuum)
		pgstat_vacuum_tabstat();

	/*
	 * Create special memory context for cross-transaction storage.
	 *
	 * Since it is a child of PortalContext, it will go away eventually even
	 * if we suffer an error; there's no need for special abort cleanup
	 * logic.
	 */
	vac_context = AllocSetContextCreate(PortalContext,
										"Vacuum",
										ALLOCSET_DEFAULT_MINSIZE,
										ALLOCSET_DEFAULT_INITSIZE,
										ALLOCSET_DEFAULT_MAXSIZE);

	/*
	 * If we are running only ANALYZE, we don't need per-table
	 * transactions, but we still need a memory context with table
	 * lifetime.
	 */
	if (vacstmt->analyze && !vacstmt->vacuum)
		anl_context = AllocSetContextCreate(PortalContext,
											"Analyze",
											ALLOCSET_DEFAULT_MINSIZE,
											ALLOCSET_DEFAULT_INITSIZE,
											ALLOCSET_DEFAULT_MAXSIZE);

	/* Assume we are processing everything unless one table is mentioned */
	all_rels = (vacstmt->relation == NULL);

	/* Build list of relations to process (note this lives in vac_context) */
	vrl = getrels(vacstmt->relation, stmttype);

	/*
	 * Formerly, there was code here to prevent more than one VACUUM from
	 * executing concurrently in the same database.  However, there's no
	 * good reason to prevent that, and manually removing lockfiles after
	 * a vacuum crash was a pain for dbadmins.  So, forget about
	 * lockfiles, and just rely on the locks we grab on each target table
	 * to ensure that there aren't two VACUUMs running on the same table
	 * at the same time.
	 */

	/*
	 * The strangeness with committing and starting transactions here is
	 * due to wanting to run each table's VACUUM as a separate
	 * transaction, so that we don't hold locks unnecessarily long.  Also,
	 * if we are doing VACUUM ANALYZE, the ANALYZE part runs as a separate
	 * transaction from the VACUUM to further reduce locking.
	 *
	 * vacuum_rel expects to be entered with no transaction active; it will
	 * start and commit its own transaction.  But we are called by an SQL
	 * command, and so we are executing inside a transaction already.  We
	 * commit the transaction started in PostgresMain() here, and start
	 * another one before exiting to match the commit waiting for us back
	 * in PostgresMain().
	 *
	 * In the case of an ANALYZE statement (no vacuum, just analyze) it's
	 * okay to run the whole thing in the outer transaction, and so we
	 * skip transaction start/stop operations.
	 */
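
	/*
	 * Schematically, then, a multi-table VACUUM behaves like this (each
	 * start/commit pair being a separate transaction):
	 *
	 *		CommitTransactionCommand();	-- end xact begun in PostgresMain
	 *		foreach(table)
	 *		{
	 *			vacuum_rel();			-- runs in its own transaction
	 *			analyze_rel();			-- in another, for VACUUM ANALYZE
	 *		}
	 *		StartTransactionCommand();	-- matches commit in PostgresMain
	 */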
	if (vacstmt->vacuum)
	{
		if (all_rels)
		{
			/*
			 * It's a database-wide VACUUM.
			 *
			 * Compute the initially applicable OldestXmin and FreezeLimit
			 * XIDs, so that we can record these values at the end of the
			 * VACUUM. Note that individual tables may well be processed
			 * with newer values, but we can guarantee that no
			 * (non-shared) relations are processed with older ones.
			 *
			 * It is okay to record non-shared values in pg_database, even
			 * though we may vacuum shared relations with older cutoffs,
			 * because only the minimum of the values present in
			 * pg_database matters.  We can be sure that shared relations
			 * have at some time been vacuumed with cutoffs no worse than
			 * the global minimum; for, if there is a backend in some
			 * other DB with xmin = OLDXMIN that's determining the cutoff
			 * with which we vacuum shared relations, it is not possible
			 * for that database to have a cutoff newer than OLDXMIN
			 * recorded in pg_database.
			 */
			vacuum_set_xid_limits(vacstmt, false,
								  &initialOldestXmin,
								  &initialFreezeLimit);
		}

		/* matches the StartTransaction in PostgresMain() */
		CommitTransactionCommand();
	}

	/*
	 * Loop to process each selected relation.
	 */
	foreach(cur, vrl)
	{
		Oid			relid = lfirsto(cur);

		if (vacstmt->vacuum)
		{
			if (!vacuum_rel(relid, vacstmt, RELKIND_RELATION))
				all_rels = false;		/* forget about updating dbstats */
		}
		if (vacstmt->analyze)
		{
			MemoryContext old_context = NULL;

			/*
			 * If we vacuumed, use new transaction for analyze. Otherwise,
			 * we can use the outer transaction, but we still need to call
			 * analyze_rel in a memory context that will be cleaned up on
			 * return (else we leak memory while processing multiple
			 * tables).
			 */
			if (vacstmt->vacuum)
			{
				StartTransactionCommand();
				SetQuerySnapshot();		/* might be needed for functions
										 * in indexes */
			}
			else
				old_context = MemoryContextSwitchTo(anl_context);

			analyze_rel(relid, vacstmt);

			if (vacstmt->vacuum)
				CommitTransactionCommand();
			else
			{
				MemoryContextSwitchTo(old_context);
				MemoryContextResetAndDeleteChildren(anl_context);
			}
		}
	}

	/*
	 * Finish up processing.
	 */
	if (vacstmt->vacuum)
	{
		/* here, we are not in a transaction */

		/*
		 * This matches the CommitTransaction waiting for us in
		 * PostgresMain().
		 */
		StartTransactionCommand();

		/*
		 * If it was a database-wide VACUUM, print FSM usage statistics
		 * (we don't make you be superuser to see these).
		 */
		if (vacstmt->relation == NULL)
			PrintFreeSpaceMapStatistics(elevel);

		/*
		 * If we completed a database-wide VACUUM without skipping any
		 * relations, update the database's pg_database row with info
		 * about the transaction IDs used, and try to truncate pg_clog.
		 */
		if (all_rels)
		{
			vac_update_dbstats(MyDatabaseId,
							   initialOldestXmin, initialFreezeLimit);
			vac_truncate_clog(initialOldestXmin, initialFreezeLimit);
		}
	}

	/*
	 * Clean up working storage --- note we must do this after
	 * StartTransactionCommand, else we might be trying to delete the
	 * active context!
	 */
	MemoryContextDelete(vac_context);
	vac_context = NULL;

	if (anl_context)
		MemoryContextDelete(anl_context);
}

/*
 * Build a list of Oids for each relation to be processed
 *
 * The list is built in vac_context so that it will survive across our
 * per-relation transactions.
 */
static List *
getrels(const RangeVar *vacrel, const char *stmttype)
{
	List	   *vrl = NIL;
	MemoryContext oldcontext;

	if (vacrel)
	{
		/* Process specific relation */
		Oid			relid;

		relid = RangeVarGetRelid(vacrel, false);

		/* Make a relation list entry for this guy */
		oldcontext = MemoryContextSwitchTo(vac_context);
		vrl = lappendo(vrl, relid);
		MemoryContextSwitchTo(oldcontext);
	}
	else
	{
		/* Process all plain relations listed in pg_class */
		Relation	pgclass;
		HeapScanDesc scan;
		HeapTuple	tuple;
		ScanKeyData key;

		ScanKeyEntryInitialize(&key, 0x0,
							   Anum_pg_class_relkind,
							   F_CHAREQ,
							   CharGetDatum(RELKIND_RELATION));

		pgclass = heap_openr(RelationRelationName, AccessShareLock);

		scan = heap_beginscan(pgclass, SnapshotNow, 1, &key);

		while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
		{
			/* Make a relation list entry for this guy */
			oldcontext = MemoryContextSwitchTo(vac_context);
			vrl = lappendo(vrl, HeapTupleGetOid(tuple));
			MemoryContextSwitchTo(oldcontext);
		}

		heap_endscan(scan);
		heap_close(pgclass, AccessShareLock);
	}

	return vrl;
}

/*
 * vacuum_set_xid_limits() -- compute oldest-Xmin and freeze cutoff points
 */
void
vacuum_set_xid_limits(VacuumStmt *vacstmt, bool sharedRel,
					  TransactionId *oldestXmin,
					  TransactionId *freezeLimit)
{
	TransactionId limit;

	*oldestXmin = GetOldestXmin(sharedRel);

	Assert(TransactionIdIsNormal(*oldestXmin));

	if (vacstmt->freeze)
	{
		/* FREEZE option: use oldest Xmin as freeze cutoff too */
		limit = *oldestXmin;
	}
	else
	{
		/*
		 * Normal case: freeze cutoff is well in the past, to wit, about
		 * halfway to the wrap horizon
		 */
		limit = GetCurrentTransactionId() - (MaxTransactionId >> 2);
	}
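
	/*
	 * Illustrative arithmetic, assuming the usual 32-bit TransactionId
	 * (MaxTransactionId == 0xFFFFFFFF):
	 *
	 *		MaxTransactionId >> 2  =  0x3FFFFFFF  (~1.07 billion)
	 *
	 * so the normal freeze cutoff lies about a billion transactions in
	 * the past, i.e. roughly halfway to the 2-billion-transaction wrap
	 * horizon.
	 */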

	/*
	 * Be careful not to generate a "permanent" XID
	 */
	if (!TransactionIdIsNormal(limit))
		limit = FirstNormalTransactionId;

	/*
	 * Ensure sane relationship of limits
	 */
	if (TransactionIdFollows(limit, *oldestXmin))
	{
		ereport(WARNING,
				(errmsg("oldest Xmin is far in the past"),
				 errhint("Close open transactions soon to avoid wraparound problems.")));
		limit = *oldestXmin;
	}

	*freezeLimit = limit;
}


/*
 *	vac_update_relstats() -- update statistics for one relation
 *
 *		Update the whole-relation statistics that are kept in its pg_class
 *		row.  There are additional stats that will be updated if we are
 *		doing ANALYZE, but we always update these stats.  This routine works
 *		for both index and heap relation entries in pg_class.
 *
 *		We violate no-overwrite semantics here by storing new values for the
 *		statistics columns directly into the pg_class tuple that's already on
 *		the page.  The reason for this is that if we updated these tuples in
 *		the usual way, vacuuming pg_class itself wouldn't work very well ---
 *		by the time we got done with a vacuum cycle, most of the tuples in
 *		pg_class would've been obsoleted.  Of course, this only works for
 *		fixed-size never-null columns, but these are.
 *
 *		This routine is shared by full VACUUM, lazy VACUUM, and stand-alone
 *		ANALYZE.
 */
void
vac_update_relstats(Oid relid, BlockNumber num_pages, double num_tuples,
					bool hasindex)
{
	Relation	rd;
	HeapTupleData rtup;
	HeapTuple	ctup;
	Form_pg_class pgcform;
	Buffer		buffer;

	/*
	 * update number of tuples and number of pages in pg_class
	 */
	rd = heap_openr(RelationRelationName, RowExclusiveLock);

	ctup = SearchSysCache(RELOID,
						  ObjectIdGetDatum(relid),
						  0, 0, 0);
	if (!HeapTupleIsValid(ctup))
		elog(ERROR, "pg_class entry for relid %u vanished during vacuuming",
			 relid);

	/* get the buffer cache tuple */
	rtup.t_self = ctup->t_self;
	ReleaseSysCache(ctup);
	if (!heap_fetch(rd, SnapshotNow, &rtup, &buffer, false, NULL))
		elog(ERROR, "pg_class entry for relid %u vanished during vacuuming",
			 relid);

	/* overwrite the existing statistics in the tuple */
	pgcform = (Form_pg_class) GETSTRUCT(&rtup);
	pgcform->relpages = (int32) num_pages;
	pgcform->reltuples = num_tuples;
	pgcform->relhasindex = hasindex;

	/*
	 * If we have discovered that there are no indexes, then there's no
	 * primary key either.  This could be done more thoroughly...
	 */
	if (!hasindex)
		pgcform->relhaspkey = false;

	/*
	 * Invalidate the tuple in the catcaches; this also arranges to flush
	 * the relation's relcache entry.  (If we fail to commit for some
	 * reason, no flush will occur, but no great harm is done since there
	 * are no noncritical state updates here.)
	 */
	CacheInvalidateHeapTuple(rd, &rtup);

	/* Write the buffer */
	WriteBuffer(buffer);

	heap_close(rd, RowExclusiveLock);
}


/*
 *	vac_update_dbstats() -- update statistics for one database
 *
 *		Update the whole-database statistics that are kept in its pg_database
 *		row.
 *
 *		We violate no-overwrite semantics here by storing new values for the
 *		statistics columns directly into the tuple that's already on the page.
 *		As with vac_update_relstats, this avoids leaving dead tuples behind
 *		after a VACUUM; which is good since GetRawDatabaseInfo
 *		can get confused by finding dead tuples in pg_database.
 *
 *		This routine is shared by full and lazy VACUUM.  Note that it is only
 *		applied after a database-wide VACUUM operation.
 */
static void
vac_update_dbstats(Oid dbid,
				   TransactionId vacuumXID,
				   TransactionId frozenXID)
{
	Relation	relation;
	ScanKeyData entry[1];
	HeapScanDesc scan;
	HeapTuple	tuple;
	Form_pg_database dbform;

	relation = heap_openr(DatabaseRelationName, RowExclusiveLock);

	/* Must use a heap scan, since there's no syscache for pg_database */
	ScanKeyEntryInitialize(&entry[0], 0x0,
						   ObjectIdAttributeNumber, F_OIDEQ,
						   ObjectIdGetDatum(dbid));

	scan = heap_beginscan(relation, SnapshotNow, 1, entry);

	tuple = heap_getnext(scan, ForwardScanDirection);

	if (!HeapTupleIsValid(tuple))
		elog(ERROR, "could not find tuple for database %u", dbid);

	dbform = (Form_pg_database) GETSTRUCT(tuple);

	/* overwrite the existing statistics in the tuple */
	dbform->datvacuumxid = vacuumXID;
	dbform->datfrozenxid = frozenXID;

	/* invalidate the tuple in the cache and write the buffer */
	CacheInvalidateHeapTuple(relation, tuple);
	WriteNoReleaseBuffer(scan->rs_cbuf);

	heap_endscan(scan);

	heap_close(relation, RowExclusiveLock);
}


/*
 *	vac_truncate_clog() -- attempt to truncate the commit log
 *
 *		Scan pg_database to determine the system-wide oldest datvacuumxid,
 *		and use it to truncate the transaction commit log (pg_clog).
 *		Also generate a warning if the system-wide oldest datfrozenxid
 *		seems to be in danger of wrapping around.
 *
 *		The passed XIDs are simply the ones I just wrote into my pg_database
 *		entry.  They're used to initialize the "min" calculations.
 *
 *		This routine is shared by full and lazy VACUUM.  Note that it is only
 *		applied after a database-wide VACUUM operation.
 */
static void
vac_truncate_clog(TransactionId vacuumXID, TransactionId frozenXID)
{
	TransactionId myXID;
	Relation	relation;
	HeapScanDesc scan;
	HeapTuple	tuple;
	int32		age;
	bool		vacuumAlreadyWrapped = false;
	bool		frozenAlreadyWrapped = false;

	myXID = GetCurrentTransactionId();

	relation = heap_openr(DatabaseRelationName, AccessShareLock);

	scan = heap_beginscan(relation, SnapshotNow, 0, NULL);

	while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
	{
		Form_pg_database dbform = (Form_pg_database) GETSTRUCT(tuple);

		/* Ignore non-connectable databases (eg, template0) */
		/* It's assumed that these have been frozen correctly */
		if (!dbform->datallowconn)
			continue;

		if (TransactionIdIsNormal(dbform->datvacuumxid))
		{
			if (TransactionIdPrecedes(myXID, dbform->datvacuumxid))
				vacuumAlreadyWrapped = true;
			else if (TransactionIdPrecedes(dbform->datvacuumxid, vacuumXID))
				vacuumXID = dbform->datvacuumxid;
		}
		if (TransactionIdIsNormal(dbform->datfrozenxid))
		{
			if (TransactionIdPrecedes(myXID, dbform->datfrozenxid))
				frozenAlreadyWrapped = true;
			else if (TransactionIdPrecedes(dbform->datfrozenxid, frozenXID))
				frozenXID = dbform->datfrozenxid;
		}
	}

	heap_endscan(scan);

	heap_close(relation, AccessShareLock);

	/*
	 * Do not truncate CLOG if we seem to have suffered wraparound
	 * already; the computed minimum XID might be bogus.
	 */
	if (vacuumAlreadyWrapped)
	{
		ereport(WARNING,
				(errmsg("some databases have not been vacuumed in over 2 billion transactions"),
				 errdetail("You may have already suffered transaction-wraparound data loss.")));
		return;
	}

	/* Truncate CLOG to the oldest vacuumxid */
	TruncateCLOG(vacuumXID);

	/* Give warning about impending wraparound problems */
	if (frozenAlreadyWrapped)
	{
		ereport(WARNING,
				(errmsg("some databases have not been vacuumed in over 1 billion transactions"),
				 errhint("Better vacuum them soon, or you may have a wraparound failure.")));
	}
	else
	{
		age = (int32) (myXID - frozenXID);
		if (age > (int32) ((MaxTransactionId >> 3) * 3))
			ereport(WARNING,
					(errmsg("some databases have not been vacuumed in %d transactions",
							age),
					 errhint("Better vacuum them within %d transactions, "
							 "or you may have a wraparound failure.",
							 (int32) (MaxTransactionId >> 1) - age)));
	}
}
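
/*
 * Worked numbers for the age test above, again assuming 32-bit XIDs:
 *
 *		(MaxTransactionId >> 3) * 3	 ~=  1.6 billion	(warning threshold)
 *		MaxTransactionId >> 1		 ~=  2.1 billion	(wrap horizon)
 *
 * so the warning fires once a database has gone about 1.6 billion
 * transactions without a vacuum, and the hint reports the remaining
 * headroom, (MaxTransactionId >> 1) - age, before comparisons wrap.
 */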


/****************************************************************************
 *																			*
 *			Code common to both flavors of VACUUM							*
 *																			*
 ****************************************************************************
 */


/*
 *	vacuum_rel() -- vacuum one heap relation
 *
 *		Returns TRUE if we actually processed the relation (or can ignore it
 *		for some reason), FALSE if we failed to process it due to permissions
 *		or other reasons.  (A FALSE result really means that some data
 *		may have been left unvacuumed, so we can't update XID stats.)
 *
 *		Doing one heap at a time incurs extra overhead, since we need to
 *		check that the heap exists again just before we vacuum it.  We do
 *		this so that vacuuming can be spread across many small
 *		transactions.  Otherwise, two-phase locking would require
 *		us to lock the entire database during one pass of the vacuum cleaner.
 *
 *		At entry and exit, we are not inside a transaction.
 */
static bool
vacuum_rel(Oid relid, VacuumStmt *vacstmt, char expected_relkind)
{
	LOCKMODE	lmode;
	Relation	onerel;
	LockRelId	onerelid;
	Oid			toast_relid;
	bool		result;

	/* Begin a transaction for vacuuming this relation */
	StartTransactionCommand();
	SetQuerySnapshot();			/* might be needed for functions in
								 * indexes */

	/*
	 * Check for user-requested abort.  Note we want this to be inside a
	 * transaction, so xact.c doesn't issue a useless WARNING.
	 */
	CHECK_FOR_INTERRUPTS();

	/*
	 * Race condition -- if the pg_class tuple has gone away since the
	 * last time we saw it, we don't need to vacuum it.
	 */
	if (!SearchSysCacheExists(RELOID,
							  ObjectIdGetDatum(relid),
							  0, 0, 0))
	{
		CommitTransactionCommand();
		return true;			/* okay 'cause no data there */
	}

	/*
	 * Determine the type of lock we want --- hard exclusive lock for a
	 * FULL vacuum, but just ShareUpdateExclusiveLock for concurrent
	 * vacuum.  Either way, we can be sure that no other backend is
	 * vacuuming the same table.
	 */
	lmode = vacstmt->full ? AccessExclusiveLock : ShareUpdateExclusiveLock;

	/*
	 * Open the class, get an appropriate lock on it, and check
	 * permissions.
	 *
	 * We allow the user to vacuum a table if he is superuser, the table
	 * owner, or the database owner (but in the latter case, only if it's
	 * not a shared relation).  pg_class_ownercheck includes the superuser
	 * case.
	 *
	 * Note we choose to treat permissions failure as a WARNING and keep
	 * trying to vacuum the rest of the DB --- is this appropriate?
	 */
	onerel = relation_open(relid, lmode);

	if (!(pg_class_ownercheck(RelationGetRelid(onerel), GetUserId()) ||
		  (pg_database_ownercheck(MyDatabaseId, GetUserId()) && !onerel->rd_rel->relisshared)))
	{
		ereport(WARNING,
				(errmsg("skipping \"%s\" --- only table or database owner can VACUUM it",
						RelationGetRelationName(onerel))));
		relation_close(onerel, lmode);
		CommitTransactionCommand();
		return false;
	}

	/*
	 * Check that it's a plain table; we used to do this in getrels() but
	 * seems safer to check after we've locked the relation.
	 */
	if (onerel->rd_rel->relkind != expected_relkind)
	{
		ereport(WARNING,
				(errmsg("skipping \"%s\" --- cannot VACUUM indexes, views or special system tables",
						RelationGetRelationName(onerel))));
		relation_close(onerel, lmode);
		CommitTransactionCommand();
		return false;
	}

	/*
	 * Silently ignore tables that are temp tables of other backends ---
	 * trying to vacuum these will lead to great unhappiness, since their
	 * contents are probably not up-to-date on disk.  (We don't throw a
	 * warning here; it would just lead to chatter during a database-wide
	 * VACUUM.)
	 */
	if (isOtherTempNamespace(RelationGetNamespace(onerel)))
	{
		relation_close(onerel, lmode);
		CommitTransactionCommand();
		return true;			/* assume no long-lived data in temp
								 * tables */
	}

	/*
	 * Get a session-level lock too. This will protect our access to the
	 * relation across multiple transactions, so that we can vacuum the
	 * relation's TOAST table (if any) secure in the knowledge that no one
	 * is deleting the parent relation.
	 *
	 * NOTE: this cannot block, even if someone else is waiting for access,
	 * because the lock manager knows that both lock requests are from the
	 * same process.
	 */
	onerelid = onerel->rd_lockInfo.lockRelId;
	LockRelationForSession(&onerelid, lmode);

	/*
	 * Remember the relation's TOAST relation for later
	 */
	toast_relid = onerel->rd_rel->reltoastrelid;

	/*
	 * Do the actual work --- either FULL or "lazy" vacuum
	 */
	if (vacstmt->full)
		full_vacuum_rel(onerel, vacstmt);
	else
		lazy_vacuum_rel(onerel, vacstmt);

	result = true;				/* did the vacuum */

	/* all done with this class, but hold lock until commit */
	relation_close(onerel, NoLock);

	/*
	 * Complete the transaction and free all temporary memory used.
	 */
	CommitTransactionCommand();

	/*
	 * If the relation has a secondary toast rel, vacuum that too while we
	 * still hold the session lock on the master table.  Note however that
	 * "analyze" will not get done on the toast table.  This is good,
	 * because the toaster always uses hardcoded index access and
	 * statistics are totally unimportant for toast relations.
	 */
	if (toast_relid != InvalidOid)
	{
		if (!vacuum_rel(toast_relid, vacstmt, RELKIND_TOASTVALUE))
			result = false;		/* failed to vacuum the TOAST table? */
	}

	/*
	 * Now release the session-level lock on the master table.
	 */
	UnlockRelationForSession(&onerelid, lmode);

	return result;
}


/****************************************************************************
 *																			*
 *			Code for VACUUM FULL (only)										*
 *																			*
 ****************************************************************************
 */


/*
 *	full_vacuum_rel() -- perform FULL VACUUM for one heap relation
 *
 *		This routine vacuums a single heap, cleans out its indexes, and
 *		updates its num_pages and num_tuples statistics.
 *
 *		At entry, we have already established a transaction and opened
 *		and locked the relation.
 */
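/*
 * In outline, the work below proceeds as:
 *
 *		scan_heap()						-- classify pages, freeze old xmins
 *		vacuum_index() / scan_index()	-- clean indexes, or just re-count
 *		repair_frag() or vacuum_heap()	-- shrink the heap, or just compact
 *		vac_update_fsm()				-- publish final free-space info
 *		vac_update_relstats()			-- update the pg_class row
 */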
static void
full_vacuum_rel(Relation onerel, VacuumStmt *vacstmt)
{
	VacPageListData vacuum_pages;		/* List of pages to vacuum and/or
										 * clean indexes */
	VacPageListData fraged_pages;		/* List of pages with enough free
										 * space for reuse */
	Relation   *Irel;
	int			nindexes,
				i;
	VRelStats  *vacrelstats;
	bool		reindex = false;

	if (IsIgnoringSystemIndexes() &&
		IsSystemRelation(onerel))
		reindex = true;

	vacuum_set_xid_limits(vacstmt, onerel->rd_rel->relisshared,
						  &OldestXmin, &FreezeLimit);

	/*
	 * Set up statistics-gathering machinery.
	 */
	vacrelstats = (VRelStats *) palloc(sizeof(VRelStats));
	vacrelstats->rel_pages = 0;
	vacrelstats->rel_tuples = 0;
	vacrelstats->hasindex = false;

	/* scan the heap */
	vacuum_pages.num_pages = fraged_pages.num_pages = 0;
	scan_heap(vacrelstats, onerel, &vacuum_pages, &fraged_pages);

	/* Now open all indexes of the relation */
	vac_open_indexes(onerel, &nindexes, &Irel);
	if (!Irel)
		reindex = false;
	else if (!RelationGetForm(onerel)->relhasindex)
		reindex = true;
	if (nindexes > 0)
		vacrelstats->hasindex = true;

#ifdef NOT_USED

	/*
	 * reindex in VACUUM is dangerous under WAL. ifdef out until it
	 * becomes safe.
	 */
	if (reindex)
	{
		vac_close_indexes(nindexes, Irel);
		Irel = (Relation *) NULL;
		activate_indexes_of_a_table(onerel, false);
	}
#endif   /* NOT_USED */

	/* Clean/scan index relation(s) */
	if (Irel != (Relation *) NULL)
	{
		if (vacuum_pages.num_pages > 0)
		{
			for (i = 0; i < nindexes; i++)
				vacuum_index(&vacuum_pages, Irel[i],
							 vacrelstats->rel_tuples, 0);
		}
		else
		{
			/* just scan indexes to update statistics */
			for (i = 0; i < nindexes; i++)
				scan_index(Irel[i], vacrelstats->rel_tuples);
		}
	}

	if (fraged_pages.num_pages > 0)
	{
		/* Try to shrink heap */
		repair_frag(vacrelstats, onerel, &vacuum_pages, &fraged_pages,
					nindexes, Irel);
		vac_close_indexes(nindexes, Irel);
	}
	else
	{
		vac_close_indexes(nindexes, Irel);
		if (vacuum_pages.num_pages > 0)
		{
			/* Clean pages from vacuum_pages list */
			vacuum_heap(vacrelstats, onerel, &vacuum_pages);
		}
		else
		{
			/*
			 * Flush dirty pages out to disk.  We must do this even if we
			 * didn't do anything else, because we want to ensure that all
			 * tuples have correct on-row commit status on disk (see
			 * bufmgr.c's comments for FlushRelationBuffers()).
			 */
			i = FlushRelationBuffers(onerel, vacrelstats->rel_pages);
			if (i < 0)
				elog(ERROR, "FlushRelationBuffers returned %d", i);
		}
	}

#ifdef NOT_USED
	if (reindex)
		activate_indexes_of_a_table(onerel, true);
#endif   /* NOT_USED */

	/* update shared free space map with final free space info */
	vac_update_fsm(onerel, &fraged_pages, vacrelstats->rel_pages);

	/* update statistics in pg_class */
	vac_update_relstats(RelationGetRelid(onerel), vacrelstats->rel_pages,
						vacrelstats->rel_tuples, vacrelstats->hasindex);
}


/*
 *	scan_heap() -- scan an open heap relation
 *
 *		This routine sets commit status bits, constructs vacuum_pages (list
 *		of pages we need to compact free space on and/or clean indexes of
 *		deleted tuples), constructs fraged_pages (list of pages with free
 *		space that tuples could be moved into), and calculates statistics
 *		on the number of live tuples in the heap.
 */
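/*
 * Roughly speaking, for each page the loop below decides:
 *
 *		if (the page has dead tuples or unused line pointers)
 *			record it in vacuum_pages;
 *		if (the page has enough free space to be worth moving tuples into)
 *			record it in fraged_pages;
 *
 * with uninitialized and empty pages going onto both lists.
 */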
static void
scan_heap(VRelStats *vacrelstats, Relation onerel,
		  VacPageList vacuum_pages, VacPageList fraged_pages)
{
	BlockNumber nblocks,
				blkno;
	ItemId		itemid;
	Buffer		buf;
	HeapTupleData tuple;
	OffsetNumber offnum,
				maxoff;
	bool		pgchanged,
				tupgone,
				notup;
	char	   *relname;
	VacPage		vacpage,
				vacpagecopy;
	BlockNumber empty_pages,
				empty_end_pages;
	double		num_tuples,
				tups_vacuumed,
				nkeep,
				nunused;
	double		free_space,
				usable_free_space;
	Size		min_tlen = MaxTupleSize;
	Size		max_tlen = 0;
	int			i;
	bool		do_shrinking = true;
	VTupleLink	vtlinks = (VTupleLink) palloc(100 * sizeof(VTupleLinkData));
	int			num_vtlinks = 0;
	int			free_vtlinks = 100;
	VacRUsage	ru0;

	vac_init_rusage(&ru0);

	relname = RelationGetRelationName(onerel);
	ereport(elevel,
			(errmsg("vacuuming \"%s.%s\"",
					get_namespace_name(RelationGetNamespace(onerel)),
					relname)));

	empty_pages = empty_end_pages = 0;
	num_tuples = tups_vacuumed = nkeep = nunused = 0;
	free_space = 0;

	nblocks = RelationGetNumberOfBlocks(onerel);

	/*
	 * We initially create each VacPage item in a maximal-sized workspace,
	 * then copy the workspace into a just-large-enough copy.
	 */
	vacpage = (VacPage) palloc(sizeof(VacPageData) + MaxOffsetNumber * sizeof(OffsetNumber));
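
	/*
	 * With the default 8K BLCKSZ, MaxOffsetNumber works out to about 2K
	 * line pointers (BLCKSZ / sizeof(ItemIdData)), so this maximal
	 * workspace costs only ~4KB of OffsetNumbers plus the fixed header ---
	 * cheap enough to allocate once and reuse for every page scanned.
	 */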

	for (blkno = 0; blkno < nblocks; blkno++)
	{
		Page		page,
					tempPage = NULL;
		bool		do_reap,
					do_frag;

		CHECK_FOR_INTERRUPTS();

		buf = ReadBuffer(onerel, blkno);
		page = BufferGetPage(buf);

		vacpage->blkno = blkno;
		vacpage->offsets_used = 0;
		vacpage->offsets_free = 0;

		if (PageIsNew(page))
		{
			ereport(WARNING,
			(errmsg("relation \"%s\" page %u is uninitialized --- fixing",
					relname, blkno)));
			PageInit(page, BufferGetPageSize(buf), 0);
			vacpage->free = ((PageHeader) page)->pd_upper - ((PageHeader) page)->pd_lower;
			free_space += vacpage->free;
			empty_pages++;
			empty_end_pages++;
			vacpagecopy = copy_vac_page(vacpage);
			vpage_insert(vacuum_pages, vacpagecopy);
			vpage_insert(fraged_pages, vacpagecopy);
			WriteBuffer(buf);
			continue;
		}

		if (PageIsEmpty(page))
		{
			vacpage->free = ((PageHeader) page)->pd_upper - ((PageHeader) page)->pd_lower;
			free_space += vacpage->free;
			empty_pages++;
			empty_end_pages++;
			vacpagecopy = copy_vac_page(vacpage);
			vpage_insert(vacuum_pages, vacpagecopy);
			vpage_insert(fraged_pages, vacpagecopy);
			ReleaseBuffer(buf);
			continue;
		}

		pgchanged = false;
		notup = true;
		maxoff = PageGetMaxOffsetNumber(page);
		for (offnum = FirstOffsetNumber;
			 offnum <= maxoff;
			 offnum = OffsetNumberNext(offnum))
		{
			uint16		sv_infomask;

			itemid = PageGetItemId(page, offnum);

			/*
			 * Collect unused items too; it's possible to have index
			 * entries pointing here after a crash.
			 */
			if (!ItemIdIsUsed(itemid))
			{
				vacpage->offsets[vacpage->offsets_free++] = offnum;
				nunused += 1;
				continue;
			}

			tuple.t_datamcxt = NULL;
			tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
			tuple.t_len = ItemIdGetLength(itemid);
			ItemPointerSet(&(tuple.t_self), blkno, offnum);

			tupgone = false;
			sv_infomask = tuple.t_data->t_infomask;

			switch (HeapTupleSatisfiesVacuum(tuple.t_data, OldestXmin))
			{
				case HEAPTUPLE_DEAD:
					tupgone = true;		/* we can delete the tuple */
					break;
				case HEAPTUPLE_LIVE:

					/*
					 * Tuple is good.  Consider whether to replace its
					 * xmin value with FrozenTransactionId.
					 */
					if (TransactionIdIsNormal(HeapTupleHeaderGetXmin(tuple.t_data)) &&
						TransactionIdPrecedes(HeapTupleHeaderGetXmin(tuple.t_data),
											  FreezeLimit))
					{
						HeapTupleHeaderSetXmin(tuple.t_data, FrozenTransactionId);
						/* infomask should be okay already */
						Assert(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED);
						pgchanged = true;
					}
					break;
				case HEAPTUPLE_RECENTLY_DEAD:

					/*
					 * If the tuple was recently deleted, we must not
					 * remove it from the relation.
					 */
					nkeep += 1;

					/*
					 * If we are shrinking and this tuple has been
					 * updated, remember it so we can construct the
					 * chain of updated-tuple dependencies.
					 */
					if (do_shrinking &&
						!(ItemPointerEquals(&(tuple.t_self),
											&(tuple.t_data->t_ctid))))
					{
						if (free_vtlinks == 0)
						{
							free_vtlinks = 1000;
							vtlinks = (VTupleLink) repalloc(vtlinks,
									   (free_vtlinks + num_vtlinks) *
											 sizeof(VTupleLinkData));
						}
						vtlinks[num_vtlinks].new_tid = tuple.t_data->t_ctid;
						vtlinks[num_vtlinks].this_tid = tuple.t_self;
						free_vtlinks--;
						num_vtlinks++;
					}
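
					/*
					 * (The vtlinks array starts at 100 entries and grows
					 * 1000 at a time, per the repalloc above; repair_frag
					 * later follows these t_ctid links so that update
					 * chains are moved consistently.)
					 */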
1200                                         break;
1201                                 case HEAPTUPLE_INSERT_IN_PROGRESS:
1202
1203                                         /*
1204                                          * This should not happen, since we hold exclusive
1205                                          * lock on the relation; shouldn't we raise an error?
1206                                          * (Actually, it can happen in system catalogs, since
1207                                          * we tend to release write lock before commit there.)
1208                                          */
1209                                         ereport(NOTICE,
1210                                                         (errmsg("relation \"%s\" TID %u/%u: InsertTransactionInProgress %u --- can't shrink relation",
1211                                                                         relname, blkno, offnum, HeapTupleHeaderGetXmin(tuple.t_data))));
1212                                         do_shrinking = false;
1213                                         break;
1214                                 case HEAPTUPLE_DELETE_IN_PROGRESS:
1215
1216                                         /*
1217                                          * This should not happen, since we hold exclusive
1218                                          * lock on the relation; shouldn't we raise an error?
1219                                          * (Actually, it can happen in system catalogs, since
1220                                          * we tend to release write lock before commit there.)
1221                                          */
1222                                         ereport(NOTICE,
1223                                                         (errmsg("relation \"%s\" TID %u/%u: DeleteTransactionInProgress %u --- can't shrink relation",
1224                                                                         relname, blkno, offnum, HeapTupleHeaderGetXmax(tuple.t_data))));
1225                                         do_shrinking = false;
1226                                         break;
1227                                 default:
1228                                         elog(ERROR, "unexpected HeapTupleSatisfiesVacuum result");
1229                                         break;
1230                         }
1231
1232                         /* check for hint-bit update by HeapTupleSatisfiesVacuum */
1233                         if (sv_infomask != tuple.t_data->t_infomask)
1234                                 pgchanged = true;
1235
1236                         /*
1237                          * Other checks...
1238                          */
1239                         if (onerel->rd_rel->relhasoids &&
1240                                 !OidIsValid(HeapTupleGetOid(&tuple)))
1241                                 elog(WARNING, "relation \"%s\" TID %u/%u: OID is invalid",
1242                                          relname, blkno, offnum);
1243
1244                         if (tupgone)
1245                         {
1246                                 ItemId          lpp;
1247
1248                                 /*
1249                                  * Here we are building a temporary copy of the page with
1250                                  * dead tuples removed.  Below we will apply
1251                                  * PageRepairFragmentation to the copy, so that we can
1252                                  * determine how much space will be available after
1253                                  * removal of dead tuples.      But note we are NOT changing
1254                                  * the real page yet...
1255                                  */
1256                                 if (tempPage == (Page) NULL)
1257                                 {
1258                                         Size            pageSize;
1259
1260                                         pageSize = PageGetPageSize(page);
1261                                         tempPage = (Page) palloc(pageSize);
1262                                         memcpy(tempPage, page, pageSize);
1263                                 }
1264
1265                                 /* mark it unused on the temp page */
1266                                 lpp = PageGetItemId(tempPage, offnum);
1267                                 lpp->lp_flags &= ~LP_USED;
1268
1269                                 vacpage->offsets[vacpage->offsets_free++] = offnum;
1270                                 tups_vacuumed += 1;
1271                         }
1272                         else
1273                         {
1274                                 num_tuples += 1;
1275                                 notup = false;
1276                                 if (tuple.t_len < min_tlen)
1277                                         min_tlen = tuple.t_len;
1278                                 if (tuple.t_len > max_tlen)
1279                                         max_tlen = tuple.t_len;
1280                         }
1281                 }                                               /* scan along page */
1282
1283                 if (tempPage != (Page) NULL)
1284                 {
1285                         /* Some tuples are removable; figure free space after removal */
1286                         PageRepairFragmentation(tempPage, NULL);
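                        /* free space = gap between the line pointer array (pd_lower) and tuple data (pd_upper) */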
1287                         vacpage->free = ((PageHeader) tempPage)->pd_upper - ((PageHeader) tempPage)->pd_lower;
1288                         pfree(tempPage);
1289                         do_reap = true;
1290                 }
1291                 else
1292                 {
1293                         /* Just use current available space */
1294                         vacpage->free = ((PageHeader) page)->pd_upper - ((PageHeader) page)->pd_lower;
1295                         /* Need to reap the page if it has ~LP_USED line pointers */
1296                         do_reap = (vacpage->offsets_free > 0);
1297                 }
1298
1299                 free_space += vacpage->free;
1300
1301                 /*
1302                  * Add the page to fraged_pages if it has a useful amount of free
1303                  * space.  "Useful" means enough for a minimal-sized tuple. But we
1304                  * don't know that accurately near the start of the relation, so
1305                  * add pages unconditionally if they have >= BLCKSZ/10 free space.
1306                  */
1307                 do_frag = (vacpage->free >= min_tlen || vacpage->free >= BLCKSZ / 10);
1308
1309                 if (do_reap || do_frag)
1310                 {
1311                         vacpagecopy = copy_vac_page(vacpage);
1312                         if (do_reap)
1313                                 vpage_insert(vacuum_pages, vacpagecopy);
1314                         if (do_frag)
1315                                 vpage_insert(fraged_pages, vacpagecopy);
1316                 }
1317
1318                 /*
1319                  * Include the page in empty_end_pages if it will be empty after
1320                  * vacuuming; this is to keep us from using it as a move
1321                  * destination.
1322                  */
1323                 if (notup)
1324                 {
1325                         empty_pages++;
1326                         empty_end_pages++;
1327                 }
1328                 else
1329                         empty_end_pages = 0;
1330
1331                 if (pgchanged)
1332                         WriteBuffer(buf);
1333                 else
1334                         ReleaseBuffer(buf);
1335         }
1336
1337         pfree(vacpage);
1338
1339         /* save stats in the rel list for use later */
1340         vacrelstats->rel_tuples = num_tuples;
1341         vacrelstats->rel_pages = nblocks;
1342         if (num_tuples == 0)
1343                 min_tlen = max_tlen = 0;
1344         vacrelstats->min_tlen = min_tlen;
1345         vacrelstats->max_tlen = max_tlen;
1346
1347         vacuum_pages->empty_end_pages = empty_end_pages;
1348         fraged_pages->empty_end_pages = empty_end_pages;
1349
1350         /*
1351          * Clear the fraged_pages list if we found we couldn't shrink. Else,
1352          * remove any "empty" end-pages from the list, and compute usable free
1353          * space = free space in remaining pages.
1354          */
1355         if (do_shrinking)
1356         {
1357                 Assert((BlockNumber) fraged_pages->num_pages >= empty_end_pages);
1358                 fraged_pages->num_pages -= empty_end_pages;
1359                 usable_free_space = 0;
1360                 for (i = 0; i < fraged_pages->num_pages; i++)
1361                         usable_free_space += fraged_pages->pagedesc[i]->free;
1362         }
1363         else
1364         {
1365                 fraged_pages->num_pages = 0;
1366                 usable_free_space = 0;
1367         }
1368
1369         /* don't bother to save vtlinks if we will not call repair_frag */
1370         if (fraged_pages->num_pages > 0 && num_vtlinks > 0)
1371         {
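                /* sort by new_tid so that vac_bsearch() in repair_frag can find a chain member's parent */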
1372                 qsort((char *) vtlinks, num_vtlinks, sizeof(VTupleLinkData),
1373                           vac_cmp_vtlinks);
1374                 vacrelstats->vtlinks = vtlinks;
1375                 vacrelstats->num_vtlinks = num_vtlinks;
1376         }
1377         else
1378         {
1379                 vacrelstats->vtlinks = NULL;
1380                 vacrelstats->num_vtlinks = 0;
1381                 pfree(vtlinks);
1382         }
1383
1384         ereport(elevel,
1385                         (errmsg("\"%s\": found %.0f removable, %.0f nonremovable tuples in %u pages",
1386                                         RelationGetRelationName(onerel),
1387                                         tups_vacuumed, num_tuples, nblocks),
1388                          errdetail("%.0f dead tuples cannot be removed yet.\n"
1389                                 "Nonremovable tuples range from %lu to %lu bytes long.\n"
1390                                            "There were %.0f unused item pointers.\n"
1391                  "Total free space (including removable tuples) is %.0f bytes.\n"
1392                                            "%u pages are or will become empty, including %u at the end of the table.\n"
1393                                            "%u pages containing %.0f free bytes are potential move destinations.\n"
1394                                            "%s",
1395                                            nkeep,
1396                                            (unsigned long) min_tlen, (unsigned long) max_tlen,
1397                                            nunused,
1398                                            free_space,
1399                                            empty_pages, empty_end_pages,
1400                                            fraged_pages->num_pages, usable_free_space,
1401                                            vac_show_rusage(&ru0))));
1402 }
1403
1404
1405 /*
1406  *      repair_frag() -- try to repair relation's fragmentation
1407  *
1408  *              This routine marks dead tuples as unused and tries to re-use dead
1409  *              space by moving tuples (and inserting index entries if needed). It
1410  *              constructs Nvacpagelist, a list of freed pages (whose tuples were
1411  *              moved), and cleans their index entries after committing the current
1412  *              transaction (in hack manner - without releasing locks or freeing
1413  *              memory!). It truncates the relation if end-blocks have become empty.
1414  */
1415 static void
1416 repair_frag(VRelStats *vacrelstats, Relation onerel,
1417                         VacPageList vacuum_pages, VacPageList fraged_pages,
1418                         int nindexes, Relation *Irel)
1419 {
1420         TransactionId myXID;
1421         CommandId       myCID;
1422         Buffer          buf,
1423                                 cur_buffer;
1424         BlockNumber nblocks,
1425                                 blkno;
1426         BlockNumber last_move_dest_block = 0,
1427                                 last_vacuum_block;
1428         Page            page,
1429                                 ToPage = NULL;
1430         OffsetNumber offnum,
1431                                 maxoff,
1432                                 newoff,
1433                                 max_offset;
1434         ItemId          itemid,
1435                                 newitemid;
1436         HeapTupleData tuple,
1437                                 newtup;
1438         TupleDesc       tupdesc;
1439         ResultRelInfo *resultRelInfo;
1440         EState     *estate;
1441         TupleTable      tupleTable;
1442         TupleTableSlot *slot;
1443         VacPageListData Nvacpagelist;
1444         VacPage         cur_page = NULL,
1445                                 last_vacuum_page,
1446                                 vacpage,
1447                            *curpage;
1448         int                     cur_item = 0;
1449         int                     i;
1450         Size            tuple_len;
1451         int                     num_moved,
1452                                 num_fraged_pages,
1453                                 vacuumed_pages;
1454         int                     checked_moved,
1455                                 num_tuples,
1456                                 keep_tuples = 0;
1457         bool            isempty,
1458                                 dowrite,
1459                                 chain_tuple_moved;
1460         VacRUsage       ru0;
1461
1462         vac_init_rusage(&ru0);
1463
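        /* our XID is stamped into moved tuples as t_xvac, so their validity follows this xact's fate */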
1464         myXID = GetCurrentTransactionId();
1465         myCID = GetCurrentCommandId();
1466
1467         tupdesc = RelationGetDescr(onerel);
1468
1469         /*
1470          * We need a ResultRelInfo and an EState so we can use the regular
1471          * executor's index-entry-making machinery.
1472          */
1473         estate = CreateExecutorState();
1474
1475         resultRelInfo = makeNode(ResultRelInfo);
1476         resultRelInfo->ri_RangeTableIndex = 1;          /* dummy */
1477         resultRelInfo->ri_RelationDesc = onerel;
1478         resultRelInfo->ri_TrigDesc = NULL;      /* we don't fire triggers */
1479
1480         ExecOpenIndices(resultRelInfo);
1481
1482         estate->es_result_relations = resultRelInfo;
1483         estate->es_num_result_relations = 1;
1484         estate->es_result_relation_info = resultRelInfo;
1485
1486         /* Set up a dummy tuple table too */
1487         tupleTable = ExecCreateTupleTable(1);
1488         slot = ExecAllocTableSlot(tupleTable);
1489         ExecSetSlotDescriptor(slot, tupdesc, false);
1490
1491         Nvacpagelist.num_pages = 0;
1492         num_fraged_pages = fraged_pages->num_pages;
1493         Assert((BlockNumber) vacuum_pages->num_pages >= vacuum_pages->empty_end_pages);
1494         vacuumed_pages = vacuum_pages->num_pages - vacuum_pages->empty_end_pages;
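        /* empty end pages need no per-page cleanup; they are handled by relation truncation later */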
1495         if (vacuumed_pages > 0)
1496         {
1497                 /* get last reaped page from vacuum_pages */
1498                 last_vacuum_page = vacuum_pages->pagedesc[vacuumed_pages - 1];
1499                 last_vacuum_block = last_vacuum_page->blkno;
1500         }
1501         else
1502         {
1503                 last_vacuum_page = NULL;
1504                 last_vacuum_block = InvalidBlockNumber;
1505         }
1506         cur_buffer = InvalidBuffer;
1507         num_moved = 0;
1508
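        /* working vacpage, sized to record every line pointer a page could possibly hold */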
1509         vacpage = (VacPage) palloc(sizeof(VacPageData) + MaxOffsetNumber * sizeof(OffsetNumber));
1510         vacpage->offsets_used = vacpage->offsets_free = 0;
1511
1512         /*
1513          * Scan pages backwards from the last nonempty page, trying to move
1514          * tuples down to lower pages.  Quit when we reach a page that we have
1515          * moved any tuples onto, or the first page if we haven't moved
1516          * anything, or when we find a page we cannot completely empty (this
1517          * last condition is handled by "break" statements within the loop).
1518          *
1519          * NB: this code depends on the vacuum_pages and fraged_pages lists being
1520          * in order by blkno.
1521          */
1522         nblocks = vacrelstats->rel_pages;
1523         for (blkno = nblocks - vacuum_pages->empty_end_pages - 1;
1524                  blkno > last_move_dest_block;
1525                  blkno--)
1526         {
1527                 CHECK_FOR_INTERRUPTS();
1528
1529                 /*
1530                  * Forget fraged_pages pages at or after this one; they're no
1531                  * longer useful as move targets, since we only want to move down.
1532                  * Note that since we stop the outer loop at last_move_dest_block,
1533                  * pages removed here cannot have had anything moved onto them
1534                  * already.
1535                  *
1536                  * Also note that we don't change the stored fraged_pages list, only
1537                  * our local variable num_fraged_pages; so the forgotten pages are
1538                  * still available to be loaded into the free space map later.
1539                  */
1540                 while (num_fraged_pages > 0 &&
1541                         fraged_pages->pagedesc[num_fraged_pages - 1]->blkno >= blkno)
1542                 {
1543                         Assert(fraged_pages->pagedesc[num_fraged_pages - 1]->offsets_used == 0);
1544                         --num_fraged_pages;
1545                 }
1546
1547                 /*
1548                  * Process this page of relation.
1549                  */
1550                 buf = ReadBuffer(onerel, blkno);
1551                 page = BufferGetPage(buf);
1552
1553                 vacpage->offsets_free = 0;
1554
1555                 isempty = PageIsEmpty(page);
1556
1557                 dowrite = false;
1558
1559                 /* Is the page in the vacuum_pages list? */
1560                 if (blkno == last_vacuum_block)
1561                 {
1562                         if (last_vacuum_page->offsets_free > 0)
1563                         {
1564                                 /* there are dead tuples on this page - clean them */
1565                                 Assert(!isempty);
1566                                 LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
1567                                 vacuum_page(onerel, buf, last_vacuum_page);
1568                                 LockBuffer(buf, BUFFER_LOCK_UNLOCK);
1569                                 dowrite = true;
1570                         }
1571                         else
1572                                 Assert(isempty);
1573                         --vacuumed_pages;
1574                         if (vacuumed_pages > 0)
1575                         {
1576                                 /* get prev reaped page from vacuum_pages */
1577                                 last_vacuum_page = vacuum_pages->pagedesc[vacuumed_pages - 1];
1578                                 last_vacuum_block = last_vacuum_page->blkno;
1579                         }
1580                         else
1581                         {
1582                                 last_vacuum_page = NULL;
1583                                 last_vacuum_block = InvalidBlockNumber;
1584                         }
1585                         if (isempty)
1586                         {
1587                                 ReleaseBuffer(buf);
1588                                 continue;
1589                         }
1590                 }
1591                 else
1592                         Assert(!isempty);
1593
1594                 chain_tuple_moved = false;              /* no chain tuple has been
1595                                                                                  * moved off this page yet */
1596                 vacpage->blkno = blkno;
1597                 maxoff = PageGetMaxOffsetNumber(page);
1598                 for (offnum = FirstOffsetNumber;
1599                          offnum <= maxoff;
1600                          offnum = OffsetNumberNext(offnum))
1601                 {
1602                         itemid = PageGetItemId(page, offnum);
1603
1604                         if (!ItemIdIsUsed(itemid))
1605                                 continue;
1606
1607                         tuple.t_datamcxt = NULL;
1608                         tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
1609                         tuple_len = tuple.t_len = ItemIdGetLength(itemid);
1610                         ItemPointerSet(&(tuple.t_self), blkno, offnum);
1611
1612                         if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
1613                         {
1614                                 if (tuple.t_data->t_infomask & HEAP_MOVED_IN)
1615                                         elog(ERROR, "HEAP_MOVED_IN was not expected");
1616
1617                                 /*
1618                                  * If this (chain) tuple was already moved by me, check
1619                                  * whether it is recorded in vacpage - i.e. whether it was
1620                                  * moved while cleaning this page or some previous one.
1621                                  */
1622                                 if (tuple.t_data->t_infomask & HEAP_MOVED_OFF)
1623                                 {
1624                                         if (HeapTupleHeaderGetXvac(tuple.t_data) != myXID)
1625                                                 elog(ERROR, "invalid XVAC in tuple header");
1626                                         if (keep_tuples == 0)
1627                                                 continue;
1628                                         if (chain_tuple_moved)
1629                                         {
1630                                                 /* some chain tuples were moved while cleaning this page */
1631                                                 Assert(vacpage->offsets_free > 0);
1632                                                 for (i = 0; i < vacpage->offsets_free; i++)
1633                                                 {
1634                                                         if (vacpage->offsets[i] == offnum)
1635                                                                 break;
1636                                                 }
1637                                                 if (i >= vacpage->offsets_free) /* not found */
1638                                                 {
1639                                                         vacpage->offsets[vacpage->offsets_free++] = offnum;
1640                                                         keep_tuples--;
1641                                                 }
1642                                         }
1643                                         else
1644                                         {
1645                                                 vacpage->offsets[vacpage->offsets_free++] = offnum;
1646                                                 keep_tuples--;
1647                                         }
1648                                         continue;
1649                                 }
1650                                 elog(ERROR, "HEAP_MOVED_OFF was expected");
1651                         }
1652
1653                         /*
1654                          * If this tuple is part of a chain of tuples created by
1655                          * updates from "recent" transactions, then we have to move
1656                          * the whole chain of tuples to other places.
1657                          *
1658                          * NOTE: this test is not 100% accurate: it is possible for a
1659                          * tuple to be an updated one with recent xmin, and yet not
1660                          * have a corresponding tuple in the vtlinks list.      Presumably
1661                          * there was once a parent tuple with xmax matching the xmin,
1662                          * but it's possible that that tuple has been removed --- for
1663                          * example, if it had xmin = xmax then
1664                          * HeapTupleSatisfiesVacuum would deem it removable as soon as
1665                          * the xmin xact completes.
1666                          *
1667                          * To be on the safe side, we abandon the repair_frag process if
1668                          * we cannot find the parent tuple in vtlinks.  This may be
1669                          * overly conservative; AFAICS it would be safe to move the
1670                          * chain.
1671                          */
1672                         if (((tuple.t_data->t_infomask & HEAP_UPDATED) &&
1673                          !TransactionIdPrecedes(HeapTupleHeaderGetXmin(tuple.t_data),
1674                                                                         OldestXmin)) ||
1675                                 (!(tuple.t_data->t_infomask & (HEAP_XMAX_INVALID |
1676                                                                                            HEAP_MARKED_FOR_UPDATE)) &&
1677                                  !(ItemPointerEquals(&(tuple.t_self),
1678                                                                          &(tuple.t_data->t_ctid)))))
1679                         {
1680                                 Buffer          Cbuf = buf;
1681                                 bool            freeCbuf = false;
1682                                 bool            chain_move_failed = false;
1683                                 Page            Cpage;
1684                                 ItemId          Citemid;
1685                                 ItemPointerData Ctid;
1686                                 HeapTupleData tp = tuple;
1687                                 Size            tlen = tuple_len;
1688                                 VTupleMove      vtmove;
1689                                 int                     num_vtmove;
1690                                 int                     free_vtmove;
1691                                 VacPage         to_vacpage = NULL;
1692                                 int                     to_item = 0;
1693                                 int                     ti;
1694
1695                                 if (cur_buffer != InvalidBuffer)
1696                                 {
1697                                         WriteBuffer(cur_buffer);
1698                                         cur_buffer = InvalidBuffer;
1699                                 }
1700
1701                                 /* Quick exit if we have no vtlinks to search in */
1702                                 if (vacrelstats->vtlinks == NULL)
1703                                 {
1704                                         elog(DEBUG2, "parent item in update-chain not found --- can't continue repair_frag");
1705                                         break;          /* out of walk-along-page loop */
1706                                 }
1707
1708                                 vtmove = (VTupleMove) palloc(100 * sizeof(VTupleMoveData));
1709                                 num_vtmove = 0;
1710                                 free_vtmove = 100;
1711
1712                                 /*
1713                                  * If this tuple is at the beginning or middle of the
1714                                  * chain, first walk to the end of the chain.
1715                                  */
1716                                 while (!(tp.t_data->t_infomask & (HEAP_XMAX_INVALID |
1717                                                                                           HEAP_MARKED_FOR_UPDATE)) &&
1718                                            !(ItemPointerEquals(&(tp.t_self),
1719                                                                                    &(tp.t_data->t_ctid))))
1720                                 {
1721                                         Ctid = tp.t_data->t_ctid;
1722                                         if (freeCbuf)
1723                                                 ReleaseBuffer(Cbuf);
1724                                         freeCbuf = true;
1725                                         Cbuf = ReadBuffer(onerel,
1726                                                                           ItemPointerGetBlockNumber(&Ctid));
1727                                         Cpage = BufferGetPage(Cbuf);
1728                                         Citemid = PageGetItemId(Cpage,
1729                                                                           ItemPointerGetOffsetNumber(&Ctid));
1730                                         if (!ItemIdIsUsed(Citemid))
1731                                         {
1732                                                 /*
1733                                                  * This means that in the middle of the chain there
1734                                                  * was a tuple updated by an older (than OldestXmin)
1735                                                  * xaction, and that tuple has already been deleted
1736                                                  * by me.  Actually, the upper part of the chain
1737                                                  * should be removed; it seems that ought to be
1738                                                  * handled in scan_heap(), but that's not implemented
1739                                                  * at the moment, so we just stop shrinking here.
1740                                                  */
1741                                                 elog(DEBUG2, "child itemid in update-chain marked as unused --- can't continue repair_frag");
1742                                                 chain_move_failed = true;
1743                                                 break;  /* out of loop to move to chain end */
1744                                         }
1745                                         tp.t_datamcxt = NULL;
1746                                         tp.t_data = (HeapTupleHeader) PageGetItem(Cpage, Citemid);
1747                                         tp.t_self = Ctid;
1748                                         tlen = tp.t_len = ItemIdGetLength(Citemid);
1749                                 }
1750                                 if (chain_move_failed)
1751                                 {
1752                                         if (freeCbuf)
1753                                                 ReleaseBuffer(Cbuf);
1754                                         pfree(vtmove);
1755                                         break;          /* out of walk-along-page loop */
1756                                 }
1757
1758                                 /*
1759                                  * Check if all items in chain can be moved
1760                                  */
1761                                 for (;;)
1762                                 {
1763                                         Buffer          Pbuf;
1764                                         Page            Ppage;
1765                                         ItemId          Pitemid;
1766                                         HeapTupleData Ptp;
1767                                         VTupleLinkData vtld,
1768                                                            *vtlp;
1769
1770                                         if (to_vacpage == NULL ||
1771                                                 !enough_space(to_vacpage, tlen))
1772                                         {
1773                                                 for (i = 0; i < num_fraged_pages; i++)
1774                                                 {
1775                                                         if (enough_space(fraged_pages->pagedesc[i], tlen))
1776                                                                 break;
1777                                                 }
1778
1779                                                 if (i == num_fraged_pages)
1780                                                 {
1781                                                         /* can't move item anywhere */
1782                                                         chain_move_failed = true;
1783                                                         break;          /* out of check-all-items loop */
1784                                                 }
1785                                                 to_item = i;
1786                                                 to_vacpage = fraged_pages->pagedesc[to_item];
1787                                         }
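                                        /*
                                         * Reserve space on the target page: the MAXALIGNed tuple,
                                         * plus a new line pointer if no freed slot is available
                                         * for re-use.
                                         */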
1788                                         to_vacpage->free -= MAXALIGN(tlen);
1789                                         if (to_vacpage->offsets_used >= to_vacpage->offsets_free)
1790                                                 to_vacpage->free -= sizeof(ItemIdData);
1791                                         (to_vacpage->offsets_used)++;
1792                                         if (free_vtmove == 0)
1793                                         {
1794                                                 free_vtmove = 1000;
1795                                                 vtmove = (VTupleMove)
1796                                                         repalloc(vtmove,
1797                                                                          (free_vtmove + num_vtmove) *
1798                                                                          sizeof(VTupleMoveData));
1799                                         }
1800                                         vtmove[num_vtmove].tid = tp.t_self;
1801                                         vtmove[num_vtmove].vacpage = to_vacpage;
1802                                         if (to_vacpage->offsets_used == 1)
1803                                                 vtmove[num_vtmove].cleanVpd = true;
1804                                         else
1805                                                 vtmove[num_vtmove].cleanVpd = false;
1806                                         free_vtmove--;
1807                                         num_vtmove++;
1808
1809                                         /* At beginning of chain? */
1810                                         if (!(tp.t_data->t_infomask & HEAP_UPDATED) ||
1811                                                 TransactionIdPrecedes(HeapTupleHeaderGetXmin(tp.t_data),
1812                                                                                           OldestXmin))
1813                                                 break;
1814
1815                                         /* No, move to tuple with prior row version */
1816                                         vtld.new_tid = tp.t_self;
1817                                         vtlp = (VTupleLink)
1818                                                 vac_bsearch((void *) &vtld,
1819                                                                         (void *) (vacrelstats->vtlinks),
1820                                                                         vacrelstats->num_vtlinks,
1821                                                                         sizeof(VTupleLinkData),
1822                                                                         vac_cmp_vtlinks);
1823                                         if (vtlp == NULL)
1824                                         {
1825                                                 /* see discussion above */
1826                                                 elog(DEBUG2, "parent item in update-chain not found --- can't continue repair_frag");
1827                                                 chain_move_failed = true;
1828                                                 break;  /* out of check-all-items loop */
1829                                         }
1830                                         tp.t_self = vtlp->this_tid;
1831                                         Pbuf = ReadBuffer(onerel,
1832                                                                 ItemPointerGetBlockNumber(&(tp.t_self)));
1833                                         Ppage = BufferGetPage(Pbuf);
1834                                         Pitemid = PageGetItemId(Ppage,
1835                                                            ItemPointerGetOffsetNumber(&(tp.t_self)));
1836                                         /* this can't happen since we saw tuple earlier: */
1837                                         if (!ItemIdIsUsed(Pitemid))
1838                                                 elog(ERROR, "parent itemid marked as unused");
1839                                         Ptp.t_datamcxt = NULL;
1840                                         Ptp.t_data = (HeapTupleHeader) PageGetItem(Ppage, Pitemid);
1841
1842                                         /* ctid should not have changed since we saved it */
1843                                         Assert(ItemPointerEquals(&(vtld.new_tid),
1844                                                                                          &(Ptp.t_data->t_ctid)));
1845
1846                                         /*
1847                                          * Read above about the cases in which
1848                                          * !ItemIdIsUsed(Citemid) (the child item has been
1849                                          * removed)... Since at the moment we don't remove the
1850                                          * useless part of an update-chain, it's possible to
1851                                          * encounter a too-old parent row here. As in the case
1852                                          * that caused this problem, we stop shrinking here. I
1853                                          * could try to find the real parent row, but prefer
1854                                          * not to, since a real solution will be implemented
1855                                          * anyway, later, and we are too close to the 6.5
1856                                          * release. - vadim 06/11/99
1857                                          */
1858                                         if (!(TransactionIdEquals(HeapTupleHeaderGetXmax(Ptp.t_data),
1859                                                                          HeapTupleHeaderGetXmin(tp.t_data))))
1860                                         {
1861                                                 ReleaseBuffer(Pbuf);
1862                                                 elog(DEBUG2, "too old parent tuple found --- can't continue repair_frag");
1863                                                 chain_move_failed = true;
1864                                                 break;  /* out of check-all-items loop */
1865                                         }
1866                                         tp.t_datamcxt = Ptp.t_datamcxt;
1867                                         tp.t_data = Ptp.t_data;
1868                                         tlen = tp.t_len = ItemIdGetLength(Pitemid);
1869                                         if (freeCbuf)
1870                                                 ReleaseBuffer(Cbuf);
1871                                         Cbuf = Pbuf;
1872                                         freeCbuf = true;
1873                                 }                               /* end of check-all-items loop */
1874
1875                                 if (freeCbuf)
1876                                         ReleaseBuffer(Cbuf);
1877                                 freeCbuf = false;
1878
1879                                 if (chain_move_failed)
1880                                 {
1881                                         /*
1882                                          * Undo changes to offsets_used state.  We don't
1883                                          * bother cleaning up the amount-free state, since
1884                                          * we're not going to do any further tuple motion.
1885                                          */
1886                                         for (i = 0; i < num_vtmove; i++)
1887                                         {
1888                                                 Assert(vtmove[i].vacpage->offsets_used > 0);
1889                                                 (vtmove[i].vacpage->offsets_used)--;
1890                                         }
1891                                         pfree(vtmove);
1892                                         break;          /* out of walk-along-page loop */
1893                                 }
1894
1895                                 /*
1896                                  * Okay, move the whole tuple chain
1897                                  */
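                                /*
                                 * vtmove[0] is the newest chain member; each later entry is an
                                 * older version.  Thus each tuple's successor is moved first,
                                 * and Ctid always holds the new TID of the previously moved
                                 * (next-in-chain) tuple.
                                 */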
1898                                 ItemPointerSetInvalid(&Ctid);
1899                                 for (ti = 0; ti < num_vtmove; ti++)
1900                                 {
1901                                         VacPage         destvacpage = vtmove[ti].vacpage;
1902
1903                                         /* Get page to move from */
1904                                         tuple.t_self = vtmove[ti].tid;
1905                                         Cbuf = ReadBuffer(onerel,
1906                                                          ItemPointerGetBlockNumber(&(tuple.t_self)));
1907
1908                                         /* Get page to move to */
1909                                         cur_buffer = ReadBuffer(onerel, destvacpage->blkno);
1910
1911                                         LockBuffer(cur_buffer, BUFFER_LOCK_EXCLUSIVE);
1912                                         if (cur_buffer != Cbuf)
1913                                                 LockBuffer(Cbuf, BUFFER_LOCK_EXCLUSIVE);
1914
1915                                         ToPage = BufferGetPage(cur_buffer);
1916                                         Cpage = BufferGetPage(Cbuf);
1917
1918                                         Citemid = PageGetItemId(Cpage,
1919                                                         ItemPointerGetOffsetNumber(&(tuple.t_self)));
1920                                         tuple.t_datamcxt = NULL;
1921                                         tuple.t_data = (HeapTupleHeader) PageGetItem(Cpage, Citemid);
1922                                         tuple_len = tuple.t_len = ItemIdGetLength(Citemid);
1923
1924                                         /*
1925                                          * make a copy of the source tuple, and then mark the
1926                                          * source tuple MOVED_OFF.
1927                                          */
1928                                         heap_copytuple_with_tuple(&tuple, &newtup);
1929
1930                                         /*
1931                                          * register invalidation of source tuple in catcaches.
1932                                          */
1933                                         CacheInvalidateHeapTuple(onerel, &tuple);
1934
1935                                         /* NO EREPORT(ERROR) TILL CHANGES ARE LOGGED */
1936                                         START_CRIT_SECTION();
1937
1938                                         tuple.t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
1939                                                                                                   HEAP_XMIN_INVALID |
1940                                                                                                   HEAP_MOVED_IN);
1941                                         tuple.t_data->t_infomask |= HEAP_MOVED_OFF;
1942                                         HeapTupleHeaderSetXvac(tuple.t_data, myXID);
1943
1944                                         /*
1945                                          * If this page has not been used as a destination before, clean it.
1946                                          *
1947                                          * NOTE: a nasty bug used to lurk here.  It is possible
1948                                          * for the source and destination pages to be the same
1949                                          * (since this tuple-chain member can be on a page
1950                                          * lower than the one we're currently processing in
1951                                          * the outer loop).  If that's true, then after
1952                                          * vacuum_page() the source tuple will have been
1953                                          * moved, and tuple.t_data will be pointing at
1954                                          * garbage.  Therefore we must do everything that uses
1955                                          * tuple.t_data BEFORE this step!!
1956                                          *
1957                                          * This path is different from the other callers of
1958                                          * vacuum_page, because we have already incremented
1959                                          * the vacpage's offsets_used field to account for the
1960                                          * tuple(s) we expect to move onto the page. Therefore
1961                                          * vacuum_page's check for offsets_used == 0 is wrong.
1962                                          * But since that's a good debugging check for all
1963                                          * other callers, we work around it here rather than
1964                                          * remove it.
1965                                          */
1966                                         if (!PageIsEmpty(ToPage) && vtmove[ti].cleanVpd)
1967                                         {
1968                                                 int                     sv_offsets_used = destvacpage->offsets_used;
1969
1970                                                 destvacpage->offsets_used = 0;
1971                                                 vacuum_page(onerel, cur_buffer, destvacpage);
1972                                                 destvacpage->offsets_used = sv_offsets_used;
1973                                         }
1974
1975                                         /*
1976                                          * Update the state of the copied tuple, and store it
1977                                          * on the destination page.
1978                                          */
1979                                         newtup.t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
1980                                                                                                    HEAP_XMIN_INVALID |
1981                                                                                                    HEAP_MOVED_OFF);
1982                                         newtup.t_data->t_infomask |= HEAP_MOVED_IN;
1983                                         HeapTupleHeaderSetXvac(newtup.t_data, myXID);
1984                                         newoff = PageAddItem(ToPage,
1985                                                                                  (Item) newtup.t_data,
1986                                                                                  tuple_len,
1987                                                                                  InvalidOffsetNumber,
1988                                                                                  LP_USED);
1989                                         if (newoff == InvalidOffsetNumber)
1990                                         {
1991                                                 elog(PANIC, "failed to add item with len = %lu to page %u while moving tuple chain",
1992                                                   (unsigned long) tuple_len, destvacpage->blkno);
1993                                         }
1994                                         newitemid = PageGetItemId(ToPage, newoff);
1995                                         pfree(newtup.t_data);
1996                                         newtup.t_datamcxt = NULL;
1997                                         newtup.t_data = (HeapTupleHeader) PageGetItem(ToPage, newitemid);
1998                                         ItemPointerSet(&(newtup.t_self), destvacpage->blkno, newoff);
1999
2000                                         /* XLOG stuff */
2001                                         if (!onerel->rd_istemp)
2002                                         {
2003                                                 XLogRecPtr      recptr =
2004                                                 log_heap_move(onerel, Cbuf, tuple.t_self,
2005                                                                           cur_buffer, &newtup);
2006
2007                                                 if (Cbuf != cur_buffer)
2008                                                 {
2009                                                         PageSetLSN(Cpage, recptr);
2010                                                         PageSetSUI(Cpage, ThisStartUpID);
2011                                                 }
2012                                                 PageSetLSN(ToPage, recptr);
2013                                                 PageSetSUI(ToPage, ThisStartUpID);
2014                                         }
2015                                         else
2016                                         {
2017                                                 /*
2018                                                  * No XLOG record, but still need to flag that XID
2019                                                  * exists on disk
2020                                                  */
2021                                                 MyXactMadeTempRelUpdate = true;
2022                                         }
2023
2024                                         END_CRIT_SECTION();
2025
2026                                         if (destvacpage->blkno > last_move_dest_block)
2027                                                 last_move_dest_block = destvacpage->blkno;
2028
2029                                         /*
2030                                          * Set the new tuple's t_ctid to point to itself if it
2031                                          * is the last tuple in the chain, and to the next
2032                                          * tuple in the chain otherwise.
2033                                          */
2034                                         if (!ItemPointerIsValid(&Ctid))
2035                                                 newtup.t_data->t_ctid = newtup.t_self;
2036                                         else
2037                                                 newtup.t_data->t_ctid = Ctid;
2038                                         Ctid = newtup.t_self;
2039
2040                                         num_moved++;
2041
2042                                         /*
2043                                          * Remember that we moved a tuple from the current page
2044                                          * (the corresponding index entries will be cleaned).
2045                                          */
2046                                         if (Cbuf == buf)
2047                                                 vacpage->offsets[vacpage->offsets_free++] =
2048                                                         ItemPointerGetOffsetNumber(&(tuple.t_self));
2049                                         else
2050                                                 keep_tuples++;
2051
2052                                         LockBuffer(cur_buffer, BUFFER_LOCK_UNLOCK);
2053                                         if (cur_buffer != Cbuf)
2054                                                 LockBuffer(Cbuf, BUFFER_LOCK_UNLOCK);
2055
2056                                         /* Create index entries for the moved tuple */
2057                                         if (resultRelInfo->ri_NumIndices > 0)
2058                                         {
2059                                                 ExecStoreTuple(&newtup, slot, InvalidBuffer, false);
2060                                                 ExecInsertIndexTuples(slot, &(newtup.t_self),
2061                                                                                           estate, true);
2062                                         }
2063
2064                                         WriteBuffer(cur_buffer);
2065                                         WriteBuffer(Cbuf);
2066                                 }                               /* end of move-the-tuple-chain loop */
2067
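                                /*
                                 * The chain move picked its own destination pages, so force the
                                 * next plain tuple move to re-select a target from fraged_pages.
                                 */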
2068                                 cur_buffer = InvalidBuffer;
2069                                 pfree(vtmove);
2070                                 chain_tuple_moved = true;
2071
2072                                 /* advance to next tuple in walk-along-page loop */
2073                                 continue;
2074                         }                                       /* end of is-tuple-in-chain test */
2075
2076                         /* try to find new page for this tuple */
2077                         if (cur_buffer == InvalidBuffer ||
2078                                 !enough_space(cur_page, tuple_len))
2079                         {
2080                                 if (cur_buffer != InvalidBuffer)
2081                                 {
2082                                         WriteBuffer(cur_buffer);
2083                                         cur_buffer = InvalidBuffer;
2084                                 }
2085                                 for (i = 0; i < num_fraged_pages; i++)
2086                                 {
2087                                         if (enough_space(fraged_pages->pagedesc[i], tuple_len))
2088                                                 break;
2089                                 }
2090                                 if (i == num_fraged_pages)
2091                                         break;          /* can't move item anywhere */
2092                                 cur_item = i;
2093                                 cur_page = fraged_pages->pagedesc[cur_item];
2094                                 cur_buffer = ReadBuffer(onerel, cur_page->blkno);
2095                                 LockBuffer(cur_buffer, BUFFER_LOCK_EXCLUSIVE);
2096                                 ToPage = BufferGetPage(cur_buffer);
2097                                 /* if this page was not used as a destination before, clean it */
2098                                 if (!PageIsEmpty(ToPage) && cur_page->offsets_used == 0)
2099                                         vacuum_page(onerel, cur_buffer, cur_page);
2100                         }
2101                         else
2102                                 LockBuffer(cur_buffer, BUFFER_LOCK_EXCLUSIVE);
2103
2104                         LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
2105
2106                         /* copy tuple */
2107                         heap_copytuple_with_tuple(&tuple, &newtup);
2108
2109                         /*
2110                          * register invalidation of source tuple in catcaches.
2111                          *
2112                          * (Note: we do not need to register the copied tuple, because we
2113                          * are not changing the tuple contents and so there cannot be
2114                          * any need to flush negative catcache entries.)
2115                          */
2116                         CacheInvalidateHeapTuple(onerel, &tuple);
2117
2118                         /* NO EREPORT(ERROR) TILL CHANGES ARE LOGGED */
2119                         START_CRIT_SECTION();
2120
2121                         /*
2122                          * Mark new tuple as MOVED_IN by me.
2123                          */
2124                         newtup.t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
2125                                                                                    HEAP_XMIN_INVALID |
2126                                                                                    HEAP_MOVED_OFF);
2127                         newtup.t_data->t_infomask |= HEAP_MOVED_IN;
2128                         HeapTupleHeaderSetXvac(newtup.t_data, myXID);
2129
2130                         /* add tuple to the page; InvalidOffsetNumber lets PageAddItem choose the slot */
2131                         newoff = PageAddItem(ToPage, (Item) newtup.t_data, tuple_len,
2132                                                                  InvalidOffsetNumber, LP_USED);
2133                         if (newoff == InvalidOffsetNumber)
2134                         {
2135                                 elog(PANIC, "failed to add item with len = %lu to page %u (free space %lu, nusd %u, noff %u)",
2136                                          (unsigned long) tuple_len,
2137                                          cur_page->blkno, (unsigned long) cur_page->free,
2138                                          cur_page->offsets_used, cur_page->offsets_free);
2139                         }
2140                         newitemid = PageGetItemId(ToPage, newoff);
2141                         pfree(newtup.t_data);
2142                         newtup.t_datamcxt = NULL;
2143                         newtup.t_data = (HeapTupleHeader) PageGetItem(ToPage, newitemid);
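                        /* a non-chain tuple is its own newest version, so t_ctid points at its new location */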
2144                         ItemPointerSet(&(newtup.t_data->t_ctid), cur_page->blkno, newoff);
2145                         newtup.t_self = newtup.t_data->t_ctid;
2146
2147                         /*
2148                          * Mark old tuple as MOVED_OFF by me.
2149                          */
2150                         tuple.t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
2151                                                                                   HEAP_XMIN_INVALID |
2152                                                                                   HEAP_MOVED_IN);
2153                         tuple.t_data->t_infomask |= HEAP_MOVED_OFF;
2154                         HeapTupleHeaderSetXvac(tuple.t_data, myXID);
2155
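                        /*
                         * Summary of the move protocol (for clarity; behavior is
                         * unchanged): at this point both tuple copies carry our XID
                         * in t_xvac.  If we abort or crash before committing, the
                         * MOVED_IN copy is treated as dead and the MOVED_OFF
                         * original remains valid; once we commit, the reverse holds.
                         */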
2156                         /* XLOG stuff */
2157                         if (!onerel->rd_istemp)
2158                         {
2159                                 XLogRecPtr      recptr =
2160                                 log_heap_move(onerel, buf, tuple.t_self,
2161                                                           cur_buffer, &newtup);
2162
2163                                 PageSetLSN(page, recptr);
2164                                 PageSetSUI(page, ThisStartUpID);
2165                                 PageSetLSN(ToPage, recptr);
2166                                 PageSetSUI(ToPage, ThisStartUpID);
2167                         }
2168                         else
2169                         {
2170                                 /*
2171                                  * No XLOG record, but still need to flag that XID exists
2172                                  * on disk
2173                                  */
2174                                 MyXactMadeTempRelUpdate = true;
2175                         }
2176
2177                         END_CRIT_SECTION();
2178
2179                         cur_page->offsets_used++;
2180                         num_moved++;
2181                         cur_page->free = ((PageHeader) ToPage)->pd_upper - ((PageHeader) ToPage)->pd_lower;
2182                         if (cur_page->blkno > last_move_dest_block)
2183                                 last_move_dest_block = cur_page->blkno;
2184
2185                         vacpage->offsets[vacpage->offsets_free++] = offnum;
2186
2187                         LockBuffer(cur_buffer, BUFFER_LOCK_UNLOCK);
2188                         LockBuffer(buf, BUFFER_LOCK_UNLOCK);
2189
2190                         /* insert index tuples if needed */
2191                         if (resultRelInfo->ri_NumIndices > 0)
2192                         {
2193                                 ExecStoreTuple(&newtup, slot, InvalidBuffer, false);
2194                                 ExecInsertIndexTuples(slot, &(newtup.t_self), estate, true);
2195                         }
2196                 }                                               /* walk along page */
2197
2198                 /*
2199                  * If we broke out of the walk-along-page loop early (ie, still
2200                  * have offnum <= maxoff), then we failed to move some tuple off
2201                  * this page.  No point in shrinking any more, so clean up and
2202                  * exit the per-page loop.
2203                  */
2204                 if (offnum < maxoff && keep_tuples > 0)
2205                 {
2206                         OffsetNumber off;
2207
2208                         /*
2209                          * Fix vacpage state for any unvisited tuples remaining on
2210                          * page
2211                          */
2212                         for (off = OffsetNumberNext(offnum);
2213                                  off <= maxoff;
2214                                  off = OffsetNumberNext(off))
2215                         {
2216                                 itemid = PageGetItemId(page, off);
2217                                 if (!ItemIdIsUsed(itemid))
2218                                         continue;
2219                                 tuple.t_datamcxt = NULL;
2220                                 tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
2221                                 if (tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED)
2222                                         continue;
2223                                 if (tuple.t_data->t_infomask & HEAP_MOVED_IN)
2224                                         elog(ERROR, "HEAP_MOVED_IN was not expected");
2225                                 if (tuple.t_data->t_infomask & HEAP_MOVED_OFF)
2226                                 {
2227                                         if (HeapTupleHeaderGetXvac(tuple.t_data) != myXID)
2228                                                 elog(ERROR, "invalid XVAC in tuple header");
2229                                         /* some chain tuples were moved while cleaning this page */
2230                                         if (chain_tuple_moved)
2231                                         {
2232                                                 Assert(vacpage->offsets_free > 0);
2233                                                 for (i = 0; i < vacpage->offsets_free; i++)
2234                                                 {
2235                                                         if (vacpage->offsets[i] == off)
2236                                                                 break;
2237                                                 }
2238                                                 if (i >= vacpage->offsets_free) /* not found */
2239                                                 {
2240                                                         vacpage->offsets[vacpage->offsets_free++] = off;
2241                                                         Assert(keep_tuples > 0);
2242                                                         keep_tuples--;
2243                                                 }
2244                                         }
2245                                         else
2246                                         {
2247                                                 vacpage->offsets[vacpage->offsets_free++] = off;
2248                                                 Assert(keep_tuples > 0);
2249                                                 keep_tuples--;
2250                                         }
2251                                 }
2252                                 else
2253                                         elog(ERROR, "HEAP_MOVED_OFF was expected");
2254                         }
2255                 }
2256
2257                 if (vacpage->offsets_free > 0)  /* some tuples were moved */
2258                 {
2259                         if (chain_tuple_moved)          /* else they are already ordered */
2260                         {
2261                                 qsort((char *) (vacpage->offsets), vacpage->offsets_free,
2262                                           sizeof(OffsetNumber), vac_cmp_offno);
2263                         }
2264                         vpage_insert(&Nvacpagelist, copy_vac_page(vacpage));
2265                         WriteBuffer(buf);
2266                 }
2267                 else if (dowrite)
2268                         WriteBuffer(buf);
2269                 else
2270                         ReleaseBuffer(buf);
2271
2272                 if (offnum <= maxoff)
2273                         break;                          /* had to quit early, see above note */
2274
2275         }                                                       /* walk along relation */
2276
2277         blkno++;                                        /* new number of blocks */
2278
2279         if (cur_buffer != InvalidBuffer)
2280         {
2281                 Assert(num_moved > 0);
2282                 WriteBuffer(cur_buffer);
2283         }
2284
2285         if (num_moved > 0)
2286         {
2287                 /*
2288                  * We have to commit our tuple moves before we truncate the
2289                  * relation.  Ideally we should do Commit/StartTransactionCommand
2290                  * here, relying on the session-level table lock to protect our
2291                  * exclusive access to the relation.  However, that would require
2292                  * a lot of extra code to close and re-open the relation, indexes,
2293                  * etc.  For now, a quick hack: record status of current
2294                  * transaction as committed, and continue.
2295                  */
2296                 RecordTransactionCommit();
2297         }
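        /*
         * Illustrative sketch only, never executed: the "ideal" sequence
         * described above would look roughly like this, omitting all of the
         * relation/index close-and-reopen bookkeeping it would require.
         */
#ifdef NOT_USED
        CommitTransactionCommand();     /* really commit the tuple moves */
        StartTransactionCommand();      /* resume under the session-level lock */
#endif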
2298
2299         /*
2300          * We are not going to move any more tuples across pages, but we still
2301          * need to apply vacuum_page to compact free space in the remaining
2302          * pages in vacuum_pages list.  Note that some of these pages may also
2303          * be in the fraged_pages list, and may have had tuples moved onto
2304          * them; if so, we already did vacuum_page and needn't do it again.
2305          */
2306         for (i = 0, curpage = vacuum_pages->pagedesc;
2307                  i < vacuumed_pages;
2308                  i++, curpage++)
2309         {
2310                 CHECK_FOR_INTERRUPTS();
2311                 Assert((*curpage)->blkno < blkno);
2312                 if ((*curpage)->offsets_used == 0)
2313                 {
2314                         /* this page was not used as a move target, so must clean it */
2315                         buf = ReadBuffer(onerel, (*curpage)->blkno);
2316                         LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
2317                         page = BufferGetPage(buf);
2318                         if (!PageIsEmpty(page))
2319                                 vacuum_page(onerel, buf, *curpage);
2320                         LockBuffer(buf, BUFFER_LOCK_UNLOCK);
2321                         WriteBuffer(buf);
2322                 }
2323         }
2324
2325         /*
2326          * Now scan all the pages that we moved tuples onto and update tuple
2327          * status bits.  This is not really necessary, but will save time for
2328          * future transactions examining these tuples.
2329          *
2330          * XXX NOTICE that this code fails to clear HEAP_MOVED_OFF tuples from
2331          * pages that were move source pages but not move dest pages.  One
2332          * also wonders whether it wouldn't be better to skip this step and
2333          * let the tuple status updates happen someplace that's not holding an
2334          * exclusive lock on the relation.
2335          */
2336         checked_moved = 0;
2337         for (i = 0, curpage = fraged_pages->pagedesc;
2338                  i < num_fraged_pages;
2339                  i++, curpage++)
2340         {
2341                 CHECK_FOR_INTERRUPTS();
2342                 Assert((*curpage)->blkno < blkno);
2343                 if ((*curpage)->blkno > last_move_dest_block)
2344                         break;                          /* no need to scan any further */
2345                 if ((*curpage)->offsets_used == 0)
2346                         continue;                       /* this page was never used as a move dest */
2347                 buf = ReadBuffer(onerel, (*curpage)->blkno);
2348                 LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
2349                 page = BufferGetPage(buf);
2350                 num_tuples = 0;
2351                 max_offset = PageGetMaxOffsetNumber(page);
2352                 for (newoff = FirstOffsetNumber;
2353                          newoff <= max_offset;
2354                          newoff = OffsetNumberNext(newoff))
2355                 {
2356                         itemid = PageGetItemId(page, newoff);
2357                         if (!ItemIdIsUsed(itemid))
2358                                 continue;
2359                         tuple.t_datamcxt = NULL;
2360                         tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
2361                         if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
2362                         {
2363                                 if (!(tuple.t_data->t_infomask & HEAP_MOVED))
2364                                         elog(ERROR, "HEAP_MOVED_OFF/HEAP_MOVED_IN was expected");
2365                                 if (HeapTupleHeaderGetXvac(tuple.t_data) != myXID)
2366                                         elog(ERROR, "invalid XVAC in tuple header");
2367                                 if (tuple.t_data->t_infomask & HEAP_MOVED_IN)
2368                                 {
2369                                         tuple.t_data->t_infomask |= HEAP_XMIN_COMMITTED;
2370                                         tuple.t_data->t_infomask &= ~HEAP_MOVED;
2371                                         num_tuples++;
2372                                 }
2373                                 else
2374                                         tuple.t_data->t_infomask |= HEAP_XMIN_INVALID;
2375                         }
2376                 }
2377                 LockBuffer(buf, BUFFER_LOCK_UNLOCK);
2378                 WriteBuffer(buf);
2379                 Assert((*curpage)->offsets_used == num_tuples);
2380                 checked_moved += num_tuples;
2381         }
2382         Assert(num_moved == checked_moved);
2383
2384         /*
2385          * It'd be cleaner to make this report at the bottom of this routine,
2386          * but then the rusage would double-count the second pass of index
2387          * vacuuming.  So do it here and ignore the relatively small amount of
2388          * processing that occurs below.
2389          */
2390         ereport(elevel,
2391                         (errmsg("\"%s\": moved %u tuples, truncated %u to %u pages",
2392                                         RelationGetRelationName(onerel),
2393                                         num_moved, nblocks, blkno),
2394                          errdetail("%s",
2395                                            vac_show_rusage(&ru0))));
2396
2397         /*
2398          * Reflect the motion of system tuples in the catalog caches here.
2399          */
2400         CommandCounterIncrement();
2401
2402         if (Nvacpagelist.num_pages > 0)
2403         {
2404                 /* vacuum indexes again if needed */
2405                 if (Irel != (Relation *) NULL)
2406                 {
2407                         VacPage    *vpleft,
2408                                            *vpright,
2409                                                 vpsave;
2410
2411                         /* re-sort Nvacpagelist.pagedesc */
2412                         for (vpleft = Nvacpagelist.pagedesc,
2413                         vpright = Nvacpagelist.pagedesc + Nvacpagelist.num_pages - 1;
2414                                  vpleft < vpright; vpleft++, vpright--)
2415                         {
2416                                 vpsave = *vpleft;
2417                                 *vpleft = *vpright;
2418                                 *vpright = vpsave;
2419                         }
2420                         Assert(keep_tuples >= 0);
2421                         for (i = 0; i < nindexes; i++)
2422                                 vacuum_index(&Nvacpagelist, Irel[i],
2423                                                          vacrelstats->rel_tuples, keep_tuples);
2424                 }
2425
2426                 /* clean moved tuples from the last page in Nvacpagelist */
2427                 if (vacpage->blkno == (blkno - 1) &&
2428                         vacpage->offsets_free > 0)
2429                 {
2430                         OffsetNumber unused[BLCKSZ / sizeof(OffsetNumber)];
2431                         int                     uncnt;
2432
2433                         buf = ReadBuffer(onerel, vacpage->blkno);
2434                         LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
2435                         page = BufferGetPage(buf);
2436                         num_tuples = 0;
2437                         maxoff = PageGetMaxOffsetNumber(page);
2438                         for (offnum = FirstOffsetNumber;
2439                                  offnum <= maxoff;
2440                                  offnum = OffsetNumberNext(offnum))
2441                         {
2442                                 itemid = PageGetItemId(page, offnum);
2443                                 if (!ItemIdIsUsed(itemid))
2444                                         continue;
2445                                 tuple.t_datamcxt = NULL;
2446                                 tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
2447
2448                                 if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
2449                                 {
2450                                         if (tuple.t_data->t_infomask & HEAP_MOVED_OFF)
2451                                         {
2452                                                 if (HeapTupleHeaderGetXvac(tuple.t_data) != myXID)
2453                                                         elog(ERROR, "invalid XVAC in tuple header");
2454                                                 itemid->lp_flags &= ~LP_USED;
2455                                                 num_tuples++;
2456                                         }
2457                                         else
2458                                                 elog(ERROR, "HEAP_MOVED_OFF was expected");
2459                                 }
2460
2461                         }
2462                         Assert(vacpage->offsets_free == num_tuples);
2463
2464                         START_CRIT_SECTION();
2465
2466                         uncnt = PageRepairFragmentation(page, unused);
2467
2468                         /* XLOG stuff */
2469                         if (!onerel->rd_istemp)
2470                         {
2471                                 XLogRecPtr      recptr;
2472
2473                                 recptr = log_heap_clean(onerel, buf, unused, uncnt);
2474                                 PageSetLSN(page, recptr);
2475                                 PageSetSUI(page, ThisStartUpID);
2476                         }
2477                         else
2478                         {
2479                                 /*
2480                                  * No XLOG record, but still need to flag that XID exists
2481                                  * on disk
2482                                  */
2483                                 MyXactMadeTempRelUpdate = true;
2484                         }
2485
2486                         END_CRIT_SECTION();
2487
2488                         LockBuffer(buf, BUFFER_LOCK_UNLOCK);
2489                         WriteBuffer(buf);
2490                 }
2491
2492                 /* now free the new list of reaped pages */
2493                 curpage = Nvacpagelist.pagedesc;
2494                 for (i = 0; i < Nvacpagelist.num_pages; i++, curpage++)
2495                         pfree(*curpage);
2496                 pfree(Nvacpagelist.pagedesc);
2497         }
2498
2499         /*
2500          * Flush dirty pages out to disk.  We do this unconditionally, even if
2501          * we don't need to truncate, because we want to ensure that all
2502          * tuples have correct on-row commit status on disk (see bufmgr.c's
2503          * comments for FlushRelationBuffers()).
2504          */
2505         i = FlushRelationBuffers(onerel, blkno);
2506         if (i < 0)
2507                 elog(ERROR, "FlushRelationBuffers returned %d", i);
2508
2509         /* truncate relation, if needed */
2510         if (blkno < nblocks)
2511         {
2512                 blkno = smgrtruncate(DEFAULT_SMGR, onerel, blkno);
2513                 onerel->rd_nblocks = blkno;             /* update relcache immediately */
2514                 onerel->rd_targblock = InvalidBlockNumber;
2515                 vacrelstats->rel_pages = blkno; /* set new number of blocks */
2516         }
2517
2518         /* clean up */
2519         pfree(vacpage);
2520         if (vacrelstats->vtlinks != NULL)
2521                 pfree(vacrelstats->vtlinks);
2522
2523         ExecDropTupleTable(tupleTable, true);
2524
2525         ExecCloseIndices(resultRelInfo);
2526
2527         FreeExecutorState(estate);
2528 }
2529
2530 /*
2531  *      vacuum_heap() -- free dead tuples
2532  *
2533  *              This routine marks dead tuples as unused and truncates the relation
2534  *              if there are "empty" end-blocks.
2535  */
2536 static void
2537 vacuum_heap(VRelStats *vacrelstats, Relation onerel, VacPageList vacuum_pages)
2538 {
2539         Buffer          buf;
2540         VacPage    *vacpage;
2541         BlockNumber relblocks;
2542         int                     nblocks;
2543         int                     i;
2544
2545         nblocks = vacuum_pages->num_pages;
2546         nblocks -= vacuum_pages->empty_end_pages;       /* nothing to do for them */
2547
2548         for (i = 0, vacpage = vacuum_pages->pagedesc; i < nblocks; i++, vacpage++)
2549         {
2550                 CHECK_FOR_INTERRUPTS();
2551                 if ((*vacpage)->offsets_free > 0)
2552                 {
2553                         buf = ReadBuffer(onerel, (*vacpage)->blkno);
2554                         LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
2555                         vacuum_page(onerel, buf, *vacpage);
2556                         LockBuffer(buf, BUFFER_LOCK_UNLOCK);
2557                         WriteBuffer(buf);
2558                 }
2559         }
2560
2561         /*
2562          * Flush dirty pages out to disk.  We do this unconditionally, even if
2563          * we don't need to truncate, because we want to ensure that all
2564          * tuples have correct on-row commit status on disk (see bufmgr.c's
2565          * comments for FlushRelationBuffers()).
2566          */
2567         Assert(vacrelstats->rel_pages >= vacuum_pages->empty_end_pages);
2568         relblocks = vacrelstats->rel_pages - vacuum_pages->empty_end_pages;
2569
2570         i = FlushRelationBuffers(onerel, relblocks);
2571         if (i < 0)
2572                 elog(ERROR, "FlushRelationBuffers returned %d", i);
2573
2574         /* truncate relation if there are some empty end-pages */
2575         if (vacuum_pages->empty_end_pages > 0)
2576         {
2577                 ereport(elevel,
2578                                 (errmsg("\"%s\": truncated %u to %u pages",
2579                                                 RelationGetRelationName(onerel),
2580                                                 vacrelstats->rel_pages, relblocks)));
2581                 relblocks = smgrtruncate(DEFAULT_SMGR, onerel, relblocks);
2582                 onerel->rd_nblocks = relblocks; /* update relcache immediately */
2583                 onerel->rd_targblock = InvalidBlockNumber;
2584                 vacrelstats->rel_pages = relblocks;             /* set new number of
2585                                                                                                  * blocks */
2586         }
2587 }
2588
2589 /*
2590  *      vacuum_page() -- free dead tuples on a page
2591  *                                       and repair its fragmentation.
2592  */
2593 static void
2594 vacuum_page(Relation onerel, Buffer buffer, VacPage vacpage)
2595 {
2596         OffsetNumber unused[BLCKSZ / sizeof(OffsetNumber)];
2597         int                     uncnt;
2598         Page            page = BufferGetPage(buffer);
2599         ItemId          itemid;
2600         int                     i;
2601
2602         /* There shouldn't be any tuples moved onto the page yet! */
2603         Assert(vacpage->offsets_used == 0);
2604
2605         START_CRIT_SECTION();
2606
2607         for (i = 0; i < vacpage->offsets_free; i++)
2608         {
2609                 itemid = PageGetItemId(page, vacpage->offsets[i]);
2610                 itemid->lp_flags &= ~LP_USED;
2611         }
2612
2613         uncnt = PageRepairFragmentation(page, unused);
2614
2615         /* XLOG stuff */
2616         if (!onerel->rd_istemp)
2617         {
2618                 XLogRecPtr      recptr;
2619
2620                 recptr = log_heap_clean(onerel, buffer, unused, uncnt);
2621                 PageSetLSN(page, recptr);
2622                 PageSetSUI(page, ThisStartUpID);
2623         }
2624         else
2625         {
2626                 /* No XLOG record, but still need to flag that XID exists on disk */
2627                 MyXactMadeTempRelUpdate = true;
2628         }
2629
2630         END_CRIT_SECTION();
2631 }
2632
2633 /*
2634  *      scan_index() -- scan one index relation to update statistics.
2635  *
2636  * We use this when we have no deletions to do.
2637  */
2638 static void
2639 scan_index(Relation indrel, double num_tuples)
2640 {
2641         IndexBulkDeleteResult *stats;
2642         IndexVacuumCleanupInfo vcinfo;
2643         VacRUsage       ru0;
2644
2645         vac_init_rusage(&ru0);
2646
2647         /*
2648          * Even though we're not planning to delete anything, we use the
2649          * ambulkdelete call, because (a) the scan happens within the index AM
2650          * for more speed, and (b) it may want to pass private statistics to
2651          * the amvacuumcleanup call.
2652          */
2653         stats = index_bulk_delete(indrel, dummy_tid_reaped, NULL);
2654
2655         /* Do post-VACUUM cleanup, even though we deleted nothing */
2656         vcinfo.vacuum_full = true;
2657         vcinfo.message_level = elevel;
2658
2659         stats = index_vacuum_cleanup(indrel, &vcinfo, stats);
2660
2661         if (!stats)
2662                 return;
2663
2664         /* now update statistics in pg_class */
2665         vac_update_relstats(RelationGetRelid(indrel),
2666                                                 stats->num_pages, stats->num_index_tuples,
2667                                                 false);
2668
2669         ereport(elevel,
2670                         (errmsg("index \"%s\" now contains %.0f tuples in %u pages",
2671                                         RelationGetRelationName(indrel),
2672                                         stats->num_index_tuples,
2673                                         stats->num_pages),
2674                          errdetail("%u index pages have been deleted, %u are currently reusable.\n"
2675                                            "%s",
2676                                            stats->pages_deleted, stats->pages_free,
2677                                            vac_show_rusage(&ru0))));
2678
2679         /*
2680          * Check for tuple count mismatch.      If the index is partial, then it's
2681          * OK for it to have fewer tuples than the heap; otherwise we have trouble.
2682          */
2683         if (stats->num_index_tuples != num_tuples)
2684         {
2685                 if (stats->num_index_tuples > num_tuples ||
2686                         !vac_is_partial_index(indrel))
2687                         ereport(WARNING,
2688                                         (errmsg("index \"%s\" contains %.0f tuples, but table contains %.0f tuples",
2689                                                         RelationGetRelationName(indrel),
2690                                                         stats->num_index_tuples, num_tuples),
2691                                          errhint("Rebuild the index with REINDEX.")));
2692         }
2693
2694         pfree(stats);
2695 }
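/*
 * Worked example for the mismatch check above (illustrative numbers): a
 * heap of 1000 tuples with a partial index holding 300 draws no warning,
 * but an index reporting 1200 tuples is warned about even if partial,
 * since an index can never legitimately contain more entries than the
 * heap has tuples.
 */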
2696
2697 /*
2698  *      vacuum_index() -- vacuum one index relation.
2699  *
2700  *              Vacpagelist is the VacPageList of the heap we're currently vacuuming;
2701  *              the heap is exclusive-locked.  Indrel is an index relation on that heap.
2702  *
2703  *              We don't bother to set locks on the index relation here, since
2704  *              the parent table is exclusive-locked already.
2705  *
2706  *              Finally, we arrange to update the index relation's statistics in
2707  *              pg_class.
2708  */
2709 static void
2710 vacuum_index(VacPageList vacpagelist, Relation indrel,
2711                          double num_tuples, int keep_tuples)
2712 {
2713         IndexBulkDeleteResult *stats;
2714         IndexVacuumCleanupInfo vcinfo;
2715         VacRUsage       ru0;
2716
2717         vac_init_rusage(&ru0);
2718
2719         /* Do bulk deletion */
2720         stats = index_bulk_delete(indrel, tid_reaped, (void *) vacpagelist);
2721
2722         /* Do post-VACUUM cleanup */
2723         vcinfo.vacuum_full = true;
2724         vcinfo.message_level = elevel;
2725
2726         stats = index_vacuum_cleanup(indrel, &vcinfo, stats);
2727
2728         if (!stats)
2729                 return;
2730
2731         /* now update statistics in pg_class */
2732         vac_update_relstats(RelationGetRelid(indrel),
2733                                                 stats->num_pages, stats->num_index_tuples,
2734                                                 false);
2735
2736         ereport(elevel,
2737                         (errmsg("index \"%s\" now contains %.0f tuples in %u pages",
2738                                         RelationGetRelationName(indrel),
2739                                         stats->num_index_tuples,
2740                                         stats->num_pages),
2741                          errdetail("%.0f index tuples were removed.\n"
2742                  "%u index pages have been deleted, %u are currently reusable.\n"
2743                                            "%s",
2744                                            stats->tuples_removed,
2745                                            stats->pages_deleted, stats->pages_free,
2746                                            vac_show_rusage(&ru0))));
2747
2748         /*
2749          * Check for tuple count mismatch.      If the index is partial, then it's
2750          * OK for it to have fewer tuples than the heap; otherwise we have trouble.
2751          */
2752         if (stats->num_index_tuples != num_tuples + keep_tuples)
2753         {
2754                 if (stats->num_index_tuples > num_tuples + keep_tuples ||
2755                         !vac_is_partial_index(indrel))
2756                         ereport(WARNING,
2757                                         (errmsg("index \"%s\" contains %.0f tuples, but table contains %.0f tuples",
2758                                                         RelationGetRelationName(indrel),
2759                                           stats->num_index_tuples, num_tuples + keep_tuples),
2760                                          errhint("Rebuild the index with REINDEX.")));
2761         }
2762
2763         pfree(stats);
2764 }
2765
2766 /*
2767  *      tid_reaped() -- is a particular tid reaped?
2768  *
2769  *              This has the right signature to be an IndexBulkDeleteCallback.
2770  *
2771  *              vacpagelist->pagedesc is sorted in the right order (by block number).
2772  */
2773 static bool
2774 tid_reaped(ItemPointer itemptr, void *state)
2775 {
2776         VacPageList vacpagelist = (VacPageList) state;
2777         OffsetNumber ioffno;
2778         OffsetNumber *voff;
2779         VacPage         vp,
2780                            *vpp;
2781         VacPageData vacpage;
2782
2783         vacpage.blkno = ItemPointerGetBlockNumber(itemptr);
2784         ioffno = ItemPointerGetOffsetNumber(itemptr);
2785
2786         vp = &vacpage;
2787         vpp = (VacPage *) vac_bsearch((void *) &vp,
2788                                                                   (void *) (vacpagelist->pagedesc),
2789                                                                   vacpagelist->num_pages,
2790                                                                   sizeof(VacPage),
2791                                                                   vac_cmp_blk);
2792
2793         if (vpp == NULL)
2794                 return false;
2795
2796         /* ok - we are on a partially or fully reaped page */
2797         vp = *vpp;
2798
2799         if (vp->offsets_free == 0)
2800         {
2801                 /* this page is entirely empty, so all tuples on it are reaped */
2802                 return true;
2803         }
2804
2805         voff = (OffsetNumber *) vac_bsearch((void *) &ioffno,
2806                                                                                 (void *) (vp->offsets),
2807                                                                                 vp->offsets_free,
2808                                                                                 sizeof(OffsetNumber),
2809                                                                                 vac_cmp_offno);
2810
2811         if (voff == NULL)
2812                 return false;
2813
2814         /* tid is reaped */
2815         return true;
2816 }
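/*
 * Worked example (illustrative numbers): suppose pagedesc describes
 * blocks {2, 5, 9} and block 5 has reaped offsets {1, 3, 7}.  For TID
 * (5,3), the block-level vac_bsearch finds block 5 and the offset-level
 * search finds 3, so we report the TID as reaped.  For TID (4,1), no
 * page entry matches and we return false after the first search.
 */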
2817
2818 /*
2819  * Dummy version for scan_index.
2820  */
2821 static bool
2822 dummy_tid_reaped(ItemPointer itemptr, void *state)
2823 {
2824         return false;
2825 }
2826
2827 /*
2828  * Update the shared Free Space Map with the info we now have about
2829  * free space in the relation, discarding any old info the map may have.
2830  */
2831 static void
2832 vac_update_fsm(Relation onerel, VacPageList fraged_pages,
2833                            BlockNumber rel_pages)
2834 {
2835         int                     nPages = fraged_pages->num_pages;
2836         VacPage    *pagedesc = fraged_pages->pagedesc;
2837         Size            threshold;
2838         PageFreeSpaceInfo *pageSpaces;
2839         int                     outPages;
2840         int                     i;
2841
2842         /*
2843          * We only report pages with free space at least equal to the average
2844          * request size --- this avoids cluttering FSM with uselessly-small
2845          * bits of space.  Although FSM would discard pages with little free
2846          * space anyway, it's important to do this prefiltering because (a) it
2847          * reduces the time spent holding the FSM lock in
2848          * RecordRelationFreeSpace, and (b) FSM uses the number of pages
2849          * reported as a statistic for guiding space management.  If we didn't
2850          * threshold our reports the same way vacuumlazy.c does, we'd be
2851          * skewing that statistic.
2852          */
2853         threshold = GetAvgFSMRequestSize(&onerel->rd_node);
2854
2855         /* +1 to avoid palloc(0) */
2856         pageSpaces = (PageFreeSpaceInfo *)
2857                 palloc((nPages + 1) * sizeof(PageFreeSpaceInfo));
2858         outPages = 0;
2859
2860         for (i = 0; i < nPages; i++)
2861         {
2862                 /*
2863                  * fraged_pages may contain entries for pages that we later
2864                  * decided to truncate from the relation; don't enter them into
2865                  * the free space map!
2866                  */
2867                 if (pagedesc[i]->blkno >= rel_pages)
2868                         break;
2869
2870                 if (pagedesc[i]->free >= threshold)
2871                 {
2872                         pageSpaces[outPages].blkno = pagedesc[i]->blkno;
2873                         pageSpaces[outPages].avail = pagedesc[i]->free;
2874                         outPages++;
2875                 }
2876         }
2877
2878         RecordRelationFreeSpace(&onerel->rd_node, outPages, pageSpaces);
2879
2880         pfree(pageSpaces);
2881 }
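/*
 * Worked example (illustrative numbers): with an average request size of
 * 256 bytes, pages offering {512, 100, 700} free bytes produce just two
 * FSM entries; the 100-byte page is filtered out before we ever take the
 * FSM lock, for the reasons given above.
 */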
2882
2883 /* Copy a VacPage structure */
2884 static VacPage
2885 copy_vac_page(VacPage vacpage)
2886 {
2887         VacPage         newvacpage;
2888
2889         /* allocate a VacPageData entry */
2890         newvacpage = (VacPage) palloc(sizeof(VacPageData) +
2891                                                    vacpage->offsets_free * sizeof(OffsetNumber));
2892
2893         /* fill it in */
2894         if (vacpage->offsets_free > 0)
2895                 memcpy(newvacpage->offsets, vacpage->offsets,
2896                            vacpage->offsets_free * sizeof(OffsetNumber));
2897         newvacpage->blkno = vacpage->blkno;
2898         newvacpage->free = vacpage->free;
2899         newvacpage->offsets_used = vacpage->offsets_used;
2900         newvacpage->offsets_free = vacpage->offsets_free;
2901
2902         return newvacpage;
2903 }
2904
2905 /*
2906  * Add a VacPage pointer to a VacPageList.
2907  *
2908  *              As a side effect of the way that scan_heap works,
2909  *              higher pages come after lower pages in the array
2910  *              (and highest tid on a page is last).
2911  */
2912 static void
2913 vpage_insert(VacPageList vacpagelist, VacPage vpnew)
2914 {
2915 #define PG_NPAGEDESC 1024
2916
2917         /* allocate a VacPage entry if needed */
2918         if (vacpagelist->num_pages == 0)
2919         {
2920                 vacpagelist->pagedesc = (VacPage *) palloc(PG_NPAGEDESC * sizeof(VacPage));
2921                 vacpagelist->num_allocated_pages = PG_NPAGEDESC;
2922         }
2923         else if (vacpagelist->num_pages >= vacpagelist->num_allocated_pages)
2924         {
2925                 vacpagelist->num_allocated_pages *= 2;
2926                 vacpagelist->pagedesc = (VacPage *) repalloc(vacpagelist->pagedesc, vacpagelist->num_allocated_pages * sizeof(VacPage));
2927         }
2928         vacpagelist->pagedesc[vacpagelist->num_pages] = vpnew;
2929         (vacpagelist->num_pages)++;
2930 }
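/*
 * The doubling strategy above makes appends amortized O(1): the array
 * starts at PG_NPAGEDESC (1024) slots and is repalloc'd to 2048, 4096,
 * and so on, so n insertions cost only O(n) total copying work.
 */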
2931
2932 /*
2933  * vac_bsearch: just like standard C library routine bsearch(),
2934  * except that we first test to see whether the target key is outside
2935  * the range of the table entries.      This case is handled relatively slowly
2936  * by the normal binary search algorithm (ie, no faster than any other key)
2937  * but it occurs often enough in VACUUM to be worth optimizing.
2938  */
2939 static void *
2940 vac_bsearch(const void *key, const void *base,
2941                         size_t nelem, size_t size,
2942                         int (*compar) (const void *, const void *))
2943 {
2944         int                     res;
2945         const void *last;
2946
2947         if (nelem == 0)
2948                 return NULL;
2949         res = compar(key, base);
2950         if (res < 0)
2951                 return NULL;
2952         if (res == 0)
2953                 return (void *) base;
2954         if (nelem > 1)
2955         {
2956                 last = (const void *) ((const char *) base + (nelem - 1) * size);
2957                 res = compar(key, last);
2958                 if (res > 0)
2959                         return NULL;
2960                 if (res == 0)
2961                         return (void *) last;
2962         }
2963         if (nelem <= 2)
2964                 return NULL;                    /* already checked 'em all */
2965         return bsearch(key, base, nelem, size, compar);
2966 }
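/*
 * Worked example (illustrative numbers): searching sorted block numbers
 * {3, 7, 9, 15} for key 20 compares against the last entry (15) and
 * returns NULL at once, skipping the binary search entirely; key 3
 * succeeds on the very first probe.  Only keys strictly inside the range
 * fall through to the standard bsearch().
 */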
2967
2968 /*
2969  * Comparator routines for use with qsort() and bsearch().
2970  */
2971 static int
2972 vac_cmp_blk(const void *left, const void *right)
2973 {
2974         BlockNumber lblk,
2975                                 rblk;
2976
2977         lblk = (*((VacPage *) left))->blkno;
2978         rblk = (*((VacPage *) right))->blkno;
2979
2980         if (lblk < rblk)
2981                 return -1;
2982         if (lblk == rblk)
2983                 return 0;
2984         return 1;
2985 }
2986
2987 static int
2988 vac_cmp_offno(const void *left, const void *right)
2989 {
2990         if (*(OffsetNumber *) left < *(OffsetNumber *) right)
2991                 return -1;
2992         if (*(OffsetNumber *) left == *(OffsetNumber *) right)
2993                 return 0;
2994         return 1;
2995 }
2996
2997 static int
2998 vac_cmp_vtlinks(const void *left, const void *right)
2999 {
3000         if (((VTupleLink) left)->new_tid.ip_blkid.bi_hi <
3001                 ((VTupleLink) right)->new_tid.ip_blkid.bi_hi)
3002                 return -1;
3003         if (((VTupleLink) left)->new_tid.ip_blkid.bi_hi >
3004                 ((VTupleLink) right)->new_tid.ip_blkid.bi_hi)
3005                 return 1;
3006         /* bi_hi values are equal */
3007         if (((VTupleLink) left)->new_tid.ip_blkid.bi_lo <
3008                 ((VTupleLink) right)->new_tid.ip_blkid.bi_lo)
3009                 return -1;
3010         if (((VTupleLink) left)->new_tid.ip_blkid.bi_lo >
3011                 ((VTupleLink) right)->new_tid.ip_blkid.bi_lo)
3012                 return 1;
3013         /* bi_lo values are equal */
3014         if (((VTupleLink) left)->new_tid.ip_posid <
3015                 ((VTupleLink) right)->new_tid.ip_posid)
3016                 return -1;
3017         if (((VTupleLink) left)->new_tid.ip_posid >
3018                 ((VTupleLink) right)->new_tid.ip_posid)
3019                 return 1;
3020         return 0;
3021 }
3022
3023
3024 void
3025 vac_open_indexes(Relation relation, int *nindexes, Relation **Irel)
3026 {
3027         List       *indexoidlist,
3028                            *indexoidscan;
3029         int                     i;
3030
3031         indexoidlist = RelationGetIndexList(relation);
3032
3033         *nindexes = length(indexoidlist);
3034
3035         if (*nindexes > 0)
3036                 *Irel = (Relation *) palloc(*nindexes * sizeof(Relation));
3037         else
3038                 *Irel = NULL;
3039
3040         i = 0;
3041         foreach(indexoidscan, indexoidlist)
3042         {
3043                 Oid                     indexoid = lfirsto(indexoidscan);
3044
3045                 (*Irel)[i] = index_open(indexoid);
3046                 i++;
3047         }
3048
3049         freeList(indexoidlist);
3050 }
3051
3052
3053 void
3054 vac_close_indexes(int nindexes, Relation *Irel)
3055 {
3056         if (Irel == (Relation *) NULL)
3057                 return;
3058
3059         while (nindexes--)
3060                 index_close(Irel[nindexes]);
3061         pfree(Irel);
3062 }
3063
3064
3065 /*
3066  * Is an index partial (ie, could it contain fewer tuples than the heap)?
3067  */
3068 bool
3069 vac_is_partial_index(Relation indrel)
3070 {
3071         /*
3072          * If the index's AM doesn't support nulls, it's partial for our
3073          * purposes.
3074          */
3075         if (!indrel->rd_am->amindexnulls)
3076                 return true;
3077
3078         /* Otherwise, look to see if there's a partial-index predicate */
3079         if (!heap_attisnull(indrel->rd_indextuple, Anum_pg_index_indpred))
3080                 return true;
3081
3082         return false;
3083 }
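/*
 * For example, an index built with a WHERE predicate is partial in the
 * usual sense; and, assuming its AM reports amindexnulls = false (as a
 * hash index does), an index that cannot store NULL keys is also treated
 * as partial here, since rows with NULL key values never enter it.
 */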
3084
3085
3086 static bool
3087 enough_space(VacPage vacpage, Size len)
3088 {
3089         len = MAXALIGN(len);
3090
3091         if (len > vacpage->free)
3092                 return false;
3093
3094         /* if there are free itemid(s) and len <= free_space... */
3095         if (vacpage->offsets_used < vacpage->offsets_free)
3096                 return true;
3097
3098         /* noff_used >= noff_free, so we'll have to allocate a new itemid */
3099         if (len + sizeof(ItemIdData) <= vacpage->free)
3100                 return true;
3101
3102         return false;
3103 }
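/*
 * Worked example (illustrative numbers, assuming 8-byte MAXALIGN and a
 * 4-byte ItemIdData): a 100-byte tuple rounds up to 104.  With free =
 * 110 and a spare line pointer (offsets_used < offsets_free) the page
 * qualifies outright; with no spare pointer we need 104 + 4 = 108 <= 110,
 * which still fits, whereas a 108-byte tuple (112 after alignment) would
 * fail the very first test.
 */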
3104
3105
3106 /*
3107  * Initialize usage snapshot.
3108  */
3109 void
3110 vac_init_rusage(VacRUsage *ru0)
3111 {
3112         struct timezone tz;
3113
3114         getrusage(RUSAGE_SELF, &ru0->ru);
3115         gettimeofday(&ru0->tv, &tz);
3116 }
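/*
 * Typical usage (an illustrative sketch, mirroring the calls made
 * elsewhere in this file):
 *
 *              VacRUsage       ru0;
 *
 *              vac_init_rusage(&ru0);
 *              ... do some vacuum phase ...
 *              ereport(elevel, (errmsg("%s", vac_show_rusage(&ru0))));
 */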
3117
3118 /*
3119  * Compute elapsed time since ru0 usage snapshot, and format into
3120  * a displayable string.  Result is in a static string, which is
3121  * tacky, but no one ever claimed that the Postgres backend is
3122  * threadable...
3123  */
3124 const char *
3125 vac_show_rusage(VacRUsage *ru0)
3126 {
3127         static char result[100];
3128         VacRUsage       ru1;
3129
3130         vac_init_rusage(&ru1);
3131
3132         if (ru1.tv.tv_usec < ru0->tv.tv_usec)
3133         {
3134                 ru1.tv.tv_sec--;
3135                 ru1.tv.tv_usec += 1000000;
3136         }
3137         if (ru1.ru.ru_stime.tv_usec < ru0->ru.ru_stime.tv_usec)
3138         {
3139                 ru1.ru.ru_stime.tv_sec--;
3140                 ru1.ru.ru_stime.tv_usec += 1000000;
3141         }
3142         if (ru1.ru.ru_utime.tv_usec < ru0->ru.ru_utime.tv_usec)
3143         {
3144                 ru1.ru.ru_utime.tv_sec--;
3145                 ru1.ru.ru_utime.tv_usec += 1000000;
3146         }
3147
3148         snprintf(result, sizeof(result),
3149                          "CPU %d.%02ds/%d.%02du sec elapsed %d.%02d sec.",
3150                          (int) (ru1.ru.ru_stime.tv_sec - ru0->ru.ru_stime.tv_sec),
3151           (int) (ru1.ru.ru_stime.tv_usec - ru0->ru.ru_stime.tv_usec) / 10000,
3152                          (int) (ru1.ru.ru_utime.tv_sec - ru0->ru.ru_utime.tv_sec),
3153           (int) (ru1.ru.ru_utime.tv_usec - ru0->ru.ru_utime.tv_usec) / 10000,
3154                          (int) (ru1.tv.tv_sec - ru0->tv.tv_sec),
3155                          (int) (ru1.tv.tv_usec - ru0->tv.tv_usec) / 10000);
3156
3157         return result;
3158 }
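/*
 * The format string above yields output like (illustrative):
 *
 *              CPU 0.05s/0.12u sec elapsed 0.31 sec.
 *
 * that is, system CPU time, user CPU time, and wall-clock time elapsed
 * since the vac_init_rusage() snapshot.
 */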