]> granicus.if.org Git - postgresql/blob - src/backend/commands/vacuum.c
Portal and memory management infrastructure for extended query protocol.
[postgresql] / src / backend / commands / vacuum.c
1 /*-------------------------------------------------------------------------
2  *
3  * vacuum.c
4  *        The postgres vacuum cleaner.
5  *
6  * This file includes the "full" version of VACUUM, as well as control code
7  * used by all three of full VACUUM, lazy VACUUM, and ANALYZE.  See
8  * vacuumlazy.c and analyze.c for the rest of the code for the latter two.
9  *
10  *
11  * Portions Copyright (c) 1996-2002, PostgreSQL Global Development Group
12  * Portions Copyright (c) 1994, Regents of the University of California
13  *
14  *
15  * IDENTIFICATION
16  *        $Header: /cvsroot/pgsql/src/backend/commands/vacuum.c,v 1.252 2003/05/02 20:54:33 tgl Exp $
17  *
18  *-------------------------------------------------------------------------
19  */
20 #include "postgres.h"
21
22 #include <unistd.h>
23
24 #include "access/clog.h"
25 #include "access/genam.h"
26 #include "access/heapam.h"
27 #include "access/xlog.h"
28 #include "catalog/catalog.h"
29 #include "catalog/catname.h"
30 #include "catalog/namespace.h"
31 #include "catalog/pg_database.h"
32 #include "catalog/pg_index.h"
33 #include "commands/vacuum.h"
34 #include "executor/executor.h"
35 #include "miscadmin.h"
36 #include "storage/freespace.h"
37 #include "storage/sinval.h"
38 #include "storage/smgr.h"
39 #include "tcop/pquery.h"
40 #include "utils/acl.h"
41 #include "utils/builtins.h"
42 #include "utils/fmgroids.h"
43 #include "utils/inval.h"
44 #include "utils/lsyscache.h"
45 #include "utils/relcache.h"
46 #include "utils/syscache.h"
47 #include "pgstat.h"
48
49
50 typedef struct VacPageData
51 {
52         BlockNumber blkno;                      /* BlockNumber of this Page */
53         Size            free;                   /* FreeSpace on this Page */
54         uint16          offsets_used;   /* Number of OffNums used by vacuum */
55         uint16          offsets_free;   /* Number of OffNums free or to be free */
56         OffsetNumber offsets[1];        /* Array of free OffNums */
57 } VacPageData;
58
59 typedef VacPageData *VacPage;
60
61 typedef struct VacPageListData
62 {
63         BlockNumber empty_end_pages;    /* Number of "empty" end-pages */
64         int                     num_pages;              /* Number of pages in pagedesc */
65         int                     num_allocated_pages;    /* Number of allocated pages in
66                                                                                  * pagedesc */
67         VacPage    *pagedesc;           /* Descriptions of pages */
68 } VacPageListData;
69
70 typedef VacPageListData *VacPageList;
71
72 typedef struct VTupleLinkData
73 {
74         ItemPointerData new_tid;
75         ItemPointerData this_tid;
76 } VTupleLinkData;
77
78 typedef VTupleLinkData *VTupleLink;
79
80 typedef struct VTupleMoveData
81 {
82         ItemPointerData tid;            /* tuple ID */
83         VacPage         vacpage;                /* where to move */
84         bool            cleanVpd;               /* clean vacpage before using */
85 } VTupleMoveData;
86
87 typedef VTupleMoveData *VTupleMove;
88
89 typedef struct VRelStats
90 {
91         BlockNumber rel_pages;
92         double          rel_tuples;
93         Size            min_tlen;
94         Size            max_tlen;
95         bool            hasindex;
96         int                     num_vtlinks;
97         VTupleLink      vtlinks;
98 } VRelStats;
99
100
101 static MemoryContext vac_context = NULL;
102
103 static int      elevel = -1;
104
105 static TransactionId OldestXmin;
106 static TransactionId FreezeLimit;
107
108
109 /* non-export function prototypes */
110 static List *getrels(const RangeVar *vacrel, const char *stmttype);
111 static void vac_update_dbstats(Oid dbid,
112                                    TransactionId vacuumXID,
113                                    TransactionId frozenXID);
114 static void vac_truncate_clog(TransactionId vacuumXID,
115                                   TransactionId frozenXID);
116 static bool vacuum_rel(Oid relid, VacuumStmt *vacstmt, char expected_relkind);
117 static void full_vacuum_rel(Relation onerel, VacuumStmt *vacstmt);
118 static void scan_heap(VRelStats *vacrelstats, Relation onerel,
119                   VacPageList vacuum_pages, VacPageList fraged_pages);
120 static void repair_frag(VRelStats *vacrelstats, Relation onerel,
121                         VacPageList vacuum_pages, VacPageList fraged_pages,
122                         int nindexes, Relation *Irel);
123 static void vacuum_heap(VRelStats *vacrelstats, Relation onerel,
124                         VacPageList vacpagelist);
125 static void vacuum_page(Relation onerel, Buffer buffer, VacPage vacpage);
126 static void vacuum_index(VacPageList vacpagelist, Relation indrel,
127                          double num_tuples, int keep_tuples);
128 static void scan_index(Relation indrel, double num_tuples);
129 static bool tid_reaped(ItemPointer itemptr, void *state);
130 static bool dummy_tid_reaped(ItemPointer itemptr, void *state);
131 static void vac_update_fsm(Relation onerel, VacPageList fraged_pages,
132                            BlockNumber rel_pages);
133 static VacPage copy_vac_page(VacPage vacpage);
134 static void vpage_insert(VacPageList vacpagelist, VacPage vpnew);
135 static void *vac_bsearch(const void *key, const void *base,
136                         size_t nelem, size_t size,
137                         int (*compar) (const void *, const void *));
138 static int      vac_cmp_blk(const void *left, const void *right);
139 static int      vac_cmp_offno(const void *left, const void *right);
140 static int      vac_cmp_vtlinks(const void *left, const void *right);
141 static bool enough_space(VacPage vacpage, Size len);
142
143
144 /****************************************************************************
145  *                                                                                                                                                      *
146  *                      Code common to all flavors of VACUUM and ANALYZE                                *
147  *                                                                                                                                                      *
148  ****************************************************************************
149  */
150
151
152 /*
153  * Primary entry point for VACUUM and ANALYZE commands.
154  */
155 void
156 vacuum(VacuumStmt *vacstmt)
157 {
158         const char *stmttype = vacstmt->vacuum ? "VACUUM" : "ANALYZE";
159         MemoryContext anl_context = NULL;
160         TransactionId initialOldestXmin = InvalidTransactionId;
161         TransactionId initialFreezeLimit = InvalidTransactionId;
162         bool            all_rels;
163         List       *vrl,
164                            *cur;
165
166         if (vacstmt->verbose)
167                 elevel = INFO;
168         else
169                 elevel = DEBUG1;
170
171         /*
172          * We cannot run VACUUM inside a user transaction block; if we were
173          * inside a transaction, then our commit- and
174          * start-transaction-command calls would not have the intended effect!
175          * Furthermore, the forced commit that occurs before truncating the
176          * relation's file would have the effect of committing the rest of the
177          * user's transaction too, which would certainly not be the desired
178          * behavior.
179          */
180         if (vacstmt->vacuum)
181                 PreventTransactionChain((void *) vacstmt, stmttype);
182
183         /*
184          * Send info about dead objects to the statistics collector
185          */
186         if (vacstmt->vacuum)
187                 pgstat_vacuum_tabstat();
188
189         /*
190          * Create special memory context for cross-transaction storage.
191          *
192          * Since it is a child of PortalContext, it will go away eventually even
193          * if we suffer an error; there's no need for special abort cleanup
194          * logic.
195          */
196         vac_context = AllocSetContextCreate(PortalContext,
197                                                                                 "Vacuum",
198                                                                                 ALLOCSET_DEFAULT_MINSIZE,
199                                                                                 ALLOCSET_DEFAULT_INITSIZE,
200                                                                                 ALLOCSET_DEFAULT_MAXSIZE);
201
202         /*
203          * If we are running only ANALYZE, we don't need per-table
204          * transactions, but we still need a memory context with table
205          * lifetime.
206          */
207         if (vacstmt->analyze && !vacstmt->vacuum)
208                 anl_context = AllocSetContextCreate(PortalContext,
209                                                                                         "Analyze",
210                                                                                         ALLOCSET_DEFAULT_MINSIZE,
211                                                                                         ALLOCSET_DEFAULT_INITSIZE,
212                                                                                         ALLOCSET_DEFAULT_MAXSIZE);
213
214         /* Assume we are processing everything unless one table is mentioned */
215         all_rels = (vacstmt->relation == NULL);
216
217         /* Build list of relations to process (note this lives in vac_context) */
218         vrl = getrels(vacstmt->relation, stmttype);
219
220         /*
221          * Formerly, there was code here to prevent more than one VACUUM from
222          * executing concurrently in the same database.  However, there's no
223          * good reason to prevent that, and manually removing lockfiles after
224          * a vacuum crash was a pain for dbadmins.      So, forget about
225          * lockfiles, and just rely on the locks we grab on each target table
226          * to ensure that there aren't two VACUUMs running on the same table
227          * at the same time.
228          */
229
230         /*
231          * The strangeness with committing and starting transactions here is
232          * due to wanting to run each table's VACUUM as a separate
233          * transaction, so that we don't hold locks unnecessarily long.  Also,
234          * if we are doing VACUUM ANALYZE, the ANALYZE part runs as a separate
235          * transaction from the VACUUM to further reduce locking.
236          *
237          * vacuum_rel expects to be entered with no transaction active; it will
238          * start and commit its own transaction.  But we are called by an SQL
239          * command, and so we are executing inside a transaction already.  We
240          * commit the transaction started in PostgresMain() here, and start
241          * another one before exiting to match the commit waiting for us back
242          * in PostgresMain().
243          *
244          * In the case of an ANALYZE statement (no vacuum, just analyze) it's
245          * okay to run the whole thing in the outer transaction, and so we
246          * skip transaction start/stop operations.
247          */
248         if (vacstmt->vacuum)
249         {
250                 if (all_rels)
251                 {
252                         /*
253                          * It's a database-wide VACUUM.
254                          *
255                          * Compute the initially applicable OldestXmin and FreezeLimit
256                          * XIDs, so that we can record these values at the end of the
257                          * VACUUM. Note that individual tables may well be processed
258                          * with newer values, but we can guarantee that no
259                          * (non-shared) relations are processed with older ones.
260                          *
261                          * It is okay to record non-shared values in pg_database, even
262                          * though we may vacuum shared relations with older cutoffs,
263                          * because only the minimum of the values present in
264                          * pg_database matters.  We can be sure that shared relations
265                          * have at some time been vacuumed with cutoffs no worse than
266                          * the global minimum; for, if there is a backend in some
267                          * other DB with xmin = OLDXMIN that's determining the cutoff
268                          * with which we vacuum shared relations, it is not possible
269                          * for that database to have a cutoff newer than OLDXMIN
270                          * recorded in pg_database.
271                          */
272                         vacuum_set_xid_limits(vacstmt, false,
273                                                                   &initialOldestXmin,
274                                                                   &initialFreezeLimit);
275                 }
276
277                 /* matches the StartTransaction in PostgresMain() */
278                 CommitTransactionCommand(true);
279         }
280
281         /*
282          * Loop to process each selected relation.
283          */
284         foreach(cur, vrl)
285         {
286                 Oid                     relid = lfirsto(cur);
287
288                 if (vacstmt->vacuum)
289                 {
290                         if (! vacuum_rel(relid, vacstmt, RELKIND_RELATION))
291                                 all_rels = false; /* forget about updating dbstats */
292                 }
293                 if (vacstmt->analyze)
294                 {
295                         MemoryContext old_context = NULL;
296
297                         /*
298                          * If we vacuumed, use new transaction for analyze.
299                          * Otherwise, we can use the outer transaction, but we still
300                          * need to call analyze_rel in a memory context that will be
301                          * cleaned up on return (else we leak memory while processing
302                          * multiple tables).
303                          */
304                         if (vacstmt->vacuum)
305                         {
306                                 StartTransactionCommand(true);
307                                 SetQuerySnapshot();     /* might be needed for functional index */
308                         }
309                         else
310                                 old_context = MemoryContextSwitchTo(anl_context);
311
312                         analyze_rel(relid, vacstmt);
313
314                         if (vacstmt->vacuum)
315                                 CommitTransactionCommand(true);
316                         else
317                         {
318                                 MemoryContextSwitchTo(old_context);
319                                 MemoryContextResetAndDeleteChildren(anl_context);
320                         }
321                 }
322         }
323
324         /*
325          * Finish up processing.
326          */
327         if (vacstmt->vacuum)
328         {
329                 /* here, we are not in a transaction */
330
331                 /*
332                  * This matches the CommitTransaction waiting for us in
333                  * PostgresMain(). We tell xact.c not to chain the upcoming
334                  * commit, so that a VACUUM doesn't start a transaction block,
335                  * even when autocommit is off.
336                  */
337                 StartTransactionCommand(true);
338
339                 /*
340                  * If it was a database-wide VACUUM, print FSM usage statistics
341                  * (we don't make you be superuser to see these).
342                  */
343                 if (vacstmt->relation == NULL)
344                         PrintFreeSpaceMapStatistics(elevel);
345
346                 /*
347                  * If we completed a database-wide VACUUM without skipping any
348                  * relations, update the database's pg_database row with info
349                  * about the transaction IDs used, and try to truncate pg_clog.
350                  */
351                 if (all_rels)
352                 {
353                         vac_update_dbstats(MyDatabaseId,
354                                                            initialOldestXmin, initialFreezeLimit);
355                         vac_truncate_clog(initialOldestXmin, initialFreezeLimit);
356                 }
357         }
358
359         /*
360          * Clean up working storage --- note we must do this after
361          * StartTransactionCommand, else we might be trying to delete the
362          * active context!
363          */
364         MemoryContextDelete(vac_context);
365         vac_context = NULL;
366
367         if (anl_context)
368                 MemoryContextDelete(anl_context);
369 }
370
371 /*
372  * Build a list of Oids for each relation to be processed
373  *
374  * The list is built in vac_context so that it will survive across our
375  * per-relation transactions.
376  */
377 static List *
378 getrels(const RangeVar *vacrel, const char *stmttype)
379 {
380         List       *vrl = NIL;
381         MemoryContext oldcontext;
382
383         if (vacrel)
384         {
385                 /* Process specific relation */
386                 Oid                     relid;
387
388                 relid = RangeVarGetRelid(vacrel, false);
389
390                 /* Make a relation list entry for this guy */
391                 oldcontext = MemoryContextSwitchTo(vac_context);
392                 vrl = lappendo(vrl, relid);
393                 MemoryContextSwitchTo(oldcontext);
394         }
395         else
396         {
397                 /* Process all plain relations listed in pg_class */
398                 Relation        pgclass;
399                 HeapScanDesc scan;
400                 HeapTuple       tuple;
401                 ScanKeyData key;
402
403                 ScanKeyEntryInitialize(&key, 0x0,
404                                                            Anum_pg_class_relkind,
405                                                            F_CHAREQ,
406                                                            CharGetDatum(RELKIND_RELATION));
407
408                 pgclass = heap_openr(RelationRelationName, AccessShareLock);
409
410                 scan = heap_beginscan(pgclass, SnapshotNow, 1, &key);
411
412                 while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
413                 {
414                         /* Make a relation list entry for this guy */
415                         oldcontext = MemoryContextSwitchTo(vac_context);
416                         vrl = lappendo(vrl, HeapTupleGetOid(tuple));
417                         MemoryContextSwitchTo(oldcontext);
418                 }
419
420                 heap_endscan(scan);
421                 heap_close(pgclass, AccessShareLock);
422         }
423
424         return vrl;
425 }
426
427 /*
428  * vacuum_set_xid_limits() -- compute oldest-Xmin and freeze cutoff points
429  */
430 void
431 vacuum_set_xid_limits(VacuumStmt *vacstmt, bool sharedRel,
432                                           TransactionId *oldestXmin,
433                                           TransactionId *freezeLimit)
434 {
435         TransactionId limit;
436
437         *oldestXmin = GetOldestXmin(sharedRel);
438
439         Assert(TransactionIdIsNormal(*oldestXmin));
440
441         if (vacstmt->freeze)
442         {
443                 /* FREEZE option: use oldest Xmin as freeze cutoff too */
444                 limit = *oldestXmin;
445         }
446         else
447         {
448                 /*
449                  * Normal case: freeze cutoff is well in the past, to wit, about
450                  * halfway to the wrap horizon
451                  */
452                 limit = GetCurrentTransactionId() - (MaxTransactionId >> 2);
453         }
454
455         /*
456          * Be careful not to generate a "permanent" XID
457          */
458         if (!TransactionIdIsNormal(limit))
459                 limit = FirstNormalTransactionId;
460
461         /*
462          * Ensure sane relationship of limits
463          */
464         if (TransactionIdFollows(limit, *oldestXmin))
465         {
466                 elog(WARNING, "oldest Xmin is far in the past --- close open transactions soon to avoid wraparound problems");
467                 limit = *oldestXmin;
468         }
469
470         *freezeLimit = limit;
471 }
472
473
474 /*
475  *      vac_update_relstats() -- update statistics for one relation
476  *
477  *              Update the whole-relation statistics that are kept in its pg_class
478  *              row.  There are additional stats that will be updated if we are
479  *              doing ANALYZE, but we always update these stats.  This routine works
480  *              for both index and heap relation entries in pg_class.
481  *
482  *              We violate no-overwrite semantics here by storing new values for the
483  *              statistics columns directly into the pg_class tuple that's already on
484  *              the page.  The reason for this is that if we updated these tuples in
485  *              the usual way, vacuuming pg_class itself wouldn't work very well ---
486  *              by the time we got done with a vacuum cycle, most of the tuples in
487  *              pg_class would've been obsoleted.  Of course, this only works for
488  *              fixed-size never-null columns, but these are.
489  *
490  *              This routine is shared by full VACUUM, lazy VACUUM, and stand-alone
491  *              ANALYZE.
492  */
493 void
494 vac_update_relstats(Oid relid, BlockNumber num_pages, double num_tuples,
495                                         bool hasindex)
496 {
497         Relation        rd;
498         HeapTupleData rtup;
499         HeapTuple       ctup;
500         Form_pg_class pgcform;
501         Buffer          buffer;
502
503         /*
504          * update number of tuples and number of pages in pg_class
505          */
506         rd = heap_openr(RelationRelationName, RowExclusiveLock);
507
508         ctup = SearchSysCache(RELOID,
509                                                   ObjectIdGetDatum(relid),
510                                                   0, 0, 0);
511         if (!HeapTupleIsValid(ctup))
512                 elog(ERROR, "pg_class entry for relid %u vanished during vacuuming",
513                          relid);
514
515         /* get the buffer cache tuple */
516         rtup.t_self = ctup->t_self;
517         ReleaseSysCache(ctup);
518         if (!heap_fetch(rd, SnapshotNow, &rtup, &buffer, false, NULL))
519                 elog(ERROR, "pg_class entry for relid %u vanished during vacuuming",
520                          relid);
521
522         /* overwrite the existing statistics in the tuple */
523         pgcform = (Form_pg_class) GETSTRUCT(&rtup);
524         pgcform->relpages = (int32) num_pages;
525         pgcform->reltuples = num_tuples;
526         pgcform->relhasindex = hasindex;
527
528         /*
529          * If we have discovered that there are no indexes, then there's no
530          * primary key either.  This could be done more thoroughly...
531          */
532         if (!hasindex)
533                 pgcform->relhaspkey = false;
534
535         /*
536          * Invalidate the tuple in the catcaches; this also arranges to flush
537          * the relation's relcache entry.  (If we fail to commit for some
538          * reason, no flush will occur, but no great harm is done since there
539          * are no noncritical state updates here.)
540          */
541         CacheInvalidateHeapTuple(rd, &rtup);
542
543         /* Write the buffer */
544         WriteBuffer(buffer);
545
546         heap_close(rd, RowExclusiveLock);
547 }
548
549
550 /*
551  *      vac_update_dbstats() -- update statistics for one database
552  *
553  *              Update the whole-database statistics that are kept in its pg_database
554  *              row.
555  *
556  *              We violate no-overwrite semantics here by storing new values for the
557  *              statistics columns directly into the tuple that's already on the page.
558  *              As with vac_update_relstats, this avoids leaving dead tuples behind
559  *              after a VACUUM; which is good since GetRawDatabaseInfo
560  *              can get confused by finding dead tuples in pg_database.
561  *
562  *              This routine is shared by full and lazy VACUUM.  Note that it is only
563  *              applied after a database-wide VACUUM operation.
564  */
565 static void
566 vac_update_dbstats(Oid dbid,
567                                    TransactionId vacuumXID,
568                                    TransactionId frozenXID)
569 {
570         Relation        relation;
571         ScanKeyData entry[1];
572         HeapScanDesc scan;
573         HeapTuple       tuple;
574         Form_pg_database dbform;
575
576         relation = heap_openr(DatabaseRelationName, RowExclusiveLock);
577
578         /* Must use a heap scan, since there's no syscache for pg_database */
579         ScanKeyEntryInitialize(&entry[0], 0x0,
580                                                    ObjectIdAttributeNumber, F_OIDEQ,
581                                                    ObjectIdGetDatum(dbid));
582
583         scan = heap_beginscan(relation, SnapshotNow, 1, entry);
584
585         tuple = heap_getnext(scan, ForwardScanDirection);
586
587         if (!HeapTupleIsValid(tuple))
588                 elog(ERROR, "database %u does not exist", dbid);
589
590         dbform = (Form_pg_database) GETSTRUCT(tuple);
591
592         /* overwrite the existing statistics in the tuple */
593         dbform->datvacuumxid = vacuumXID;
594         dbform->datfrozenxid = frozenXID;
595
596         /* invalidate the tuple in the cache and write the buffer */
597         CacheInvalidateHeapTuple(relation, tuple);
598         WriteNoReleaseBuffer(scan->rs_cbuf);
599
600         heap_endscan(scan);
601
602         heap_close(relation, RowExclusiveLock);
603 }
604
605
606 /*
607  *      vac_truncate_clog() -- attempt to truncate the commit log
608  *
609  *              Scan pg_database to determine the system-wide oldest datvacuumxid,
610  *              and use it to truncate the transaction commit log (pg_clog).
611  *              Also generate a warning if the system-wide oldest datfrozenxid
612  *              seems to be in danger of wrapping around.
613  *
614  *              The passed XIDs are simply the ones I just wrote into my pg_database
615  *              entry.  They're used to initialize the "min" calculations.
616  *
617  *              This routine is shared by full and lazy VACUUM.  Note that it is only
618  *              applied after a database-wide VACUUM operation.
619  */
620 static void
621 vac_truncate_clog(TransactionId vacuumXID, TransactionId frozenXID)
622 {
623         TransactionId myXID;
624         Relation        relation;
625         HeapScanDesc scan;
626         HeapTuple       tuple;
627         int32           age;
628         bool            vacuumAlreadyWrapped = false;
629         bool            frozenAlreadyWrapped = false;
630
631         myXID = GetCurrentTransactionId();
632
633         relation = heap_openr(DatabaseRelationName, AccessShareLock);
634
635         scan = heap_beginscan(relation, SnapshotNow, 0, NULL);
636
637         while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
638         {
639                 Form_pg_database dbform = (Form_pg_database) GETSTRUCT(tuple);
640
641                 /* Ignore non-connectable databases (eg, template0) */
642                 /* It's assumed that these have been frozen correctly */
643                 if (!dbform->datallowconn)
644                         continue;
645
646                 if (TransactionIdIsNormal(dbform->datvacuumxid))
647                 {
648                         if (TransactionIdPrecedes(myXID, dbform->datvacuumxid))
649                                 vacuumAlreadyWrapped = true;
650                         else if (TransactionIdPrecedes(dbform->datvacuumxid, vacuumXID))
651                                 vacuumXID = dbform->datvacuumxid;
652                 }
653                 if (TransactionIdIsNormal(dbform->datfrozenxid))
654                 {
655                         if (TransactionIdPrecedes(myXID, dbform->datfrozenxid))
656                                 frozenAlreadyWrapped = true;
657                         else if (TransactionIdPrecedes(dbform->datfrozenxid, frozenXID))
658                                 frozenXID = dbform->datfrozenxid;
659                 }
660         }
661
662         heap_endscan(scan);
663
664         heap_close(relation, AccessShareLock);
665
666         /*
667          * Do not truncate CLOG if we seem to have suffered wraparound
668          * already; the computed minimum XID might be bogus.
669          */
670         if (vacuumAlreadyWrapped)
671         {
672                 elog(WARNING, "Some databases have not been vacuumed in over 2 billion transactions."
673                          "\n\tYou may have already suffered transaction-wraparound data loss.");
674                 return;
675         }
676
677         /* Truncate CLOG to the oldest vacuumxid */
678         TruncateCLOG(vacuumXID);
679
680         /* Give warning about impending wraparound problems */
681         if (frozenAlreadyWrapped)
682         {
683                 elog(WARNING, "Some databases have not been vacuumed in over 1 billion transactions."
684                          "\n\tBetter vacuum them soon, or you may have a wraparound failure.");
685         }
686         else
687         {
688                 age = (int32) (myXID - frozenXID);
689                 if (age > (int32) ((MaxTransactionId >> 3) * 3))
690                         elog(WARNING, "Some databases have not been vacuumed in %d transactions."
691                                  "\n\tBetter vacuum them within %d transactions,"
692                                  "\n\tor you may have a wraparound failure.",
693                                  age, (int32) (MaxTransactionId >> 1) - age);
694         }
695 }
696
697
698 /****************************************************************************
699  *                                                                                                                                                      *
700  *                      Code common to both flavors of VACUUM                                                   *
701  *                                                                                                                                                      *
702  ****************************************************************************
703  */
704
705
706 /*
707  *      vacuum_rel() -- vacuum one heap relation
708  *
709  *              Returns TRUE if we actually processed the relation (or can ignore it
710  *              for some reason), FALSE if we failed to process it due to permissions
711  *              or other reasons.  (A FALSE result really means that some data
712  *              may have been left unvacuumed, so we can't update XID stats.)
713  *
714  *              Doing one heap at a time incurs extra overhead, since we need to
715  *              check that the heap exists again just before we vacuum it.      The
716  *              reason that we do this is so that vacuuming can be spread across
717  *              many small transactions.  Otherwise, two-phase locking would require
718  *              us to lock the entire database during one pass of the vacuum cleaner.
719  *
720  *              At entry and exit, we are not inside a transaction.
721  */
722 static bool
723 vacuum_rel(Oid relid, VacuumStmt *vacstmt, char expected_relkind)
724 {
725         LOCKMODE        lmode;
726         Relation        onerel;
727         LockRelId       onerelid;
728         Oid                     toast_relid;
729         bool            result;
730
731         /* Begin a transaction for vacuuming this relation */
732         StartTransactionCommand(true);
733         SetQuerySnapshot();                     /* might be needed for functional index */
734
735         /*
736          * Check for user-requested abort.      Note we want this to be inside a
737          * transaction, so xact.c doesn't issue useless WARNING.
738          */
739         CHECK_FOR_INTERRUPTS();
740
741         /*
742          * Race condition -- if the pg_class tuple has gone away since the
743          * last time we saw it, we don't need to vacuum it.
744          */
745         if (!SearchSysCacheExists(RELOID,
746                                                           ObjectIdGetDatum(relid),
747                                                           0, 0, 0))
748         {
749                 CommitTransactionCommand(true);
750                 return true;                    /* okay 'cause no data there */
751         }
752
753         /*
754          * Determine the type of lock we want --- hard exclusive lock for a
755          * FULL vacuum, but just ShareUpdateExclusiveLock for concurrent
756          * vacuum.      Either way, we can be sure that no other backend is
757          * vacuuming the same table.
758          */
759         lmode = vacstmt->full ? AccessExclusiveLock : ShareUpdateExclusiveLock;
760
761         /*
762          * Open the class, get an appropriate lock on it, and check
763          * permissions.
764          *
765          * We allow the user to vacuum a table if he is superuser, the table
766          * owner, or the database owner (but in the latter case, only if it's
767          * not a shared relation).      pg_class_ownercheck includes the superuser
768          * case.
769          *
770          * Note we choose to treat permissions failure as a WARNING and keep
771          * trying to vacuum the rest of the DB --- is this appropriate?
772          */
773         onerel = relation_open(relid, lmode);
774
775         if (!(pg_class_ownercheck(RelationGetRelid(onerel), GetUserId()) ||
776                   (is_dbadmin(MyDatabaseId) && !onerel->rd_rel->relisshared)))
777         {
778                 elog(WARNING, "Skipping \"%s\" --- only table or database owner can VACUUM it",
779                          RelationGetRelationName(onerel));
780                 relation_close(onerel, lmode);
781                 CommitTransactionCommand(true);
782                 return false;
783         }
784
785         /*
786          * Check that it's a plain table; we used to do this in getrels() but
787          * seems safer to check after we've locked the relation.
788          */
789         if (onerel->rd_rel->relkind != expected_relkind)
790         {
791                 elog(WARNING, "Skipping \"%s\" --- can not process indexes, views or special system tables",
792                          RelationGetRelationName(onerel));
793                 relation_close(onerel, lmode);
794                 CommitTransactionCommand(true);
795                 return false;
796         }
797
798         /*
799          * Silently ignore tables that are temp tables of other backends ---
800          * trying to vacuum these will lead to great unhappiness, since their
801          * contents are probably not up-to-date on disk.  (We don't throw a
802          * warning here; it would just lead to chatter during a database-wide
803          * VACUUM.)
804          */
805         if (isOtherTempNamespace(RelationGetNamespace(onerel)))
806         {
807                 relation_close(onerel, lmode);
808                 CommitTransactionCommand(true);
809                 return true;                    /* assume no long-lived data in temp tables */
810         }
811
812         /*
813          * Get a session-level lock too. This will protect our access to the
814          * relation across multiple transactions, so that we can vacuum the
815          * relation's TOAST table (if any) secure in the knowledge that no one
816          * is deleting the parent relation.
817          *
818          * NOTE: this cannot block, even if someone else is waiting for access,
819          * because the lock manager knows that both lock requests are from the
820          * same process.
821          */
822         onerelid = onerel->rd_lockInfo.lockRelId;
823         LockRelationForSession(&onerelid, lmode);
824
825         /*
826          * Remember the relation's TOAST relation for later
827          */
828         toast_relid = onerel->rd_rel->reltoastrelid;
829
830         /*
831          * Do the actual work --- either FULL or "lazy" vacuum
832          */
833         if (vacstmt->full)
834                 full_vacuum_rel(onerel, vacstmt);
835         else
836                 lazy_vacuum_rel(onerel, vacstmt);
837
838         result = true;                          /* did the vacuum */
839
840         /* all done with this class, but hold lock until commit */
841         relation_close(onerel, NoLock);
842
843         /*
844          * Complete the transaction and free all temporary memory used.
845          */
846         CommitTransactionCommand(true);
847
848         /*
849          * If the relation has a secondary toast rel, vacuum that too while we
850          * still hold the session lock on the master table.  Note however that
851          * "analyze" will not get done on the toast table.      This is good,
852          * because the toaster always uses hardcoded index access and
853          * statistics are totally unimportant for toast relations.
854          */
855         if (toast_relid != InvalidOid)
856         {
857                 if (! vacuum_rel(toast_relid, vacstmt, RELKIND_TOASTVALUE))
858                         result = false;         /* failed to vacuum the TOAST table? */
859         }
860
861         /*
862          * Now release the session-level lock on the master table.
863          */
864         UnlockRelationForSession(&onerelid, lmode);
865
866         return result;
867 }
868
869
870 /****************************************************************************
871  *                                                                                                                                                      *
872  *                      Code for VACUUM FULL (only)                                                                             *
873  *                                                                                                                                                      *
874  ****************************************************************************
875  */
876
877
878 /*
879  *      full_vacuum_rel() -- perform FULL VACUUM for one heap relation
880  *
881  *              This routine vacuums a single heap, cleans out its indexes, and
882  *              updates its num_pages and num_tuples statistics.
883  *
884  *              At entry, we have already established a transaction and opened
885  *              and locked the relation.
886  */
887 static void
888 full_vacuum_rel(Relation onerel, VacuumStmt *vacstmt)
889 {
890         VacPageListData vacuum_pages;           /* List of pages to vacuum and/or
891                                                                                  * clean indexes */
892         VacPageListData fraged_pages;           /* List of pages with space enough
893                                                                                  * for re-using */
894         Relation   *Irel;
895         int                     nindexes,
896                                 i;
897         VRelStats  *vacrelstats;
898         bool            reindex = false;
899
900         if (IsIgnoringSystemIndexes() &&
901                 IsSystemRelation(onerel))
902                 reindex = true;
903
904         vacuum_set_xid_limits(vacstmt, onerel->rd_rel->relisshared,
905                                                   &OldestXmin, &FreezeLimit);
906
907         /*
908          * Set up statistics-gathering machinery.
909          */
910         vacrelstats = (VRelStats *) palloc(sizeof(VRelStats));
911         vacrelstats->rel_pages = 0;
912         vacrelstats->rel_tuples = 0;
913         vacrelstats->hasindex = false;
914
915         /* scan the heap */
916         vacuum_pages.num_pages = fraged_pages.num_pages = 0;
917         scan_heap(vacrelstats, onerel, &vacuum_pages, &fraged_pages);
918
919         /* Now open all indexes of the relation */
920         vac_open_indexes(onerel, &nindexes, &Irel);
921         if (!Irel)
922                 reindex = false;
923         else if (!RelationGetForm(onerel)->relhasindex)
924                 reindex = true;
925         if (nindexes > 0)
926                 vacrelstats->hasindex = true;
927
928 #ifdef NOT_USED
929
930         /*
931          * reindex in VACUUM is dangerous under WAL. ifdef out until it
932          * becomes safe.
933          */
934         if (reindex)
935         {
936                 vac_close_indexes(nindexes, Irel);
937                 Irel = (Relation *) NULL;
938                 activate_indexes_of_a_table(onerel, false);
939         }
940 #endif   /* NOT_USED */
941
942         /* Clean/scan index relation(s) */
943         if (Irel != (Relation *) NULL)
944         {
945                 if (vacuum_pages.num_pages > 0)
946                 {
947                         for (i = 0; i < nindexes; i++)
948                                 vacuum_index(&vacuum_pages, Irel[i],
949                                                          vacrelstats->rel_tuples, 0);
950                 }
951                 else
952                 {
953                         /* just scan indexes to update statistic */
954                         for (i = 0; i < nindexes; i++)
955                                 scan_index(Irel[i], vacrelstats->rel_tuples);
956                 }
957         }
958
959         if (fraged_pages.num_pages > 0)
960         {
961                 /* Try to shrink heap */
962                 repair_frag(vacrelstats, onerel, &vacuum_pages, &fraged_pages,
963                                         nindexes, Irel);
964                 vac_close_indexes(nindexes, Irel);
965         }
966         else
967         {
968                 vac_close_indexes(nindexes, Irel);
969                 if (vacuum_pages.num_pages > 0)
970                 {
971                         /* Clean pages from vacuum_pages list */
972                         vacuum_heap(vacrelstats, onerel, &vacuum_pages);
973                 }
974                 else
975                 {
976                         /*
977                          * Flush dirty pages out to disk.  We must do this even if we
978                          * didn't do anything else, because we want to ensure that all
979                          * tuples have correct on-row commit status on disk (see
980                          * bufmgr.c's comments for FlushRelationBuffers()).
981                          */
982                         i = FlushRelationBuffers(onerel, vacrelstats->rel_pages);
983                         if (i < 0)
984                                 elog(ERROR, "VACUUM (full_vacuum_rel): FlushRelationBuffers returned %d",
985                                          i);
986                 }
987         }
988
989 #ifdef NOT_USED
990         if (reindex)
991                 activate_indexes_of_a_table(onerel, true);
992 #endif   /* NOT_USED */
993
994         /* update shared free space map with final free space info */
995         vac_update_fsm(onerel, &fraged_pages, vacrelstats->rel_pages);
996
997         /* update statistics in pg_class */
998         vac_update_relstats(RelationGetRelid(onerel), vacrelstats->rel_pages,
999                                                 vacrelstats->rel_tuples, vacrelstats->hasindex);
1000 }
1001
1002
1003 /*
1004  *      scan_heap() -- scan an open heap relation
1005  *
1006  *              This routine sets commit status bits, constructs vacuum_pages (list
1007  *              of pages we need to compact free space on and/or clean indexes of
1008  *              deleted tuples), constructs fraged_pages (list of pages with free
1009  *              space that tuples could be moved into), and calculates statistics
1010  *              on the number of live tuples in the heap.
1011  */
1012 static void
1013 scan_heap(VRelStats *vacrelstats, Relation onerel,
1014                   VacPageList vacuum_pages, VacPageList fraged_pages)
1015 {
1016         BlockNumber nblocks,
1017                                 blkno;
1018         ItemId          itemid;
1019         Buffer          buf;
1020         HeapTupleData tuple;
1021         OffsetNumber offnum,
1022                                 maxoff;
1023         bool            pgchanged,
1024                                 tupgone,
1025                                 notup;
1026         char       *relname;
1027         VacPage         vacpage,
1028                                 vacpagecopy;
1029         BlockNumber empty_pages,
1030                                 new_pages,
1031                                 changed_pages,
1032                                 empty_end_pages;
1033         double          num_tuples,
1034                                 tups_vacuumed,
1035                                 nkeep,
1036                                 nunused;
1037         double          free_size,
1038                                 usable_free_size;
1039         Size            min_tlen = MaxTupleSize;
1040         Size            max_tlen = 0;
1041         int                     i;
1042         bool            do_shrinking = true;
1043         VTupleLink      vtlinks = (VTupleLink) palloc(100 * sizeof(VTupleLinkData));
1044         int                     num_vtlinks = 0;
1045         int                     free_vtlinks = 100;
1046         VacRUsage       ru0;
1047
1048         vac_init_rusage(&ru0);
1049
1050         relname = RelationGetRelationName(onerel);
1051         elog(elevel, "--Relation %s.%s--",
1052                  get_namespace_name(RelationGetNamespace(onerel)),
1053                  relname);
1054
1055         empty_pages = new_pages = changed_pages = empty_end_pages = 0;
1056         num_tuples = tups_vacuumed = nkeep = nunused = 0;
1057         free_size = 0;
1058
1059         nblocks = RelationGetNumberOfBlocks(onerel);
1060
1061         /*
1062          * We initially create each VacPage item in a maximal-sized workspace,
1063          * then copy the workspace into a just-large-enough copy.
1064          */
1065         vacpage = (VacPage) palloc(sizeof(VacPageData) + MaxOffsetNumber * sizeof(OffsetNumber));
1066
1067         for (blkno = 0; blkno < nblocks; blkno++)
1068         {
1069                 Page            page,
1070                                         tempPage = NULL;
1071                 bool            do_reap,
1072                                         do_frag;
1073
1074                 CHECK_FOR_INTERRUPTS();
1075
1076                 buf = ReadBuffer(onerel, blkno);
1077                 page = BufferGetPage(buf);
1078
1079                 vacpage->blkno = blkno;
1080                 vacpage->offsets_used = 0;
1081                 vacpage->offsets_free = 0;
1082
1083                 if (PageIsNew(page))
1084                 {
1085                         elog(WARNING, "Rel %s: Uninitialized page %u - fixing",
1086                                  relname, blkno);
1087                         PageInit(page, BufferGetPageSize(buf), 0);
1088                         vacpage->free = ((PageHeader) page)->pd_upper - ((PageHeader) page)->pd_lower;
1089                         free_size += vacpage->free;
1090                         new_pages++;
1091                         empty_end_pages++;
1092                         vacpagecopy = copy_vac_page(vacpage);
1093                         vpage_insert(vacuum_pages, vacpagecopy);
1094                         vpage_insert(fraged_pages, vacpagecopy);
1095                         WriteBuffer(buf);
1096                         continue;
1097                 }
1098
1099                 if (PageIsEmpty(page))
1100                 {
1101                         vacpage->free = ((PageHeader) page)->pd_upper - ((PageHeader) page)->pd_lower;
1102                         free_size += vacpage->free;
1103                         empty_pages++;
1104                         empty_end_pages++;
1105                         vacpagecopy = copy_vac_page(vacpage);
1106                         vpage_insert(vacuum_pages, vacpagecopy);
1107                         vpage_insert(fraged_pages, vacpagecopy);
1108                         ReleaseBuffer(buf);
1109                         continue;
1110                 }
1111
1112                 pgchanged = false;
1113                 notup = true;
1114                 maxoff = PageGetMaxOffsetNumber(page);
1115                 for (offnum = FirstOffsetNumber;
1116                          offnum <= maxoff;
1117                          offnum = OffsetNumberNext(offnum))
1118                 {
1119                         uint16          sv_infomask;
1120
1121                         itemid = PageGetItemId(page, offnum);
1122
1123                         /*
1124                          * Collect un-used items too - it's possible to have indexes
1125                          * pointing here after crash.
1126                          */
1127                         if (!ItemIdIsUsed(itemid))
1128                         {
1129                                 vacpage->offsets[vacpage->offsets_free++] = offnum;
1130                                 nunused += 1;
1131                                 continue;
1132                         }
1133
1134                         tuple.t_datamcxt = NULL;
1135                         tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
1136                         tuple.t_len = ItemIdGetLength(itemid);
1137                         ItemPointerSet(&(tuple.t_self), blkno, offnum);
1138
1139                         tupgone = false;
1140                         sv_infomask = tuple.t_data->t_infomask;
1141
1142                         switch (HeapTupleSatisfiesVacuum(tuple.t_data, OldestXmin))
1143                         {
1144                                 case HEAPTUPLE_DEAD:
1145                                         tupgone = true;         /* we can delete the tuple */
1146                                         break;
1147                                 case HEAPTUPLE_LIVE:
1148
1149                                         /*
1150                                          * Tuple is good.  Consider whether to replace its
1151                                          * xmin value with FrozenTransactionId.
1152                                          */
1153                                         if (TransactionIdIsNormal(HeapTupleHeaderGetXmin(tuple.t_data)) &&
1154                                                 TransactionIdPrecedes(HeapTupleHeaderGetXmin(tuple.t_data),
1155                                                                                           FreezeLimit))
1156                                         {
1157                                                 HeapTupleHeaderSetXmin(tuple.t_data, FrozenTransactionId);
1158                                                 /* infomask should be okay already */
1159                                                 Assert(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED);
1160                                                 pgchanged = true;
1161                                         }
1162                                         break;
1163                                 case HEAPTUPLE_RECENTLY_DEAD:
1164
1165                                         /*
1166                                          * If tuple is recently deleted then we must not
1167                                          * remove it from relation.
1168                                          */
1169                                         nkeep += 1;
1170
1171                                         /*
1172                                          * If we do shrinking and this tuple is updated one
1173                                          * then remember it to construct updated tuple
1174                                          * dependencies.
1175                                          */
1176                                         if (do_shrinking &&
1177                                                 !(ItemPointerEquals(&(tuple.t_self),
1178                                                                                         &(tuple.t_data->t_ctid))))
1179                                         {
1180                                                 if (free_vtlinks == 0)
1181                                                 {
1182                                                         free_vtlinks = 1000;
1183                                                         vtlinks = (VTupleLink) repalloc(vtlinks,
1184                                                                                    (free_vtlinks + num_vtlinks) *
1185                                                                                                  sizeof(VTupleLinkData));
1186                                                 }
1187                                                 vtlinks[num_vtlinks].new_tid = tuple.t_data->t_ctid;
1188                                                 vtlinks[num_vtlinks].this_tid = tuple.t_self;
1189                                                 free_vtlinks--;
1190                                                 num_vtlinks++;
1191                                         }
1192                                         break;
1193                                 case HEAPTUPLE_INSERT_IN_PROGRESS:
1194
1195                                         /*
1196                                          * This should not happen, since we hold exclusive
1197                                          * lock on the relation; shouldn't we raise an error?
1198                                          */
1199                                         elog(WARNING, "Rel %s: TID %u/%u: InsertTransactionInProgress %u - can't shrink relation",
1200                                                  relname, blkno, offnum, HeapTupleHeaderGetXmin(tuple.t_data));
1201                                         do_shrinking = false;
1202                                         break;
1203                                 case HEAPTUPLE_DELETE_IN_PROGRESS:
1204
1205                                         /*
1206                                          * This should not happen, since we hold exclusive
1207                                          * lock on the relation; shouldn't we raise an error?
1208                                          */
1209                                         elog(WARNING, "Rel %s: TID %u/%u: DeleteTransactionInProgress %u - can't shrink relation",
1210                                                  relname, blkno, offnum, HeapTupleHeaderGetXmax(tuple.t_data));
1211                                         do_shrinking = false;
1212                                         break;
1213                                 default:
1214                                         elog(ERROR, "Unexpected HeapTupleSatisfiesVacuum result");
1215                                         break;
1216                         }
1217
1218                         /* check for hint-bit update by HeapTupleSatisfiesVacuum */
1219                         if (sv_infomask != tuple.t_data->t_infomask)
1220                                 pgchanged = true;
1221
1222                         /*
1223                          * Other checks...
1224                          */
1225                         if (onerel->rd_rel->relhasoids &&
1226                                 !OidIsValid(HeapTupleGetOid(&tuple)))
1227                                 elog(WARNING, "Rel %s: TID %u/%u: OID IS INVALID. TUPGONE %d.",
1228                                          relname, blkno, offnum, (int) tupgone);
1229
1230                         if (tupgone)
1231                         {
1232                                 ItemId          lpp;
1233
1234                                 /*
1235                                  * Here we are building a temporary copy of the page with
1236                                  * dead tuples removed.  Below we will apply
1237                                  * PageRepairFragmentation to the copy, so that we can
1238                                  * determine how much space will be available after
1239                                  * removal of dead tuples.      But note we are NOT changing
1240                                  * the real page yet...
1241                                  */
1242                                 if (tempPage == (Page) NULL)
1243                                 {
1244                                         Size            pageSize;
1245
1246                                         pageSize = PageGetPageSize(page);
1247                                         tempPage = (Page) palloc(pageSize);
1248                                         memcpy(tempPage, page, pageSize);
1249                                 }
1250
1251                                 /* mark it unused on the temp page */
1252                                 lpp = PageGetItemId(tempPage, offnum);
1253                                 lpp->lp_flags &= ~LP_USED;
1254
1255                                 vacpage->offsets[vacpage->offsets_free++] = offnum;
1256                                 tups_vacuumed += 1;
1257                         }
1258                         else
1259                         {
1260                                 num_tuples += 1;
1261                                 notup = false;
1262                                 if (tuple.t_len < min_tlen)
1263                                         min_tlen = tuple.t_len;
1264                                 if (tuple.t_len > max_tlen)
1265                                         max_tlen = tuple.t_len;
1266                         }
1267                 }                                               /* scan along page */
1268
1269                 if (tempPage != (Page) NULL)
1270                 {
1271                         /* Some tuples are removable; figure free space after removal */
1272                         PageRepairFragmentation(tempPage, NULL);
1273                         vacpage->free = ((PageHeader) tempPage)->pd_upper - ((PageHeader) tempPage)->pd_lower;
1274                         pfree(tempPage);
1275                         do_reap = true;
1276                 }
1277                 else
1278                 {
1279                         /* Just use current available space */
1280                         vacpage->free = ((PageHeader) page)->pd_upper - ((PageHeader) page)->pd_lower;
1281                         /* Need to reap the page if it has ~LP_USED line pointers */
1282                         do_reap = (vacpage->offsets_free > 0);
1283                 }
1284
1285                 free_size += vacpage->free;
1286
1287                 /*
1288                  * Add the page to fraged_pages if it has a useful amount of free
1289                  * space.  "Useful" means enough for a minimal-sized tuple. But we
1290                  * don't know that accurately near the start of the relation, so
1291                  * add pages unconditionally if they have >= BLCKSZ/10 free space.
1292                  */
1293                 do_frag = (vacpage->free >= min_tlen || vacpage->free >= BLCKSZ / 10);
1294
1295                 if (do_reap || do_frag)
1296                 {
1297                         vacpagecopy = copy_vac_page(vacpage);
1298                         if (do_reap)
1299                                 vpage_insert(vacuum_pages, vacpagecopy);
1300                         if (do_frag)
1301                                 vpage_insert(fraged_pages, vacpagecopy);
1302                 }
1303
1304                 if (notup)
1305                         empty_end_pages++;
1306                 else
1307                         empty_end_pages = 0;
1308
1309                 if (pgchanged)
1310                 {
1311                         WriteBuffer(buf);
1312                         changed_pages++;
1313                 }
1314                 else
1315                         ReleaseBuffer(buf);
1316         }
1317
1318         pfree(vacpage);
1319
1320         /* save stats in the rel list for use later */
1321         vacrelstats->rel_tuples = num_tuples;
1322         vacrelstats->rel_pages = nblocks;
1323         if (num_tuples == 0)
1324                 min_tlen = max_tlen = 0;
1325         vacrelstats->min_tlen = min_tlen;
1326         vacrelstats->max_tlen = max_tlen;
1327
1328         vacuum_pages->empty_end_pages = empty_end_pages;
1329         fraged_pages->empty_end_pages = empty_end_pages;
1330
1331         /*
1332          * Clear the fraged_pages list if we found we couldn't shrink. Else,
1333          * remove any "empty" end-pages from the list, and compute usable free
1334          * space = free space in remaining pages.
1335          */
1336         if (do_shrinking)
1337         {
1338                 Assert((BlockNumber) fraged_pages->num_pages >= empty_end_pages);
1339                 fraged_pages->num_pages -= empty_end_pages;
1340                 usable_free_size = 0;
1341                 for (i = 0; i < fraged_pages->num_pages; i++)
1342                         usable_free_size += fraged_pages->pagedesc[i]->free;
1343         }
1344         else
1345         {
1346                 fraged_pages->num_pages = 0;
1347                 usable_free_size = 0;
1348         }
1349
1350         /* don't bother to save vtlinks if we will not call repair_frag */
1351         if (fraged_pages->num_pages > 0 && num_vtlinks > 0)
1352         {
1353                 qsort((char *) vtlinks, num_vtlinks, sizeof(VTupleLinkData),
1354                           vac_cmp_vtlinks);
1355                 vacrelstats->vtlinks = vtlinks;
1356                 vacrelstats->num_vtlinks = num_vtlinks;
1357         }
1358         else
1359         {
1360                 vacrelstats->vtlinks = NULL;
1361                 vacrelstats->num_vtlinks = 0;
1362                 pfree(vtlinks);
1363         }
1364
1365         elog(elevel, "Pages %u: Changed %u, reaped %u, Empty %u, New %u; "
1366                  "Tup %.0f: Vac %.0f, Keep/VTL %.0f/%u, UnUsed %.0f, MinLen %lu, "
1367                  "MaxLen %lu; Re-using: Free/Avail. Space %.0f/%.0f; "
1368                  "EndEmpty/Avail. Pages %u/%u.\n\t%s",
1369                  nblocks, changed_pages, vacuum_pages->num_pages, empty_pages,
1370                  new_pages, num_tuples, tups_vacuumed,
1371                  nkeep, vacrelstats->num_vtlinks,
1372                  nunused, (unsigned long) min_tlen, (unsigned long) max_tlen,
1373                  free_size, usable_free_size,
1374                  empty_end_pages, fraged_pages->num_pages,
1375                  vac_show_rusage(&ru0));
1376 }
1377
1378
1379 /*
1380  *      repair_frag() -- try to repair relation's fragmentation
1381  *
1382  *              This routine marks dead tuples as unused and tries re-use dead space
1383  *              by moving tuples (and inserting indexes if needed). It constructs
1384  *              Nvacpagelist list of free-ed pages (moved tuples) and clean indexes
1385  *              for them after committing (in hack-manner - without losing locks
1386  *              and freeing memory!) current transaction. It truncates relation
1387  *              if some end-blocks are gone away.
1388  */
1389 static void
1390 repair_frag(VRelStats *vacrelstats, Relation onerel,
1391                         VacPageList vacuum_pages, VacPageList fraged_pages,
1392                         int nindexes, Relation *Irel)
1393 {
1394         TransactionId myXID;
1395         CommandId       myCID;
1396         Buffer          buf,
1397                                 cur_buffer;
1398         BlockNumber nblocks,
1399                                 blkno;
1400         BlockNumber last_move_dest_block = 0,
1401                                 last_vacuum_block;
1402         Page            page,
1403                                 ToPage = NULL;
1404         OffsetNumber offnum,
1405                                 maxoff,
1406                                 newoff,
1407                                 max_offset;
1408         ItemId          itemid,
1409                                 newitemid;
1410         HeapTupleData tuple,
1411                                 newtup;
1412         TupleDesc       tupdesc;
1413         ResultRelInfo *resultRelInfo;
1414         EState     *estate;
1415         TupleTable      tupleTable;
1416         TupleTableSlot *slot;
1417         VacPageListData Nvacpagelist;
1418         VacPage         cur_page = NULL,
1419                                 last_vacuum_page,
1420                                 vacpage,
1421                            *curpage;
1422         int                     cur_item = 0;
1423         int                     i;
1424         Size            tuple_len;
1425         int                     num_moved,
1426                                 num_fraged_pages,
1427                                 vacuumed_pages;
1428         int                     checked_moved,
1429                                 num_tuples,
1430                                 keep_tuples = 0;
1431         bool            isempty,
1432                                 dowrite,
1433                                 chain_tuple_moved;
1434         VacRUsage       ru0;
1435
1436         vac_init_rusage(&ru0);
1437
1438         myXID = GetCurrentTransactionId();
1439         myCID = GetCurrentCommandId();
1440
1441         tupdesc = RelationGetDescr(onerel);
1442
1443         /*
1444          * We need a ResultRelInfo and an EState so we can use the regular
1445          * executor's index-entry-making machinery.
1446          */
1447         estate = CreateExecutorState();
1448
1449         resultRelInfo = makeNode(ResultRelInfo);
1450         resultRelInfo->ri_RangeTableIndex = 1;          /* dummy */
1451         resultRelInfo->ri_RelationDesc = onerel;
1452         resultRelInfo->ri_TrigDesc = NULL;      /* we don't fire triggers */
1453
1454         ExecOpenIndices(resultRelInfo);
1455
1456         estate->es_result_relations = resultRelInfo;
1457         estate->es_num_result_relations = 1;
1458         estate->es_result_relation_info = resultRelInfo;
1459
1460         /* Set up a dummy tuple table too */
1461         tupleTable = ExecCreateTupleTable(1);
1462         slot = ExecAllocTableSlot(tupleTable);
1463         ExecSetSlotDescriptor(slot, tupdesc, false);
1464
1465         Nvacpagelist.num_pages = 0;
1466         num_fraged_pages = fraged_pages->num_pages;
1467         Assert((BlockNumber) vacuum_pages->num_pages >= vacuum_pages->empty_end_pages);
1468         vacuumed_pages = vacuum_pages->num_pages - vacuum_pages->empty_end_pages;
1469         if (vacuumed_pages > 0)
1470         {
1471                 /* get last reaped page from vacuum_pages */
1472                 last_vacuum_page = vacuum_pages->pagedesc[vacuumed_pages - 1];
1473                 last_vacuum_block = last_vacuum_page->blkno;
1474         }
1475         else
1476         {
1477                 last_vacuum_page = NULL;
1478                 last_vacuum_block = InvalidBlockNumber;
1479         }
1480         cur_buffer = InvalidBuffer;
1481         num_moved = 0;
1482
1483         vacpage = (VacPage) palloc(sizeof(VacPageData) + MaxOffsetNumber * sizeof(OffsetNumber));
1484         vacpage->offsets_used = vacpage->offsets_free = 0;
1485
1486         /*
1487          * Scan pages backwards from the last nonempty page, trying to move
1488          * tuples down to lower pages.  Quit when we reach a page that we have
1489          * moved any tuples onto, or the first page if we haven't moved
1490          * anything, or when we find a page we cannot completely empty (this
1491          * last condition is handled by "break" statements within the loop).
1492          *
1493          * NB: this code depends on the vacuum_pages and fraged_pages lists being
1494          * in order by blkno.
1495          */
1496         nblocks = vacrelstats->rel_pages;
1497         for (blkno = nblocks - vacuum_pages->empty_end_pages - 1;
1498                  blkno > last_move_dest_block;
1499                  blkno--)
1500         {
1501                 CHECK_FOR_INTERRUPTS();
1502
1503                 /*
1504                  * Forget fraged_pages pages at or after this one; they're no
1505                  * longer useful as move targets, since we only want to move down.
1506                  * Note that since we stop the outer loop at last_move_dest_block,
1507                  * pages removed here cannot have had anything moved onto them
1508                  * already.
1509                  *
1510                  * Also note that we don't change the stored fraged_pages list, only
1511                  * our local variable num_fraged_pages; so the forgotten pages are
1512                  * still available to be loaded into the free space map later.
1513                  */
1514                 while (num_fraged_pages > 0 &&
1515                         fraged_pages->pagedesc[num_fraged_pages - 1]->blkno >= blkno)
1516                 {
1517                         Assert(fraged_pages->pagedesc[num_fraged_pages - 1]->offsets_used == 0);
1518                         --num_fraged_pages;
1519                 }
1520
1521                 /*
1522                  * Process this page of relation.
1523                  */
1524                 buf = ReadBuffer(onerel, blkno);
1525                 page = BufferGetPage(buf);
1526
1527                 vacpage->offsets_free = 0;
1528
1529                 isempty = PageIsEmpty(page);
1530
1531                 dowrite = false;
1532
1533                 /* Is the page in the vacuum_pages list? */
1534                 if (blkno == last_vacuum_block)
1535                 {
1536                         if (last_vacuum_page->offsets_free > 0)
1537                         {
1538                                 /* there are dead tuples on this page - clean them */
1539                                 Assert(!isempty);
1540                                 LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
1541                                 vacuum_page(onerel, buf, last_vacuum_page);
1542                                 LockBuffer(buf, BUFFER_LOCK_UNLOCK);
1543                                 dowrite = true;
1544                         }
1545                         else
1546                                 Assert(isempty);
1547                         --vacuumed_pages;
1548                         if (vacuumed_pages > 0)
1549                         {
1550                                 /* get prev reaped page from vacuum_pages */
1551                                 last_vacuum_page = vacuum_pages->pagedesc[vacuumed_pages - 1];
1552                                 last_vacuum_block = last_vacuum_page->blkno;
1553                         }
1554                         else
1555                         {
1556                                 last_vacuum_page = NULL;
1557                                 last_vacuum_block = InvalidBlockNumber;
1558                         }
1559                         if (isempty)
1560                         {
1561                                 ReleaseBuffer(buf);
1562                                 continue;
1563                         }
1564                 }
1565                 else
1566                         Assert(!isempty);
1567
1568                 chain_tuple_moved = false;              /* no one chain-tuple was moved
1569                                                                                  * off this page, yet */
1570                 vacpage->blkno = blkno;
1571                 maxoff = PageGetMaxOffsetNumber(page);
1572                 for (offnum = FirstOffsetNumber;
1573                          offnum <= maxoff;
1574                          offnum = OffsetNumberNext(offnum))
1575                 {
1576                         itemid = PageGetItemId(page, offnum);
1577
1578                         if (!ItemIdIsUsed(itemid))
1579                                 continue;
1580
1581                         tuple.t_datamcxt = NULL;
1582                         tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
1583                         tuple_len = tuple.t_len = ItemIdGetLength(itemid);
1584                         ItemPointerSet(&(tuple.t_self), blkno, offnum);
1585
1586                         if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
1587                         {
1588                                 if (tuple.t_data->t_infomask & HEAP_MOVED_IN)
1589                                         elog(ERROR, "HEAP_MOVED_IN was not expected");
1590
1591                                 /*
1592                                  * If this (chain) tuple is moved by me already then I
1593                                  * have to check is it in vacpage or not - i.e. is it
1594                                  * moved while cleaning this page or some previous one.
1595                                  */
1596                                 if (tuple.t_data->t_infomask & HEAP_MOVED_OFF)
1597                                 {
1598                                         if (HeapTupleHeaderGetXvac(tuple.t_data) != myXID)
1599                                                 elog(ERROR, "Invalid XVAC in tuple header");
1600                                         if (keep_tuples == 0)
1601                                                 continue;
1602                                         if (chain_tuple_moved)          /* some chains was moved
1603                                                                                                  * while */
1604                                         {                       /* cleaning this page */
1605                                                 Assert(vacpage->offsets_free > 0);
1606                                                 for (i = 0; i < vacpage->offsets_free; i++)
1607                                                 {
1608                                                         if (vacpage->offsets[i] == offnum)
1609                                                                 break;
1610                                                 }
1611                                                 if (i >= vacpage->offsets_free) /* not found */
1612                                                 {
1613                                                         vacpage->offsets[vacpage->offsets_free++] = offnum;
1614                                                         keep_tuples--;
1615                                                 }
1616                                         }
1617                                         else
1618                                         {
1619                                                 vacpage->offsets[vacpage->offsets_free++] = offnum;
1620                                                 keep_tuples--;
1621                                         }
1622                                         continue;
1623                                 }
1624                                 elog(ERROR, "HEAP_MOVED_OFF was expected");
1625                         }
1626
1627                         /*
1628                          * If this tuple is in the chain of tuples created in updates
1629                          * by "recent" transactions then we have to move all chain of
1630                          * tuples to another places.
1631                          *
1632                          * NOTE: this test is not 100% accurate: it is possible for a
1633                          * tuple to be an updated one with recent xmin, and yet not
1634                          * have a corresponding tuple in the vtlinks list.      Presumably
1635                          * there was once a parent tuple with xmax matching the xmin,
1636                          * but it's possible that that tuple has been removed --- for
1637                          * example, if it had xmin = xmax then
1638                          * HeapTupleSatisfiesVacuum would deem it removable as soon as
1639                          * the xmin xact completes.
1640                          *
1641                          * To be on the safe side, we abandon the repair_frag process if
1642                          * we cannot find the parent tuple in vtlinks.  This may be
1643                          * overly conservative; AFAICS it would be safe to move the
1644                          * chain.
1645                          */
1646                         if (((tuple.t_data->t_infomask & HEAP_UPDATED) &&
1647                          !TransactionIdPrecedes(HeapTupleHeaderGetXmin(tuple.t_data),
1648                                                                         OldestXmin)) ||
1649                                 (!(tuple.t_data->t_infomask & (HEAP_XMAX_INVALID |
1650                                                                                            HEAP_MARKED_FOR_UPDATE)) &&
1651                                  !(ItemPointerEquals(&(tuple.t_self),
1652                                                                          &(tuple.t_data->t_ctid)))))
1653                         {
1654                                 Buffer          Cbuf = buf;
1655                                 bool            freeCbuf = false;
1656                                 bool            chain_move_failed = false;
1657                                 Page            Cpage;
1658                                 ItemId          Citemid;
1659                                 ItemPointerData Ctid;
1660                                 HeapTupleData tp = tuple;
1661                                 Size            tlen = tuple_len;
1662                                 VTupleMove      vtmove;
1663                                 int                     num_vtmove;
1664                                 int                     free_vtmove;
1665                                 VacPage         to_vacpage = NULL;
1666                                 int                     to_item = 0;
1667                                 int                     ti;
1668
1669                                 if (cur_buffer != InvalidBuffer)
1670                                 {
1671                                         WriteBuffer(cur_buffer);
1672                                         cur_buffer = InvalidBuffer;
1673                                 }
1674
1675                                 /* Quick exit if we have no vtlinks to search in */
1676                                 if (vacrelstats->vtlinks == NULL)
1677                                 {
1678                                         elog(DEBUG1, "Parent item in update-chain not found - can't continue repair_frag");
1679                                         break;          /* out of walk-along-page loop */
1680                                 }
1681
1682                                 vtmove = (VTupleMove) palloc(100 * sizeof(VTupleMoveData));
1683                                 num_vtmove = 0;
1684                                 free_vtmove = 100;
1685
1686                                 /*
1687                                  * If this tuple is in the begin/middle of the chain then
1688                                  * we have to move to the end of chain.
1689                                  */
1690                                 while (!(tp.t_data->t_infomask & (HEAP_XMAX_INVALID |
1691                                                                                           HEAP_MARKED_FOR_UPDATE)) &&
1692                                            !(ItemPointerEquals(&(tp.t_self),
1693                                                                                    &(tp.t_data->t_ctid))))
1694                                 {
1695                                         Ctid = tp.t_data->t_ctid;
1696                                         if (freeCbuf)
1697                                                 ReleaseBuffer(Cbuf);
1698                                         freeCbuf = true;
1699                                         Cbuf = ReadBuffer(onerel,
1700                                                                           ItemPointerGetBlockNumber(&Ctid));
1701                                         Cpage = BufferGetPage(Cbuf);
1702                                         Citemid = PageGetItemId(Cpage,
1703                                                                           ItemPointerGetOffsetNumber(&Ctid));
1704                                         if (!ItemIdIsUsed(Citemid))
1705                                         {
1706                                                 /*
1707                                                  * This means that in the middle of chain there
1708                                                  * was tuple updated by older (than OldestXmin)
1709                                                  * xaction and this tuple is already deleted by
1710                                                  * me. Actually, upper part of chain should be
1711                                                  * removed and seems that this should be handled
1712                                                  * in scan_heap(), but it's not implemented at the
1713                                                  * moment and so we just stop shrinking here.
1714                                                  */
1715                                                 elog(DEBUG1, "Child itemid in update-chain marked as unused - can't continue repair_frag");
1716                                                 chain_move_failed = true;
1717                                                 break;  /* out of loop to move to chain end */
1718                                         }
1719                                         tp.t_datamcxt = NULL;
1720                                         tp.t_data = (HeapTupleHeader) PageGetItem(Cpage, Citemid);
1721                                         tp.t_self = Ctid;
1722                                         tlen = tp.t_len = ItemIdGetLength(Citemid);
1723                                 }
1724                                 if (chain_move_failed)
1725                                 {
1726                                         if (freeCbuf)
1727                                                 ReleaseBuffer(Cbuf);
1728                                         pfree(vtmove);
1729                                         break;          /* out of walk-along-page loop */
1730                                 }
1731
1732                                 /*
1733                                  * Check if all items in chain can be moved
1734                                  */
1735                                 for (;;)
1736                                 {
1737                                         Buffer          Pbuf;
1738                                         Page            Ppage;
1739                                         ItemId          Pitemid;
1740                                         HeapTupleData Ptp;
1741                                         VTupleLinkData vtld,
1742                                                            *vtlp;
1743
1744                                         if (to_vacpage == NULL ||
1745                                                 !enough_space(to_vacpage, tlen))
1746                                         {
1747                                                 for (i = 0; i < num_fraged_pages; i++)
1748                                                 {
1749                                                         if (enough_space(fraged_pages->pagedesc[i], tlen))
1750                                                                 break;
1751                                                 }
1752
1753                                                 if (i == num_fraged_pages)
1754                                                 {
1755                                                         /* can't move item anywhere */
1756                                                         chain_move_failed = true;
1757                                                         break;          /* out of check-all-items loop */
1758                                                 }
1759                                                 to_item = i;
1760                                                 to_vacpage = fraged_pages->pagedesc[to_item];
1761                                         }
1762                                         to_vacpage->free -= MAXALIGN(tlen);
1763                                         if (to_vacpage->offsets_used >= to_vacpage->offsets_free)
1764                                                 to_vacpage->free -= sizeof(ItemIdData);
1765                                         (to_vacpage->offsets_used)++;
1766                                         if (free_vtmove == 0)
1767                                         {
1768                                                 free_vtmove = 1000;
1769                                                 vtmove = (VTupleMove)
1770                                                         repalloc(vtmove,
1771                                                                          (free_vtmove + num_vtmove) *
1772                                                                          sizeof(VTupleMoveData));
1773                                         }
1774                                         vtmove[num_vtmove].tid = tp.t_self;
1775                                         vtmove[num_vtmove].vacpage = to_vacpage;
1776                                         if (to_vacpage->offsets_used == 1)
1777                                                 vtmove[num_vtmove].cleanVpd = true;
1778                                         else
1779                                                 vtmove[num_vtmove].cleanVpd = false;
1780                                         free_vtmove--;
1781                                         num_vtmove++;
1782
1783                                         /* At beginning of chain? */
1784                                         if (!(tp.t_data->t_infomask & HEAP_UPDATED) ||
1785                                                 TransactionIdPrecedes(HeapTupleHeaderGetXmin(tp.t_data),
1786                                                                                           OldestXmin))
1787                                                 break;
1788
1789                                         /* No, move to tuple with prior row version */
1790                                         vtld.new_tid = tp.t_self;
1791                                         vtlp = (VTupleLink)
1792                                                 vac_bsearch((void *) &vtld,
1793                                                                         (void *) (vacrelstats->vtlinks),
1794                                                                         vacrelstats->num_vtlinks,
1795                                                                         sizeof(VTupleLinkData),
1796                                                                         vac_cmp_vtlinks);
1797                                         if (vtlp == NULL)
1798                                         {
1799                                                 /* see discussion above */
1800                                                 elog(DEBUG1, "Parent item in update-chain not found - can't continue repair_frag");
1801                                                 chain_move_failed = true;
1802                                                 break;  /* out of check-all-items loop */
1803                                         }
1804                                         tp.t_self = vtlp->this_tid;
1805                                         Pbuf = ReadBuffer(onerel,
1806                                                                 ItemPointerGetBlockNumber(&(tp.t_self)));
1807                                         Ppage = BufferGetPage(Pbuf);
1808                                         Pitemid = PageGetItemId(Ppage,
1809                                                            ItemPointerGetOffsetNumber(&(tp.t_self)));
1810                                         /* this can't happen since we saw tuple earlier: */
1811                                         if (!ItemIdIsUsed(Pitemid))
1812                                                 elog(ERROR, "Parent itemid marked as unused");
1813                                         Ptp.t_datamcxt = NULL;
1814                                         Ptp.t_data = (HeapTupleHeader) PageGetItem(Ppage, Pitemid);
1815
1816                                         /* ctid should not have changed since we saved it */
1817                                         Assert(ItemPointerEquals(&(vtld.new_tid),
1818                                                                                          &(Ptp.t_data->t_ctid)));
1819
1820                                         /*
1821                                          * Read above about cases when !ItemIdIsUsed(Citemid)
1822                                          * (child item is removed)... Due to the fact that at
1823                                          * the moment we don't remove unuseful part of
1824                                          * update-chain, it's possible to get too old parent
1825                                          * row here. Like as in the case which caused this
1826                                          * problem, we stop shrinking here. I could try to
1827                                          * find real parent row but want not to do it because
1828                                          * of real solution will be implemented anyway, later,
1829                                          * and we are too close to 6.5 release. - vadim
1830                                          * 06/11/99
1831                                          */
1832                                         if (!(TransactionIdEquals(HeapTupleHeaderGetXmax(Ptp.t_data),
1833                                                                          HeapTupleHeaderGetXmin(tp.t_data))))
1834                                         {
1835                                                 ReleaseBuffer(Pbuf);
1836                                                 elog(DEBUG1, "Too old parent tuple found - can't continue repair_frag");
1837                                                 chain_move_failed = true;
1838                                                 break;  /* out of check-all-items loop */
1839                                         }
1840                                         tp.t_datamcxt = Ptp.t_datamcxt;
1841                                         tp.t_data = Ptp.t_data;
1842                                         tlen = tp.t_len = ItemIdGetLength(Pitemid);
1843                                         if (freeCbuf)
1844                                                 ReleaseBuffer(Cbuf);
1845                                         Cbuf = Pbuf;
1846                                         freeCbuf = true;
1847                                 }                               /* end of check-all-items loop */
1848
1849                                 if (freeCbuf)
1850                                         ReleaseBuffer(Cbuf);
1851                                 freeCbuf = false;
1852
1853                                 if (chain_move_failed)
1854                                 {
1855                                         /*
1856                                          * Undo changes to offsets_used state.  We don't
1857                                          * bother cleaning up the amount-free state, since
1858                                          * we're not going to do any further tuple motion.
1859                                          */
1860                                         for (i = 0; i < num_vtmove; i++)
1861                                         {
1862                                                 Assert(vtmove[i].vacpage->offsets_used > 0);
1863                                                 (vtmove[i].vacpage->offsets_used)--;
1864                                         }
1865                                         pfree(vtmove);
1866                                         break;          /* out of walk-along-page loop */
1867                                 }
1868
1869                                 /*
1870                                  * Okay, move the whle tuple chain
1871                                  */
1872                                 ItemPointerSetInvalid(&Ctid);
1873                                 for (ti = 0; ti < num_vtmove; ti++)
1874                                 {
1875                                         VacPage         destvacpage = vtmove[ti].vacpage;
1876
1877                                         /* Get page to move from */
1878                                         tuple.t_self = vtmove[ti].tid;
1879                                         Cbuf = ReadBuffer(onerel,
1880                                                          ItemPointerGetBlockNumber(&(tuple.t_self)));
1881
1882                                         /* Get page to move to */
1883                                         cur_buffer = ReadBuffer(onerel, destvacpage->blkno);
1884
1885                                         LockBuffer(cur_buffer, BUFFER_LOCK_EXCLUSIVE);
1886                                         if (cur_buffer != Cbuf)
1887                                                 LockBuffer(Cbuf, BUFFER_LOCK_EXCLUSIVE);
1888
1889                                         ToPage = BufferGetPage(cur_buffer);
1890                                         Cpage = BufferGetPage(Cbuf);
1891
1892                                         Citemid = PageGetItemId(Cpage,
1893                                                         ItemPointerGetOffsetNumber(&(tuple.t_self)));
1894                                         tuple.t_datamcxt = NULL;
1895                                         tuple.t_data = (HeapTupleHeader) PageGetItem(Cpage, Citemid);
1896                                         tuple_len = tuple.t_len = ItemIdGetLength(Citemid);
1897
1898                                         /*
1899                                          * make a copy of the source tuple, and then mark the
1900                                          * source tuple MOVED_OFF.
1901                                          */
1902                                         heap_copytuple_with_tuple(&tuple, &newtup);
1903
1904                                         /*
1905                                          * register invalidation of source tuple in catcaches.
1906                                          */
1907                                         CacheInvalidateHeapTuple(onerel, &tuple);
1908
1909                                         /* NO ELOG(ERROR) TILL CHANGES ARE LOGGED */
1910                                         START_CRIT_SECTION();
1911
1912                                         tuple.t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
1913                                                                                                   HEAP_XMIN_INVALID |
1914                                                                                                   HEAP_MOVED_IN);
1915                                         tuple.t_data->t_infomask |= HEAP_MOVED_OFF;
1916                                         HeapTupleHeaderSetXvac(tuple.t_data, myXID);
1917
1918                                         /*
1919                                          * If this page was not used before - clean it.
1920                                          *
1921                                          * NOTE: a nasty bug used to lurk here.  It is possible
1922                                          * for the source and destination pages to be the same
1923                                          * (since this tuple-chain member can be on a page
1924                                          * lower than the one we're currently processing in
1925                                          * the outer loop).  If that's true, then after
1926                                          * vacuum_page() the source tuple will have been
1927                                          * moved, and tuple.t_data will be pointing at
1928                                          * garbage.  Therefore we must do everything that uses
1929                                          * tuple.t_data BEFORE this step!!
1930                                          *
1931                                          * This path is different from the other callers of
1932                                          * vacuum_page, because we have already incremented
1933                                          * the vacpage's offsets_used field to account for the
1934                                          * tuple(s) we expect to move onto the page. Therefore
1935                                          * vacuum_page's check for offsets_used == 0 is wrong.
1936                                          * But since that's a good debugging check for all
1937                                          * other callers, we work around it here rather than
1938                                          * remove it.
1939                                          */
1940                                         if (!PageIsEmpty(ToPage) && vtmove[ti].cleanVpd)
1941                                         {
1942                                                 int                     sv_offsets_used = destvacpage->offsets_used;
1943
1944                                                 destvacpage->offsets_used = 0;
1945                                                 vacuum_page(onerel, cur_buffer, destvacpage);
1946                                                 destvacpage->offsets_used = sv_offsets_used;
1947                                         }
1948
1949                                         /*
1950                                          * Update the state of the copied tuple, and store it
1951                                          * on the destination page.
1952                                          */
1953                                         newtup.t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
1954                                                                                                    HEAP_XMIN_INVALID |
1955                                                                                                    HEAP_MOVED_OFF);
1956                                         newtup.t_data->t_infomask |= HEAP_MOVED_IN;
1957                                         HeapTupleHeaderSetXvac(newtup.t_data, myXID);
1958                                         newoff = PageAddItem(ToPage,
1959                                                                                  (Item) newtup.t_data,
1960                                                                                  tuple_len,
1961                                                                                  InvalidOffsetNumber,
1962                                                                                  LP_USED);
1963                                         if (newoff == InvalidOffsetNumber)
1964                                         {
1965                                                 elog(PANIC, "moving chain: failed to add item with len = %lu to page %u",
1966                                                   (unsigned long) tuple_len, destvacpage->blkno);
1967                                         }
1968                                         newitemid = PageGetItemId(ToPage, newoff);
1969                                         pfree(newtup.t_data);
1970                                         newtup.t_datamcxt = NULL;
1971                                         newtup.t_data = (HeapTupleHeader) PageGetItem(ToPage, newitemid);
1972                                         ItemPointerSet(&(newtup.t_self), destvacpage->blkno, newoff);
1973
1974                                         /* XLOG stuff */
1975                                         if (!onerel->rd_istemp)
1976                                         {
1977                                                 XLogRecPtr      recptr =
1978                                                 log_heap_move(onerel, Cbuf, tuple.t_self,
1979                                                                           cur_buffer, &newtup);
1980
1981                                                 if (Cbuf != cur_buffer)
1982                                                 {
1983                                                         PageSetLSN(Cpage, recptr);
1984                                                         PageSetSUI(Cpage, ThisStartUpID);
1985                                                 }
1986                                                 PageSetLSN(ToPage, recptr);
1987                                                 PageSetSUI(ToPage, ThisStartUpID);
1988                                         }
1989                                         else
1990                                         {
1991                                                 /*
1992                                                  * No XLOG record, but still need to flag that XID
1993                                                  * exists on disk
1994                                                  */
1995                                                 MyXactMadeTempRelUpdate = true;
1996                                         }
1997
1998                                         END_CRIT_SECTION();
1999
2000                                         if (destvacpage->blkno > last_move_dest_block)
2001                                                 last_move_dest_block = destvacpage->blkno;
2002
2003                                         /*
2004                                          * Set new tuple's t_ctid pointing to itself for last
2005                                          * tuple in chain, and to next tuple in chain
2006                                          * otherwise.
2007                                          */
2008                                         if (!ItemPointerIsValid(&Ctid))
2009                                                 newtup.t_data->t_ctid = newtup.t_self;
2010                                         else
2011                                                 newtup.t_data->t_ctid = Ctid;
2012                                         Ctid = newtup.t_self;
2013
2014                                         num_moved++;
2015
2016                                         /*
2017                                          * Remember that we moved tuple from the current page
2018                                          * (corresponding index tuple will be cleaned).
2019                                          */
2020                                         if (Cbuf == buf)
2021                                                 vacpage->offsets[vacpage->offsets_free++] =
2022                                                         ItemPointerGetOffsetNumber(&(tuple.t_self));
2023                                         else
2024                                                 keep_tuples++;
2025
2026                                         LockBuffer(cur_buffer, BUFFER_LOCK_UNLOCK);
2027                                         if (cur_buffer != Cbuf)
2028                                                 LockBuffer(Cbuf, BUFFER_LOCK_UNLOCK);
2029
2030                                         /* Create index entries for the moved tuple */
2031                                         if (resultRelInfo->ri_NumIndices > 0)
2032                                         {
2033                                                 ExecStoreTuple(&newtup, slot, InvalidBuffer, false);
2034                                                 ExecInsertIndexTuples(slot, &(newtup.t_self),
2035                                                                                           estate, true);
2036                                         }
2037
2038                                         WriteBuffer(cur_buffer);
2039                                         WriteBuffer(Cbuf);
2040                                 }                               /* end of move-the-tuple-chain loop */
2041
2042                                 cur_buffer = InvalidBuffer;
2043                                 pfree(vtmove);
2044                                 chain_tuple_moved = true;
2045
2046                                 /* advance to next tuple in walk-along-page loop */
2047                                 continue;
2048                         }                                       /* end of is-tuple-in-chain test */
2049
2050                         /* try to find new page for this tuple */
2051                         if (cur_buffer == InvalidBuffer ||
2052                                 !enough_space(cur_page, tuple_len))
2053                         {
2054                                 if (cur_buffer != InvalidBuffer)
2055                                 {
2056                                         WriteBuffer(cur_buffer);
2057                                         cur_buffer = InvalidBuffer;
2058                                 }
2059                                 for (i = 0; i < num_fraged_pages; i++)
2060                                 {
2061                                         if (enough_space(fraged_pages->pagedesc[i], tuple_len))
2062                                                 break;
2063                                 }
2064                                 if (i == num_fraged_pages)
2065                                         break;          /* can't move item anywhere */
2066                                 cur_item = i;
2067                                 cur_page = fraged_pages->pagedesc[cur_item];
2068                                 cur_buffer = ReadBuffer(onerel, cur_page->blkno);
2069                                 LockBuffer(cur_buffer, BUFFER_LOCK_EXCLUSIVE);
2070                                 ToPage = BufferGetPage(cur_buffer);
2071                                 /* if this page was not used before - clean it */
2072                                 if (!PageIsEmpty(ToPage) && cur_page->offsets_used == 0)
2073                                         vacuum_page(onerel, cur_buffer, cur_page);
2074                         }
2075                         else
2076                                 LockBuffer(cur_buffer, BUFFER_LOCK_EXCLUSIVE);
2077
2078                         LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
2079
2080                         /* copy tuple */
2081                         heap_copytuple_with_tuple(&tuple, &newtup);
2082
2083                         /*
2084                          * register invalidation of source tuple in catcaches.
2085                          *
2086                          * (Note: we do not need to register the copied tuple, because we
2087                          * are not changing the tuple contents and so there cannot be
2088                          * any need to flush negative catcache entries.)
2089                          */
2090                         CacheInvalidateHeapTuple(onerel, &tuple);
2091
2092                         /* NO ELOG(ERROR) TILL CHANGES ARE LOGGED */
2093                         START_CRIT_SECTION();
2094
2095                         /*
2096                          * Mark new tuple as MOVED_IN by me.
2097                          */
2098                         newtup.t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
2099                                                                                    HEAP_XMIN_INVALID |
2100                                                                                    HEAP_MOVED_OFF);
2101                         newtup.t_data->t_infomask |= HEAP_MOVED_IN;
2102                         HeapTupleHeaderSetXvac(newtup.t_data, myXID);
2103
2104                         /* add tuple to the page */
2105                         newoff = PageAddItem(ToPage, (Item) newtup.t_data, tuple_len,
2106                                                                  InvalidOffsetNumber, LP_USED);
2107                         if (newoff == InvalidOffsetNumber)
2108                         {
2109                                 elog(PANIC, "failed to add item with len = %lu to page %u (free space %lu, nusd %u, noff %u)",
2110                                          (unsigned long) tuple_len,
2111                                          cur_page->blkno, (unsigned long) cur_page->free,
2112                                          cur_page->offsets_used, cur_page->offsets_free);
2113                         }
2114                         newitemid = PageGetItemId(ToPage, newoff);
2115                         pfree(newtup.t_data);
2116                         newtup.t_datamcxt = NULL;
2117                         newtup.t_data = (HeapTupleHeader) PageGetItem(ToPage, newitemid);
2118                         ItemPointerSet(&(newtup.t_data->t_ctid), cur_page->blkno, newoff);
2119                         newtup.t_self = newtup.t_data->t_ctid;
2120
2121                         /*
2122                          * Mark old tuple as MOVED_OFF by me.
2123                          */
2124                         tuple.t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
2125                                                                                   HEAP_XMIN_INVALID |
2126                                                                                   HEAP_MOVED_IN);
2127                         tuple.t_data->t_infomask |= HEAP_MOVED_OFF;
2128                         HeapTupleHeaderSetXvac(tuple.t_data, myXID);
2129
2130                         /* XLOG stuff */
2131                         if (!onerel->rd_istemp)
2132                         {
2133                                 XLogRecPtr      recptr =
2134                                 log_heap_move(onerel, buf, tuple.t_self,
2135                                                           cur_buffer, &newtup);
2136
2137                                 PageSetLSN(page, recptr);
2138                                 PageSetSUI(page, ThisStartUpID);
2139                                 PageSetLSN(ToPage, recptr);
2140                                 PageSetSUI(ToPage, ThisStartUpID);
2141                         }
2142                         else
2143                         {
2144                                 /*
2145                                  * No XLOG record, but still need to flag that XID exists
2146                                  * on disk
2147                                  */
2148                                 MyXactMadeTempRelUpdate = true;
2149                         }
2150
2151                         END_CRIT_SECTION();
2152
2153                         cur_page->offsets_used++;
2154                         num_moved++;
2155                         cur_page->free = ((PageHeader) ToPage)->pd_upper - ((PageHeader) ToPage)->pd_lower;
2156                         if (cur_page->blkno > last_move_dest_block)
2157                                 last_move_dest_block = cur_page->blkno;
2158
2159                         vacpage->offsets[vacpage->offsets_free++] = offnum;
2160
2161                         LockBuffer(cur_buffer, BUFFER_LOCK_UNLOCK);
2162                         LockBuffer(buf, BUFFER_LOCK_UNLOCK);
2163
2164                         /* insert index' tuples if needed */
2165                         if (resultRelInfo->ri_NumIndices > 0)
2166                         {
2167                                 ExecStoreTuple(&newtup, slot, InvalidBuffer, false);
2168                                 ExecInsertIndexTuples(slot, &(newtup.t_self), estate, true);
2169                         }
2170                 }                                               /* walk along page */
2171
2172                 /*
2173                  * If we broke out of the walk-along-page loop early (ie, still
2174                  * have offnum <= maxoff), then we failed to move some tuple off
2175                  * this page.  No point in shrinking any more, so clean up and
2176                  * exit the per-page loop.
2177                  */
2178                 if (offnum < maxoff && keep_tuples > 0)
2179                 {
2180                         OffsetNumber off;
2181
2182                         /*
2183                          * Fix vacpage state for any unvisited tuples remaining on
2184                          * page
2185                          */
2186                         for (off = OffsetNumberNext(offnum);
2187                                  off <= maxoff;
2188                                  off = OffsetNumberNext(off))
2189                         {
2190                                 itemid = PageGetItemId(page, off);
2191                                 if (!ItemIdIsUsed(itemid))
2192                                         continue;
2193                                 tuple.t_datamcxt = NULL;
2194                                 tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
2195                                 if (tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED)
2196                                         continue;
2197                                 if (tuple.t_data->t_infomask & HEAP_MOVED_IN)
2198                                         elog(ERROR, "HEAP_MOVED_IN was not expected (2)");
2199                                 if (tuple.t_data->t_infomask & HEAP_MOVED_OFF)
2200                                 {
2201                                         if (HeapTupleHeaderGetXvac(tuple.t_data) != myXID)
2202                                                 elog(ERROR, "Invalid XVAC in tuple header (4)");
2203                                         /* some chains was moved while */
2204                                         if (chain_tuple_moved)
2205                                         {                       /* cleaning this page */
2206                                                 Assert(vacpage->offsets_free > 0);
2207                                                 for (i = 0; i < vacpage->offsets_free; i++)
2208                                                 {
2209                                                         if (vacpage->offsets[i] == off)
2210                                                                 break;
2211                                                 }
2212                                                 if (i >= vacpage->offsets_free) /* not found */
2213                                                 {
2214                                                         vacpage->offsets[vacpage->offsets_free++] = off;
2215                                                         Assert(keep_tuples > 0);
2216                                                         keep_tuples--;
2217                                                 }
2218                                         }
2219                                         else
2220                                         {
2221                                                 vacpage->offsets[vacpage->offsets_free++] = off;
2222                                                 Assert(keep_tuples > 0);
2223                                                 keep_tuples--;
2224                                         }
2225                                 }
2226                                 else
2227                                         elog(ERROR, "HEAP_MOVED_OFF was expected (2)");
2228                         }
2229                 }
2230
2231                 if (vacpage->offsets_free > 0)  /* some tuples were moved */
2232                 {
2233                         if (chain_tuple_moved)          /* else - they are ordered */
2234                         {
2235                                 qsort((char *) (vacpage->offsets), vacpage->offsets_free,
2236                                           sizeof(OffsetNumber), vac_cmp_offno);
2237                         }
2238                         vpage_insert(&Nvacpagelist, copy_vac_page(vacpage));
2239                         WriteBuffer(buf);
2240                 }
2241                 else if (dowrite)
2242                         WriteBuffer(buf);
2243                 else
2244                         ReleaseBuffer(buf);
2245
2246                 if (offnum <= maxoff)
2247                         break;                          /* had to quit early, see above note */
2248
2249         }                                                       /* walk along relation */
2250
2251         blkno++;                                        /* new number of blocks */
2252
2253         if (cur_buffer != InvalidBuffer)
2254         {
2255                 Assert(num_moved > 0);
2256                 WriteBuffer(cur_buffer);
2257         }
2258
2259         if (num_moved > 0)
2260         {
2261                 /*
2262                  * We have to commit our tuple movings before we truncate the
2263                  * relation.  Ideally we should do Commit/StartTransactionCommand
2264                  * here, relying on the session-level table lock to protect our
2265                  * exclusive access to the relation.  However, that would require
2266                  * a lot of extra code to close and re-open the relation, indexes,
2267                  * etc.  For now, a quick hack: record status of current
2268                  * transaction as committed, and continue.
2269                  */
2270                 RecordTransactionCommit();
2271         }
2272
2273         /*
2274          * We are not going to move any more tuples across pages, but we still
2275          * need to apply vacuum_page to compact free space in the remaining
2276          * pages in vacuum_pages list.  Note that some of these pages may also
2277          * be in the fraged_pages list, and may have had tuples moved onto
2278          * them; if so, we already did vacuum_page and needn't do it again.
2279          */
2280         for (i = 0, curpage = vacuum_pages->pagedesc;
2281                  i < vacuumed_pages;
2282                  i++, curpage++)
2283         {
2284                 CHECK_FOR_INTERRUPTS();
2285                 Assert((*curpage)->blkno < blkno);
2286                 if ((*curpage)->offsets_used == 0)
2287                 {
2288                         /* this page was not used as a move target, so must clean it */
2289                         buf = ReadBuffer(onerel, (*curpage)->blkno);
2290                         LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
2291                         page = BufferGetPage(buf);
2292                         if (!PageIsEmpty(page))
2293                                 vacuum_page(onerel, buf, *curpage);
2294                         LockBuffer(buf, BUFFER_LOCK_UNLOCK);
2295                         WriteBuffer(buf);
2296                 }
2297         }
2298
2299         /*
2300          * Now scan all the pages that we moved tuples onto and update tuple
2301          * status bits.  This is not really necessary, but will save time for
2302          * future transactions examining these tuples.
2303          *
2304          * XXX NOTICE that this code fails to clear HEAP_MOVED_OFF tuples from
2305          * pages that were move source pages but not move dest pages.  One
2306          * also wonders whether it wouldn't be better to skip this step and
2307          * let the tuple status updates happen someplace that's not holding an
2308          * exclusive lock on the relation.
2309          */
2310         checked_moved = 0;
2311         for (i = 0, curpage = fraged_pages->pagedesc;
2312                  i < num_fraged_pages;
2313                  i++, curpage++)
2314         {
2315                 CHECK_FOR_INTERRUPTS();
2316                 Assert((*curpage)->blkno < blkno);
2317                 if ((*curpage)->blkno > last_move_dest_block)
2318                         break;                          /* no need to scan any further */
2319                 if ((*curpage)->offsets_used == 0)
2320                         continue;                       /* this page was never used as a move dest */
2321                 buf = ReadBuffer(onerel, (*curpage)->blkno);
2322                 LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
2323                 page = BufferGetPage(buf);
2324                 num_tuples = 0;
2325                 max_offset = PageGetMaxOffsetNumber(page);
2326                 for (newoff = FirstOffsetNumber;
2327                          newoff <= max_offset;
2328                          newoff = OffsetNumberNext(newoff))
2329                 {
2330                         itemid = PageGetItemId(page, newoff);
2331                         if (!ItemIdIsUsed(itemid))
2332                                 continue;
2333                         tuple.t_datamcxt = NULL;
2334                         tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
2335                         if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
2336                         {
2337                                 if (!(tuple.t_data->t_infomask & HEAP_MOVED))
2338                                         elog(ERROR, "HEAP_MOVED_OFF/HEAP_MOVED_IN was expected");
2339                                 if (HeapTupleHeaderGetXvac(tuple.t_data) != myXID)
2340                                         elog(ERROR, "Invalid XVAC in tuple header (2)");
2341                                 if (tuple.t_data->t_infomask & HEAP_MOVED_IN)
2342                                 {
2343                                         tuple.t_data->t_infomask |= HEAP_XMIN_COMMITTED;
2344                                         tuple.t_data->t_infomask &= ~HEAP_MOVED;
2345                                         num_tuples++;
2346                                 }
2347                                 else
2348                                         tuple.t_data->t_infomask |= HEAP_XMIN_INVALID;
2349                         }
2350                 }
2351                 LockBuffer(buf, BUFFER_LOCK_UNLOCK);
2352                 WriteBuffer(buf);
2353                 Assert((*curpage)->offsets_used == num_tuples);
2354                 checked_moved += num_tuples;
2355         }
2356         Assert(num_moved == checked_moved);
2357
2358         elog(elevel, "Rel %s: Pages: %u --> %u; Tuple(s) moved: %u.\n\t%s",
2359                  RelationGetRelationName(onerel),
2360                  nblocks, blkno, num_moved,
2361                  vac_show_rusage(&ru0));
2362
2363         /*
2364          * Reflect the motion of system tuples to catalog cache here.
2365          */
2366         CommandCounterIncrement();
2367
2368         if (Nvacpagelist.num_pages > 0)
2369         {
2370                 /* vacuum indexes again if needed */
2371                 if (Irel != (Relation *) NULL)
2372                 {
2373                         VacPage    *vpleft,
2374                                            *vpright,
2375                                                 vpsave;
2376
2377                         /* re-sort Nvacpagelist.pagedesc */
2378                         for (vpleft = Nvacpagelist.pagedesc,
2379                         vpright = Nvacpagelist.pagedesc + Nvacpagelist.num_pages - 1;
2380                                  vpleft < vpright; vpleft++, vpright--)
2381                         {
2382                                 vpsave = *vpleft;
2383                                 *vpleft = *vpright;
2384                                 *vpright = vpsave;
2385                         }
2386                         Assert(keep_tuples >= 0);
2387                         for (i = 0; i < nindexes; i++)
2388                                 vacuum_index(&Nvacpagelist, Irel[i],
2389                                                          vacrelstats->rel_tuples, keep_tuples);
2390                 }
2391
2392                 /* clean moved tuples from last page in Nvacpagelist list */
2393                 if (vacpage->blkno == (blkno - 1) &&
2394                         vacpage->offsets_free > 0)
2395                 {
2396                         OffsetNumber unused[BLCKSZ / sizeof(OffsetNumber)];
2397                         int                     uncnt;
2398
2399                         buf = ReadBuffer(onerel, vacpage->blkno);
2400                         LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
2401                         page = BufferGetPage(buf);
2402                         num_tuples = 0;
2403                         maxoff = PageGetMaxOffsetNumber(page);
2404                         for (offnum = FirstOffsetNumber;
2405                                  offnum <= maxoff;
2406                                  offnum = OffsetNumberNext(offnum))
2407                         {
2408                                 itemid = PageGetItemId(page, offnum);
2409                                 if (!ItemIdIsUsed(itemid))
2410                                         continue;
2411                                 tuple.t_datamcxt = NULL;
2412                                 tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
2413
2414                                 if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
2415                                 {
2416                                         if (tuple.t_data->t_infomask & HEAP_MOVED_OFF)
2417                                         {
2418                                                 if (HeapTupleHeaderGetXvac(tuple.t_data) != myXID)
2419                                                         elog(ERROR, "Invalid XVAC in tuple header (3)");
2420                                                 itemid->lp_flags &= ~LP_USED;
2421                                                 num_tuples++;
2422                                         }
2423                                         else
2424                                                 elog(ERROR, "HEAP_MOVED_OFF was expected (3)");
2425                                 }
2426
2427                         }
2428                         Assert(vacpage->offsets_free == num_tuples);
2429
2430                         START_CRIT_SECTION();
2431
2432                         uncnt = PageRepairFragmentation(page, unused);
2433
2434                         /* XLOG stuff */
2435                         if (!onerel->rd_istemp)
2436                         {
2437                                 XLogRecPtr      recptr;
2438
2439                                 recptr = log_heap_clean(onerel, buf, unused, uncnt);
2440                                 PageSetLSN(page, recptr);
2441                                 PageSetSUI(page, ThisStartUpID);
2442                         }
2443                         else
2444                         {
2445                                 /*
2446                                  * No XLOG record, but still need to flag that XID exists
2447                                  * on disk
2448                                  */
2449                                 MyXactMadeTempRelUpdate = true;
2450                         }
2451
2452                         END_CRIT_SECTION();
2453
2454                         LockBuffer(buf, BUFFER_LOCK_UNLOCK);
2455                         WriteBuffer(buf);
2456                 }
2457
2458                 /* now - free new list of reaped pages */
2459                 curpage = Nvacpagelist.pagedesc;
2460                 for (i = 0; i < Nvacpagelist.num_pages; i++, curpage++)
2461                         pfree(*curpage);
2462                 pfree(Nvacpagelist.pagedesc);
2463         }
2464
2465         /*
2466          * Flush dirty pages out to disk.  We do this unconditionally, even if
2467          * we don't need to truncate, because we want to ensure that all
2468          * tuples have correct on-row commit status on disk (see bufmgr.c's
2469          * comments for FlushRelationBuffers()).
2470          */
2471         i = FlushRelationBuffers(onerel, blkno);
2472         if (i < 0)
2473                 elog(ERROR, "VACUUM (repair_frag): FlushRelationBuffers returned %d",
2474                          i);
2475
2476         /* truncate relation, if needed */
2477         if (blkno < nblocks)
2478         {
2479                 blkno = smgrtruncate(DEFAULT_SMGR, onerel, blkno);
2480                 onerel->rd_nblocks = blkno;             /* update relcache immediately */
2481                 onerel->rd_targblock = InvalidBlockNumber;
2482                 vacrelstats->rel_pages = blkno; /* set new number of blocks */
2483         }
2484
2485         /* clean up */
2486         pfree(vacpage);
2487         if (vacrelstats->vtlinks != NULL)
2488                 pfree(vacrelstats->vtlinks);
2489
2490         ExecDropTupleTable(tupleTable, true);
2491
2492         ExecCloseIndices(resultRelInfo);
2493
2494         FreeExecutorState(estate);
2495 }
2496
2497 /*
2498  *      vacuum_heap() -- free dead tuples
2499  *
2500  *              This routine marks dead tuples as unused and truncates relation
2501  *              if there are "empty" end-blocks.
2502  */
2503 static void
2504 vacuum_heap(VRelStats *vacrelstats, Relation onerel, VacPageList vacuum_pages)
2505 {
2506         Buffer          buf;
2507         VacPage    *vacpage;
2508         BlockNumber relblocks;
2509         int                     nblocks;
2510         int                     i;
2511
2512         nblocks = vacuum_pages->num_pages;
2513         nblocks -= vacuum_pages->empty_end_pages;       /* nothing to do with them */
2514
2515         for (i = 0, vacpage = vacuum_pages->pagedesc; i < nblocks; i++, vacpage++)
2516         {
2517                 CHECK_FOR_INTERRUPTS();
2518                 if ((*vacpage)->offsets_free > 0)
2519                 {
2520                         buf = ReadBuffer(onerel, (*vacpage)->blkno);
2521                         LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
2522                         vacuum_page(onerel, buf, *vacpage);
2523                         LockBuffer(buf, BUFFER_LOCK_UNLOCK);
2524                         WriteBuffer(buf);
2525                 }
2526         }
2527
2528         /*
2529          * Flush dirty pages out to disk.  We do this unconditionally, even if
2530          * we don't need to truncate, because we want to ensure that all
2531          * tuples have correct on-row commit status on disk (see bufmgr.c's
2532          * comments for FlushRelationBuffers()).
2533          */
2534         Assert(vacrelstats->rel_pages >= vacuum_pages->empty_end_pages);
2535         relblocks = vacrelstats->rel_pages - vacuum_pages->empty_end_pages;
2536
2537         i = FlushRelationBuffers(onerel, relblocks);
2538         if (i < 0)
2539                 elog(ERROR, "VACUUM (vacuum_heap): FlushRelationBuffers returned %d",
2540                          i);
2541
2542         /* truncate relation if there are some empty end-pages */
2543         if (vacuum_pages->empty_end_pages > 0)
2544         {
2545                 elog(elevel, "Rel %s: Pages: %u --> %u.",
2546                          RelationGetRelationName(onerel),
2547                          vacrelstats->rel_pages, relblocks);
2548                 relblocks = smgrtruncate(DEFAULT_SMGR, onerel, relblocks);
2549                 onerel->rd_nblocks = relblocks; /* update relcache immediately */
2550                 onerel->rd_targblock = InvalidBlockNumber;
2551                 vacrelstats->rel_pages = relblocks;             /* set new number of
2552                                                                                                  * blocks */
2553         }
2554 }
2555
2556 /*
2557  *      vacuum_page() -- free dead tuples on a page
2558  *                                       and repair its fragmentation.
2559  */
2560 static void
2561 vacuum_page(Relation onerel, Buffer buffer, VacPage vacpage)
2562 {
2563         OffsetNumber unused[BLCKSZ / sizeof(OffsetNumber)];
2564         int                     uncnt;
2565         Page            page = BufferGetPage(buffer);
2566         ItemId          itemid;
2567         int                     i;
2568
2569         /* There shouldn't be any tuples moved onto the page yet! */
2570         Assert(vacpage->offsets_used == 0);
2571
2572         START_CRIT_SECTION();
2573
2574         for (i = 0; i < vacpage->offsets_free; i++)
2575         {
2576                 itemid = PageGetItemId(page, vacpage->offsets[i]);
2577                 itemid->lp_flags &= ~LP_USED;
2578         }
2579
2580         uncnt = PageRepairFragmentation(page, unused);
2581
2582         /* XLOG stuff */
2583         if (!onerel->rd_istemp)
2584         {
2585                 XLogRecPtr      recptr;
2586
2587                 recptr = log_heap_clean(onerel, buffer, unused, uncnt);
2588                 PageSetLSN(page, recptr);
2589                 PageSetSUI(page, ThisStartUpID);
2590         }
2591         else
2592         {
2593                 /* No XLOG record, but still need to flag that XID exists on disk */
2594                 MyXactMadeTempRelUpdate = true;
2595         }
2596
2597         END_CRIT_SECTION();
2598 }
2599
2600 /*
2601  *      scan_index() -- scan one index relation to update statistic.
2602  *
2603  * We use this when we have no deletions to do.
2604  */
2605 static void
2606 scan_index(Relation indrel, double num_tuples)
2607 {
2608         IndexBulkDeleteResult *stats;
2609         IndexVacuumCleanupInfo vcinfo;
2610         VacRUsage       ru0;
2611
2612         vac_init_rusage(&ru0);
2613
2614         /*
2615          * Even though we're not planning to delete anything, we use the
2616          * ambulkdelete call, because (a) the scan happens within the index AM
2617          * for more speed, and (b) it may want to pass private statistics to
2618          * the amvacuumcleanup call.
2619          */
2620         stats = index_bulk_delete(indrel, dummy_tid_reaped, NULL);
2621
2622         /* Do post-VACUUM cleanup, even though we deleted nothing */
2623         vcinfo.vacuum_full = true;
2624         vcinfo.message_level = elevel;
2625
2626         stats = index_vacuum_cleanup(indrel, &vcinfo, stats);
2627
2628         if (!stats)
2629                 return;
2630
2631         /* now update statistics in pg_class */
2632         vac_update_relstats(RelationGetRelid(indrel),
2633                                                 stats->num_pages, stats->num_index_tuples,
2634                                                 false);
2635
2636         elog(elevel, "Index %s: Pages %u, %u deleted, %u free; Tuples %.0f.\n\t%s",
2637                  RelationGetRelationName(indrel),
2638                  stats->num_pages, stats->pages_deleted, stats->pages_free,
2639                  stats->num_index_tuples,
2640                  vac_show_rusage(&ru0));
2641
2642         /*
2643          * Check for tuple count mismatch.      If the index is partial, then it's
2644          * OK for it to have fewer tuples than the heap; else we got trouble.
2645          */
2646         if (stats->num_index_tuples != num_tuples)
2647         {
2648                 if (stats->num_index_tuples > num_tuples ||
2649                         !vac_is_partial_index(indrel))
2650                         elog(WARNING, "Index %s: NUMBER OF INDEX' TUPLES (%.0f) IS NOT THE SAME AS HEAP' (%.0f)."
2651                                  "\n\tRecreate the index.",
2652                                  RelationGetRelationName(indrel),
2653                                  stats->num_index_tuples, num_tuples);
2654         }
2655
2656         pfree(stats);
2657 }
2658
2659 /*
2660  *      vacuum_index() -- vacuum one index relation.
2661  *
2662  *              Vpl is the VacPageList of the heap we're currently vacuuming.
2663  *              It's locked. Indrel is an index relation on the vacuumed heap.
2664  *
2665  *              We don't bother to set locks on the index relation here, since
2666  *              the parent table is exclusive-locked already.
2667  *
2668  *              Finally, we arrange to update the index relation's statistics in
2669  *              pg_class.
2670  */
2671 static void
2672 vacuum_index(VacPageList vacpagelist, Relation indrel,
2673                          double num_tuples, int keep_tuples)
2674 {
2675         IndexBulkDeleteResult *stats;
2676         IndexVacuumCleanupInfo vcinfo;
2677         VacRUsage       ru0;
2678
2679         vac_init_rusage(&ru0);
2680
2681         /* Do bulk deletion */
2682         stats = index_bulk_delete(indrel, tid_reaped, (void *) vacpagelist);
2683
2684         /* Do post-VACUUM cleanup */
2685         vcinfo.vacuum_full = true;
2686         vcinfo.message_level = elevel;
2687
2688         stats = index_vacuum_cleanup(indrel, &vcinfo, stats);
2689
2690         if (!stats)
2691                 return;
2692
2693         /* now update statistics in pg_class */
2694         vac_update_relstats(RelationGetRelid(indrel),
2695                                                 stats->num_pages, stats->num_index_tuples,
2696                                                 false);
2697
2698         elog(elevel, "Index %s: Pages %u, %u deleted, %u free; Tuples %.0f: Deleted %.0f.\n\t%s",
2699                  RelationGetRelationName(indrel),
2700                  stats->num_pages, stats->pages_deleted, stats->pages_free,
2701                  stats->num_index_tuples - keep_tuples, stats->tuples_removed,
2702                  vac_show_rusage(&ru0));
2703
2704         /*
2705          * Check for tuple count mismatch.      If the index is partial, then it's
2706          * OK for it to have fewer tuples than the heap; else we got trouble.
2707          */
2708         if (stats->num_index_tuples != num_tuples + keep_tuples)
2709         {
2710                 if (stats->num_index_tuples > num_tuples + keep_tuples ||
2711                         !vac_is_partial_index(indrel))
2712                         elog(WARNING, "Index %s: NUMBER OF INDEX' TUPLES (%.0f) IS NOT THE SAME AS HEAP' (%.0f)."
2713                                  "\n\tRecreate the index.",
2714                                  RelationGetRelationName(indrel),
2715                                  stats->num_index_tuples, num_tuples);
2716         }
2717
2718         pfree(stats);
2719 }
2720
2721 /*
2722  *      tid_reaped() -- is a particular tid reaped?
2723  *
2724  *              This has the right signature to be an IndexBulkDeleteCallback.
2725  *
2726  *              vacpagelist->VacPage_array is sorted in right order.
2727  */
2728 static bool
2729 tid_reaped(ItemPointer itemptr, void *state)
2730 {
2731         VacPageList vacpagelist = (VacPageList) state;
2732         OffsetNumber ioffno;
2733         OffsetNumber *voff;
2734         VacPage         vp,
2735                            *vpp;
2736         VacPageData vacpage;
2737
2738         vacpage.blkno = ItemPointerGetBlockNumber(itemptr);
2739         ioffno = ItemPointerGetOffsetNumber(itemptr);
2740
2741         vp = &vacpage;
2742         vpp = (VacPage *) vac_bsearch((void *) &vp,
2743                                                                   (void *) (vacpagelist->pagedesc),
2744                                                                   vacpagelist->num_pages,
2745                                                                   sizeof(VacPage),
2746                                                                   vac_cmp_blk);
2747
2748         if (vpp == NULL)
2749                 return false;
2750
2751         /* ok - we are on a partially or fully reaped page */
2752         vp = *vpp;
2753
2754         if (vp->offsets_free == 0)
2755         {
2756                 /* this is EmptyPage, so claim all tuples on it are reaped!!! */
2757                 return true;
2758         }
2759
2760         voff = (OffsetNumber *) vac_bsearch((void *) &ioffno,
2761                                                                                 (void *) (vp->offsets),
2762                                                                                 vp->offsets_free,
2763                                                                                 sizeof(OffsetNumber),
2764                                                                                 vac_cmp_offno);
2765
2766         if (voff == NULL)
2767                 return false;
2768
2769         /* tid is reaped */
2770         return true;
2771 }
2772
2773 /*
2774  * Dummy version for scan_index.
2775  */
2776 static bool
2777 dummy_tid_reaped(ItemPointer itemptr, void *state)
2778 {
2779         return false;
2780 }
2781
2782 /*
2783  * Update the shared Free Space Map with the info we now have about
2784  * free space in the relation, discarding any old info the map may have.
2785  */
2786 static void
2787 vac_update_fsm(Relation onerel, VacPageList fraged_pages,
2788                            BlockNumber rel_pages)
2789 {
2790         int                     nPages = fraged_pages->num_pages;
2791         VacPage    *pagedesc = fraged_pages->pagedesc;
2792         Size            threshold;
2793         PageFreeSpaceInfo *pageSpaces;
2794         int                     outPages;
2795         int                     i;
2796
2797         /*
2798          * We only report pages with free space at least equal to the average
2799          * request size --- this avoids cluttering FSM with uselessly-small bits
2800          * of space.  Although FSM would discard pages with little free space
2801          * anyway, it's important to do this prefiltering because (a) it reduces
2802          * the time spent holding the FSM lock in RecordRelationFreeSpace, and
2803          * (b) FSM uses the number of pages reported as a statistic for guiding
2804          * space management.  If we didn't threshold our reports the same way
2805          * vacuumlazy.c does, we'd be skewing that statistic.
2806          */
2807         threshold = GetAvgFSMRequestSize(&onerel->rd_node);
2808
2809         /* +1 to avoid palloc(0) */
2810         pageSpaces = (PageFreeSpaceInfo *)
2811                 palloc((nPages + 1) * sizeof(PageFreeSpaceInfo));
2812         outPages = 0;
2813
2814         for (i = 0; i < nPages; i++)
2815         {
2816                 /*
2817                  * fraged_pages may contain entries for pages that we later
2818                  * decided to truncate from the relation; don't enter them into
2819                  * the free space map!
2820                  */
2821                 if (pagedesc[i]->blkno >= rel_pages)
2822                         break;
2823
2824                 if (pagedesc[i]->free >= threshold)
2825                 {
2826                         pageSpaces[outPages].blkno = pagedesc[i]->blkno;
2827                         pageSpaces[outPages].avail = pagedesc[i]->free;
2828                         outPages++;
2829                 }
2830         }
2831
2832         RecordRelationFreeSpace(&onerel->rd_node, outPages, pageSpaces);
2833
2834         pfree(pageSpaces);
2835 }
2836
2837 /* Copy a VacPage structure */
2838 static VacPage
2839 copy_vac_page(VacPage vacpage)
2840 {
2841         VacPage         newvacpage;
2842
2843         /* allocate a VacPageData entry */
2844         newvacpage = (VacPage) palloc(sizeof(VacPageData) +
2845                                                    vacpage->offsets_free * sizeof(OffsetNumber));
2846
2847         /* fill it in */
2848         if (vacpage->offsets_free > 0)
2849                 memcpy(newvacpage->offsets, vacpage->offsets,
2850                            vacpage->offsets_free * sizeof(OffsetNumber));
2851         newvacpage->blkno = vacpage->blkno;
2852         newvacpage->free = vacpage->free;
2853         newvacpage->offsets_used = vacpage->offsets_used;
2854         newvacpage->offsets_free = vacpage->offsets_free;
2855
2856         return newvacpage;
2857 }
2858
2859 /*
2860  * Add a VacPage pointer to a VacPageList.
2861  *
2862  *              As a side effect of the way that scan_heap works,
2863  *              higher pages come after lower pages in the array
2864  *              (and highest tid on a page is last).
2865  */
2866 static void
2867 vpage_insert(VacPageList vacpagelist, VacPage vpnew)
2868 {
2869 #define PG_NPAGEDESC 1024
2870
2871         /* allocate a VacPage entry if needed */
2872         if (vacpagelist->num_pages == 0)
2873         {
2874                 vacpagelist->pagedesc = (VacPage *) palloc(PG_NPAGEDESC * sizeof(VacPage));
2875                 vacpagelist->num_allocated_pages = PG_NPAGEDESC;
2876         }
2877         else if (vacpagelist->num_pages >= vacpagelist->num_allocated_pages)
2878         {
2879                 vacpagelist->num_allocated_pages *= 2;
2880                 vacpagelist->pagedesc = (VacPage *) repalloc(vacpagelist->pagedesc, vacpagelist->num_allocated_pages * sizeof(VacPage));
2881         }
2882         vacpagelist->pagedesc[vacpagelist->num_pages] = vpnew;
2883         (vacpagelist->num_pages)++;
2884 }
2885
2886 /*
2887  * vac_bsearch: just like standard C library routine bsearch(),
2888  * except that we first test to see whether the target key is outside
2889  * the range of the table entries.      This case is handled relatively slowly
2890  * by the normal binary search algorithm (ie, no faster than any other key)
2891  * but it occurs often enough in VACUUM to be worth optimizing.
2892  */
2893 static void *
2894 vac_bsearch(const void *key, const void *base,
2895                         size_t nelem, size_t size,
2896                         int (*compar) (const void *, const void *))
2897 {
2898         int                     res;
2899         const void *last;
2900
2901         if (nelem == 0)
2902                 return NULL;
2903         res = compar(key, base);
2904         if (res < 0)
2905                 return NULL;
2906         if (res == 0)
2907                 return (void *) base;
2908         if (nelem > 1)
2909         {
2910                 last = (const void *) ((const char *) base + (nelem - 1) * size);
2911                 res = compar(key, last);
2912                 if (res > 0)
2913                         return NULL;
2914                 if (res == 0)
2915                         return (void *) last;
2916         }
2917         if (nelem <= 2)
2918                 return NULL;                    /* already checked 'em all */
2919         return bsearch(key, base, nelem, size, compar);
2920 }
2921
2922 /*
2923  * Comparator routines for use with qsort() and bsearch().
2924  */
2925 static int
2926 vac_cmp_blk(const void *left, const void *right)
2927 {
2928         BlockNumber lblk,
2929                                 rblk;
2930
2931         lblk = (*((VacPage *) left))->blkno;
2932         rblk = (*((VacPage *) right))->blkno;
2933
2934         if (lblk < rblk)
2935                 return -1;
2936         if (lblk == rblk)
2937                 return 0;
2938         return 1;
2939 }
2940
2941 static int
2942 vac_cmp_offno(const void *left, const void *right)
2943 {
2944         if (*(OffsetNumber *) left < *(OffsetNumber *) right)
2945                 return -1;
2946         if (*(OffsetNumber *) left == *(OffsetNumber *) right)
2947                 return 0;
2948         return 1;
2949 }
2950
2951 static int
2952 vac_cmp_vtlinks(const void *left, const void *right)
2953 {
2954         if (((VTupleLink) left)->new_tid.ip_blkid.bi_hi <
2955                 ((VTupleLink) right)->new_tid.ip_blkid.bi_hi)
2956                 return -1;
2957         if (((VTupleLink) left)->new_tid.ip_blkid.bi_hi >
2958                 ((VTupleLink) right)->new_tid.ip_blkid.bi_hi)
2959                 return 1;
2960         /* bi_hi-es are equal */
2961         if (((VTupleLink) left)->new_tid.ip_blkid.bi_lo <
2962                 ((VTupleLink) right)->new_tid.ip_blkid.bi_lo)
2963                 return -1;
2964         if (((VTupleLink) left)->new_tid.ip_blkid.bi_lo >
2965                 ((VTupleLink) right)->new_tid.ip_blkid.bi_lo)
2966                 return 1;
2967         /* bi_lo-es are equal */
2968         if (((VTupleLink) left)->new_tid.ip_posid <
2969                 ((VTupleLink) right)->new_tid.ip_posid)
2970                 return -1;
2971         if (((VTupleLink) left)->new_tid.ip_posid >
2972                 ((VTupleLink) right)->new_tid.ip_posid)
2973                 return 1;
2974         return 0;
2975 }
2976
2977
2978 void
2979 vac_open_indexes(Relation relation, int *nindexes, Relation **Irel)
2980 {
2981         List       *indexoidlist,
2982                            *indexoidscan;
2983         int                     i;
2984
2985         indexoidlist = RelationGetIndexList(relation);
2986
2987         *nindexes = length(indexoidlist);
2988
2989         if (*nindexes > 0)
2990                 *Irel = (Relation *) palloc(*nindexes * sizeof(Relation));
2991         else
2992                 *Irel = NULL;
2993
2994         i = 0;
2995         foreach(indexoidscan, indexoidlist)
2996         {
2997                 Oid                     indexoid = lfirsto(indexoidscan);
2998
2999                 (*Irel)[i] = index_open(indexoid);
3000                 i++;
3001         }
3002
3003         freeList(indexoidlist);
3004 }
3005
3006
3007 void
3008 vac_close_indexes(int nindexes, Relation *Irel)
3009 {
3010         if (Irel == (Relation *) NULL)
3011                 return;
3012
3013         while (nindexes--)
3014                 index_close(Irel[nindexes]);
3015         pfree(Irel);
3016 }
3017
3018
3019 /*
3020  * Is an index partial (ie, could it contain fewer tuples than the heap?)
3021  */
3022 bool
3023 vac_is_partial_index(Relation indrel)
3024 {
3025         /*
3026          * If the index's AM doesn't support nulls, it's partial for our
3027          * purposes
3028          */
3029         if (!indrel->rd_am->amindexnulls)
3030                 return true;
3031
3032         /* Otherwise, look to see if there's a partial-index predicate */
3033         return (VARSIZE(&indrel->rd_index->indpred) > VARHDRSZ);
3034 }
3035
3036
3037 static bool
3038 enough_space(VacPage vacpage, Size len)
3039 {
3040         len = MAXALIGN(len);
3041
3042         if (len > vacpage->free)
3043                 return false;
3044
3045         /* if there are free itemid(s) and len <= free_space... */
3046         if (vacpage->offsets_used < vacpage->offsets_free)
3047                 return true;
3048
3049         /* noff_used >= noff_free and so we'll have to allocate new itemid */
3050         if (len + sizeof(ItemIdData) <= vacpage->free)
3051                 return true;
3052
3053         return false;
3054 }
3055
3056
3057 /*
3058  * Initialize usage snapshot.
3059  */
3060 void
3061 vac_init_rusage(VacRUsage *ru0)
3062 {
3063         struct timezone tz;
3064
3065         getrusage(RUSAGE_SELF, &ru0->ru);
3066         gettimeofday(&ru0->tv, &tz);
3067 }
3068
3069 /*
3070  * Compute elapsed time since ru0 usage snapshot, and format into
3071  * a displayable string.  Result is in a static string, which is
3072  * tacky, but no one ever claimed that the Postgres backend is
3073  * threadable...
3074  */
3075 const char *
3076 vac_show_rusage(VacRUsage *ru0)
3077 {
3078         static char result[100];
3079         VacRUsage       ru1;
3080
3081         vac_init_rusage(&ru1);
3082
3083         if (ru1.tv.tv_usec < ru0->tv.tv_usec)
3084         {
3085                 ru1.tv.tv_sec--;
3086                 ru1.tv.tv_usec += 1000000;
3087         }
3088         if (ru1.ru.ru_stime.tv_usec < ru0->ru.ru_stime.tv_usec)
3089         {
3090                 ru1.ru.ru_stime.tv_sec--;
3091                 ru1.ru.ru_stime.tv_usec += 1000000;
3092         }
3093         if (ru1.ru.ru_utime.tv_usec < ru0->ru.ru_utime.tv_usec)
3094         {
3095                 ru1.ru.ru_utime.tv_sec--;
3096                 ru1.ru.ru_utime.tv_usec += 1000000;
3097         }
3098
3099         snprintf(result, sizeof(result),
3100                          "CPU %d.%02ds/%d.%02du sec elapsed %d.%02d sec.",
3101                          (int) (ru1.ru.ru_stime.tv_sec - ru0->ru.ru_stime.tv_sec),
3102           (int) (ru1.ru.ru_stime.tv_usec - ru0->ru.ru_stime.tv_usec) / 10000,
3103                          (int) (ru1.ru.ru_utime.tv_sec - ru0->ru.ru_utime.tv_sec),
3104           (int) (ru1.ru.ru_utime.tv_usec - ru0->ru.ru_utime.tv_usec) / 10000,
3105                          (int) (ru1.tv.tv_sec - ru0->tv.tv_sec),
3106                          (int) (ru1.tv.tv_usec - ru0->tv.tv_usec) / 10000);
3107
3108         return result;
3109 }