/*-------------------------------------------------------------------------
 *
 * vacuum.c
 *        The postgres vacuum cleaner.
 *
 * This file includes the "full" version of VACUUM, as well as control code
 * used by all three of full VACUUM, lazy VACUUM, and ANALYZE.  See
 * vacuumlazy.c and analyze.c for the rest of the code for the latter two.
 *
 *
 * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *        $Header: /cvsroot/pgsql/src/backend/commands/vacuum.c,v 1.210 2001/10/25 05:49:26 momjian Exp $
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include <unistd.h>

#include "access/clog.h"
#include "access/genam.h"
#include "access/heapam.h"
#include "access/xlog.h"
#include "catalog/catalog.h"
#include "catalog/catname.h"
#include "catalog/pg_database.h"
#include "catalog/pg_index.h"
#include "commands/vacuum.h"
#include "executor/executor.h"
#include "miscadmin.h"
#include "storage/freespace.h"
#include "storage/sinval.h"
#include "storage/smgr.h"
#include "tcop/pquery.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/inval.h"
#include "utils/relcache.h"
#include "utils/syscache.h"
#include "utils/temprel.h"

#include "pgstat.h"


typedef struct VRelListData
{
        Oid                     vrl_relid;
        struct VRelListData *vrl_next;
} VRelListData;

typedef VRelListData *VRelList;

typedef struct VacPageData
{
        BlockNumber blkno;                      /* BlockNumber of this Page */
        Size            free;                   /* FreeSpace on this Page */
        uint16          offsets_used;   /* Number of OffNums used by vacuum */
        uint16          offsets_free;   /* Number of OffNums free or to be freed */
        OffsetNumber offsets[1];        /* Array of free OffNums */
} VacPageData;

typedef VacPageData *VacPage;
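
/*
 * A VacPage is a variable-size structure.  scan_heap() builds each entry
 * in a workspace sized for MaxOffsetNumber offsets, then stores away a
 * just-large-enough copy (see copy_vac_page).
 */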

typedef struct VacPageListData
{
        BlockNumber empty_end_pages;            /* Number of "empty" end-pages */
        int                     num_pages;              /* Number of pages in pagedesc */
        int                     num_allocated_pages;    /* Number of allocated pages in
                                                                                 * pagedesc */
        VacPage    *pagedesc;           /* Descriptions of pages */
} VacPageListData;

typedef VacPageListData *VacPageList;

typedef struct VTupleLinkData
{
        ItemPointerData new_tid;
        ItemPointerData this_tid;
} VTupleLinkData;

typedef VTupleLinkData *VTupleLink;

typedef struct VTupleMoveData
{
        ItemPointerData tid;            /* tuple ID */
        VacPage         vacpage;                /* where to move */
        bool            cleanVpd;               /* clean vacpage before using */
} VTupleMoveData;

typedef VTupleMoveData *VTupleMove;

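/*
 * VRelStats holds the per-relation summary computed by scan_heap(): page
 * and live-tuple counts, min/max live tuple lengths, and the update-chain
 * links (vtlinks) that repair_frag uses when moving tuple chains.
 */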
typedef struct VRelStats
{
        BlockNumber rel_pages;
        double          rel_tuples;
        Size            min_tlen;
        Size            max_tlen;
        bool            hasindex;
        int                     num_vtlinks;
        VTupleLink      vtlinks;
} VRelStats;


static MemoryContext vac_context = NULL;

static int      MESSAGE_LEVEL;          /* message level */

static TransactionId OldestXmin;
static TransactionId FreezeLimit;

static TransactionId initialOldestXmin;
static TransactionId initialFreezeLimit;


/* non-export function prototypes */
static void vacuum_init(VacuumStmt *vacstmt);
static void vacuum_shutdown(VacuumStmt *vacstmt);
static VRelList getrels(Name VacRelP, const char *stmttype);
static void vac_update_dbstats(Oid dbid,
                                   TransactionId vacuumXID,
                                   TransactionId frozenXID);
static void vac_truncate_clog(TransactionId vacuumXID,
                                  TransactionId frozenXID);
static void vacuum_rel(Oid relid, VacuumStmt *vacstmt);
static void full_vacuum_rel(Relation onerel, VacuumStmt *vacstmt);
static void scan_heap(VRelStats *vacrelstats, Relation onerel,
                  VacPageList vacuum_pages, VacPageList fraged_pages);
static void repair_frag(VRelStats *vacrelstats, Relation onerel,
                        VacPageList vacuum_pages, VacPageList fraged_pages,
                        int nindexes, Relation *Irel);
static void vacuum_heap(VRelStats *vacrelstats, Relation onerel,
                        VacPageList vacpagelist);
static void vacuum_page(Relation onerel, Buffer buffer, VacPage vacpage);
static void vacuum_index(VacPageList vacpagelist, Relation indrel,
                         double num_tuples, int keep_tuples);
static void scan_index(Relation indrel, double num_tuples);
static bool tid_reaped(ItemPointer itemptr, void *state);
static bool dummy_tid_reaped(ItemPointer itemptr, void *state);
static void vac_update_fsm(Relation onerel, VacPageList fraged_pages,
                           BlockNumber rel_pages);
static VacPage copy_vac_page(VacPage vacpage);
static void vpage_insert(VacPageList vacpagelist, VacPage vpnew);
static void *vac_bsearch(const void *key, const void *base,
                        size_t nelem, size_t size,
                        int (*compar) (const void *, const void *));
static int      vac_cmp_blk(const void *left, const void *right);
static int      vac_cmp_offno(const void *left, const void *right);
static int      vac_cmp_vtlinks(const void *left, const void *right);
static bool enough_space(VacPage vacpage, Size len);


/****************************************************************************
 *                                                                                                                                                      *
 *                      Code common to all flavors of VACUUM and ANALYZE                                *
 *                                                                                                                                                      *
 ****************************************************************************
 */


/*
 * Primary entry point for VACUUM and ANALYZE commands.
 */
void
vacuum(VacuumStmt *vacstmt)
{
        const char *stmttype = vacstmt->vacuum ? "VACUUM" : "ANALYZE";
        NameData        VacRel;
        Name            VacRelName;
        VRelList        vrl,
                                cur;

        /*
         * We cannot run VACUUM inside a user transaction block; if we were
         * inside a transaction, then our commit- and
         * start-transaction-command calls would not have the intended effect!
         * Furthermore, the forced commit that occurs before truncating the
         * relation's file would have the effect of committing the rest of the
         * user's transaction too, which would certainly not be the desired
         * behavior.
         */
        if (IsTransactionBlock())
                elog(ERROR, "%s cannot run inside a BEGIN/END block", stmttype);

        /*
         * Send info about dead objects to the statistics collector
         */
        pgstat_vacuum_tabstat();

        if (vacstmt->verbose)
                MESSAGE_LEVEL = NOTICE;
        else
                MESSAGE_LEVEL = DEBUG;

        /*
         * Create special memory context for cross-transaction storage.
         *
         * Since it is a child of QueryContext, it will go away eventually even
         * if we suffer an error; there's no need for special abort cleanup
         * logic.
         */
        vac_context = AllocSetContextCreate(QueryContext,
                                                                                "Vacuum",
                                                                                ALLOCSET_DEFAULT_MINSIZE,
                                                                                ALLOCSET_DEFAULT_INITSIZE,
                                                                                ALLOCSET_DEFAULT_MAXSIZE);

        /* Convert vacrel, which is just a string, to a Name */
        if (vacstmt->vacrel)
        {
                namestrcpy(&VacRel, vacstmt->vacrel);
                VacRelName = &VacRel;
        }
        else
                VacRelName = NULL;

        /* Build list of relations to process (note this lives in vac_context) */
        vrl = getrels(VacRelName, stmttype);

        /*
         * Start up the vacuum cleaner.
         */
        vacuum_init(vacstmt);

        /*
         * Process each selected relation.      We are careful to process each
         * relation in a separate transaction in order to avoid holding too
         * many locks at one time.      Also, if we are doing VACUUM ANALYZE, the
         * ANALYZE part runs as a separate transaction from the VACUUM to
         * further reduce locking.
         */
        for (cur = vrl; cur != (VRelList) NULL; cur = cur->vrl_next)
        {
                if (vacstmt->vacuum)
                        vacuum_rel(cur->vrl_relid, vacstmt);
                if (vacstmt->analyze)
                        analyze_rel(cur->vrl_relid, vacstmt);
        }

        /* clean up */
        vacuum_shutdown(vacstmt);
}

/*
 *      vacuum_init(), vacuum_shutdown() -- start up and shut down the vacuum cleaner.
 *
 *              Formerly, there was code here to prevent more than one VACUUM from
 *              executing concurrently in the same database.  However, there's no
 *              good reason to prevent that, and manually removing lockfiles after
 *              a vacuum crash was a pain for dbadmins.  So, forget about lockfiles,
 *              and just rely on the locks we grab on each target table
 *              to ensure that there aren't two VACUUMs running on the same table
 *              at the same time.
 *
 *              The strangeness with committing and starting transactions in the
 *              init and shutdown routines is due to the fact that the vacuum cleaner
 *              is invoked via an SQL command, and so is already executing inside
 *              a transaction.  We need to leave ourselves in a predictable state
 *              on entry and exit to the vacuum cleaner.  We commit the transaction
 *              started in PostgresMain() inside vacuum_init(), and start one in
 *              vacuum_shutdown() to match the commit waiting for us back in
 *              PostgresMain().
 */
static void
vacuum_init(VacuumStmt *vacstmt)
{
        if (vacstmt->vacuum && vacstmt->vacrel == NULL)
        {
                /*
                 * Compute the initially applicable OldestXmin and FreezeLimit
                 * XIDs, so that we can record these values at the end of the
                 * VACUUM. Note that individual tables may well be processed with
                 * newer values, but we can guarantee that no (non-shared)
                 * relations are processed with older ones.
                 *
                 * It is okay to record non-shared values in pg_database, even though
                 * we may vacuum shared relations with older cutoffs, because only
                 * the minimum of the values present in pg_database matters.  We
                 * can be sure that shared relations have at some time been
                 * vacuumed with cutoffs no worse than the global minimum; for, if
                 * there is a backend in some other DB with xmin = OLDXMIN that's
                 * determining the cutoff with which we vacuum shared relations,
                 * it is not possible for that database to have a cutoff newer
                 * than OLDXMIN recorded in pg_database.
                 */
                vacuum_set_xid_limits(vacstmt, false,
                                                          &initialOldestXmin, &initialFreezeLimit);
        }

        /* matches the StartTransaction in PostgresMain() */
        CommitTransactionCommand();
}

static void
vacuum_shutdown(VacuumStmt *vacstmt)
{
        /* on entry, we are not in a transaction */

        /* matches the CommitTransaction in PostgresMain() */
        StartTransactionCommand();

        /*
         * If we did a database-wide VACUUM, update the database's pg_database
         * row with info about the transaction IDs used, and try to truncate
         * pg_clog.
         */
        if (vacstmt->vacuum && vacstmt->vacrel == NULL)
        {
                vac_update_dbstats(MyDatabaseId,
                                                   initialOldestXmin, initialFreezeLimit);
                vac_truncate_clog(initialOldestXmin, initialFreezeLimit);
        }

        /*
         * If we did a complete vacuum or analyze, then flush the init file
         * that relcache.c uses to save startup time. The next backend startup
         * will rebuild the init file with up-to-date information from
         * pg_class. This lets the optimizer see the stats that we've
         * collected for certain critical system indexes.  See relcache.c for
         * more details.
         *
         * Ignore any failure to unlink the file, since it might not be there if
         * no backend has been started since the last vacuum.
         */
        if (vacstmt->vacrel == NULL)
                unlink(RELCACHE_INIT_FILENAME);

        /*
         * Clean up working storage --- note we must do this after
         * StartTransactionCommand, else we might be trying to delete the
         * active context!
         */
        MemoryContextDelete(vac_context);
        vac_context = NULL;
}

/*
 * Build a list of VRelListData nodes for each relation to be processed
 *
 * The list is built in vac_context so that it will survive across our
 * per-relation transactions.
 */
static VRelList
getrels(Name VacRelP, const char *stmttype)
{
        Relation        rel;
        TupleDesc       tupdesc;
        HeapScanDesc scan;
        HeapTuple       tuple;
        VRelList        vrl,
                                cur;
        Datum           d;
        char       *rname;
        char            rkind;
        bool            n;
        ScanKeyData key;

        if (VacRelP)
        {
                /*
                 * we could use the cache here, but it is clearer to use scankeys
                 * for both vacuum cases, bjm 2000/01/19
                 */
                char       *nontemp_relname;

                /* We must re-map temp table names bjm 2000-04-06 */
                nontemp_relname = get_temp_rel_by_username(NameStr(*VacRelP));
                if (nontemp_relname == NULL)
                        nontemp_relname = NameStr(*VacRelP);

                ScanKeyEntryInitialize(&key, 0x0, Anum_pg_class_relname,
                                                           F_NAMEEQ,
                                                           PointerGetDatum(nontemp_relname));
        }
        else
        {
                /* find all plain relations listed in pg_class */
                ScanKeyEntryInitialize(&key, 0x0, Anum_pg_class_relkind,
                                                           F_CHAREQ, CharGetDatum(RELKIND_RELATION));
        }

        vrl = cur = (VRelList) NULL;

        rel = heap_openr(RelationRelationName, AccessShareLock);
        tupdesc = RelationGetDescr(rel);

        scan = heap_beginscan(rel, false, SnapshotNow, 1, &key);

        while (HeapTupleIsValid(tuple = heap_getnext(scan, 0)))
        {
                d = heap_getattr(tuple, Anum_pg_class_relname, tupdesc, &n);
                rname = (char *) DatumGetName(d);

                d = heap_getattr(tuple, Anum_pg_class_relkind, tupdesc, &n);
                rkind = DatumGetChar(d);

                if (rkind != RELKIND_RELATION)
                {
                        elog(NOTICE, "%s: cannot process indexes, views or special system tables",
                                 stmttype);
                        continue;
                }

                /* Make a relation list entry for this guy */
                if (vrl == (VRelList) NULL)
                        vrl = cur = (VRelList)
                                MemoryContextAlloc(vac_context, sizeof(VRelListData));
                else
                {
                        cur->vrl_next = (VRelList)
                                MemoryContextAlloc(vac_context, sizeof(VRelListData));
                        cur = cur->vrl_next;
                }

                cur->vrl_relid = tuple->t_data->t_oid;
                cur->vrl_next = (VRelList) NULL;
        }

        heap_endscan(scan);
        heap_close(rel, AccessShareLock);

        if (vrl == NULL)
                elog(NOTICE, "%s: table not found", stmttype);

        return vrl;
}

/*
 * vacuum_set_xid_limits() -- compute oldest-Xmin and freeze cutoff points
 */
void
vacuum_set_xid_limits(VacuumStmt *vacstmt, bool sharedRel,
                                          TransactionId *oldestXmin,
                                          TransactionId *freezeLimit)
{
        TransactionId limit;

        *oldestXmin = GetOldestXmin(sharedRel);

        Assert(TransactionIdIsNormal(*oldestXmin));

        if (vacstmt->freeze)
        {
                /* FREEZE option: use oldest Xmin as freeze cutoff too */
                limit = *oldestXmin;
        }
        else
        {
                /*
                 * Normal case: freeze cutoff is well in the past, to wit, about
                 * halfway to the wrap horizon
                 */
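                /*
                 * (With 32-bit transaction IDs, MaxTransactionId >> 2 is about
                 * 2^30, i.e. roughly one billion transactions back.)
                 */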
                limit = GetCurrentTransactionId() - (MaxTransactionId >> 2);
        }

        /*
         * Be careful not to generate a "permanent" XID
         */
        if (!TransactionIdIsNormal(limit))
                limit = FirstNormalTransactionId;

        /*
         * Ensure sane relationship of limits
         */
        if (TransactionIdFollows(limit, *oldestXmin))
        {
                elog(NOTICE, "oldest Xmin is far in the past --- close open transactions soon to avoid wraparound problems");
                limit = *oldestXmin;
        }

        *freezeLimit = limit;
}


/*
 *      vac_update_relstats() -- update statistics for one relation
 *
 *              Update the whole-relation statistics that are kept in its pg_class
 *              row.  There are additional stats that will be updated if we are
 *              doing ANALYZE, but we always update these stats.  This routine works
 *              for both index and heap relation entries in pg_class.
 *
 *              We violate no-overwrite semantics here by storing new values for the
 *              statistics columns directly into the pg_class tuple that's already on
 *              the page.  The reason for this is that if we updated these tuples in
 *              the usual way, vacuuming pg_class itself wouldn't work very well ---
 *              by the time we got done with a vacuum cycle, most of the tuples in
 *              pg_class would've been obsoleted.  Of course, this only works for
 *              fixed-size never-null columns, but these are.
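 *              (Here, those columns are relpages, reltuples, and relhasindex;
 *              relhaspkey may also be cleared.)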
 *
 *              This routine is shared by full VACUUM, lazy VACUUM, and stand-alone
 *              ANALYZE.
 */
void
vac_update_relstats(Oid relid, BlockNumber num_pages, double num_tuples,
                                        bool hasindex)
{
        Relation        rd;
        HeapTupleData rtup;
        HeapTuple       ctup;
        Form_pg_class pgcform;
        Buffer          buffer;

        /*
         * update number of tuples and number of pages in pg_class
         */
        rd = heap_openr(RelationRelationName, RowExclusiveLock);

        ctup = SearchSysCache(RELOID,
                                                  ObjectIdGetDatum(relid),
                                                  0, 0, 0);
        if (!HeapTupleIsValid(ctup))
                elog(ERROR, "pg_class entry for relid %u vanished during vacuuming",
                         relid);

        /* get the buffer cache tuple */
        rtup.t_self = ctup->t_self;
        ReleaseSysCache(ctup);
        heap_fetch(rd, SnapshotNow, &rtup, &buffer, NULL);

        /* overwrite the existing statistics in the tuple */
        pgcform = (Form_pg_class) GETSTRUCT(&rtup);
        pgcform->relpages = (int32) num_pages;
        pgcform->reltuples = num_tuples;
        pgcform->relhasindex = hasindex;

        /*
         * If we have discovered that there are no indexes, then there's no
         * primary key either.  This could be done more thoroughly...
         */
        if (!hasindex)
                pgcform->relhaspkey = false;

        /* invalidate the tuple in the cache and write the buffer */
        RelationInvalidateHeapTuple(rd, &rtup);
        WriteBuffer(buffer);

        heap_close(rd, RowExclusiveLock);
}


/*
 *      vac_update_dbstats() -- update statistics for one database
 *
 *              Update the whole-database statistics that are kept in its pg_database
 *              row.
 *
 *              We violate no-overwrite semantics here by storing new values for the
 *              statistics columns directly into the tuple that's already on the page.
 *              As with vac_update_relstats, this avoids leaving dead tuples behind
 *              after a VACUUM, which is good since GetRawDatabaseInfo
 *              can get confused by finding dead tuples in pg_database.
 *
 *              This routine is shared by full and lazy VACUUM.  Note that it is only
 *              applied after a database-wide VACUUM operation.
 */
static void
vac_update_dbstats(Oid dbid,
                                   TransactionId vacuumXID,
                                   TransactionId frozenXID)
{
        Relation        relation;
        ScanKeyData entry[1];
        HeapScanDesc scan;
        HeapTuple       tuple;
        Form_pg_database dbform;

        relation = heap_openr(DatabaseRelationName, RowExclusiveLock);

        /* Must use a heap scan, since there's no syscache for pg_database */
        ScanKeyEntryInitialize(&entry[0], 0x0,
                                                   ObjectIdAttributeNumber, F_OIDEQ,
                                                   ObjectIdGetDatum(dbid));

        scan = heap_beginscan(relation, 0, SnapshotNow, 1, entry);

        tuple = heap_getnext(scan, 0);

        if (!HeapTupleIsValid(tuple))
                elog(ERROR, "database %u does not exist", dbid);

        dbform = (Form_pg_database) GETSTRUCT(tuple);

        /* overwrite the existing statistics in the tuple */
        dbform->datvacuumxid = vacuumXID;
        dbform->datfrozenxid = frozenXID;

        /* invalidate the tuple in the cache and write the buffer */
        RelationInvalidateHeapTuple(relation, tuple);
        WriteNoReleaseBuffer(scan->rs_cbuf);

        heap_endscan(scan);

        heap_close(relation, RowExclusiveLock);
}


/*
 *      vac_truncate_clog() -- attempt to truncate the commit log
 *
 *              Scan pg_database to determine the system-wide oldest datvacuumxid,
 *              and use it to truncate the transaction commit log (pg_clog).
 *              Also generate a warning if the system-wide oldest datfrozenxid
 *              seems to be in danger of wrapping around.
 *
 *              The passed XIDs are simply the ones I just wrote into my pg_database
 *              entry.  They're used to initialize the "min" calculations.
 *
 *              This routine is shared by full and lazy VACUUM.  Note that it is only
 *              applied after a database-wide VACUUM operation.
 */
static void
vac_truncate_clog(TransactionId vacuumXID, TransactionId frozenXID)
{
        Relation        relation;
        HeapScanDesc scan;
        HeapTuple       tuple;
        int32           age;

        relation = heap_openr(DatabaseRelationName, AccessShareLock);

        scan = heap_beginscan(relation, 0, SnapshotNow, 0, NULL);

        while (HeapTupleIsValid(tuple = heap_getnext(scan, 0)))
        {
                Form_pg_database dbform = (Form_pg_database) GETSTRUCT(tuple);

                /* Ignore non-connectable databases (eg, template0) */
                /* It's assumed that these have been frozen correctly */
                if (!dbform->datallowconn)
                        continue;

                if (TransactionIdIsNormal(dbform->datvacuumxid) &&
                        TransactionIdPrecedes(dbform->datvacuumxid, vacuumXID))
                        vacuumXID = dbform->datvacuumxid;
                if (TransactionIdIsNormal(dbform->datfrozenxid) &&
                        TransactionIdPrecedes(dbform->datfrozenxid, frozenXID))
                        frozenXID = dbform->datfrozenxid;
        }

        heap_endscan(scan);

        heap_close(relation, AccessShareLock);

        /* Truncate CLOG to the oldest vacuumxid */
        TruncateCLOG(vacuumXID);

        /* Give warning about impending wraparound problems */
        age = (int32) (GetCurrentTransactionId() - frozenXID);
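
        /*
         * The threshold is 3/8 of the 2^32 XID space, about 1.6 billion
         * transactions, which leaves roughly half a billion transactions of
         * headroom before the 2^31 wraparound horizon reported below.
         */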
        if (age > (int32) ((MaxTransactionId >> 3) * 3))
                elog(NOTICE, "Some databases have not been vacuumed in %d transactions."
                         "\n\tBetter vacuum them within %d transactions,"
                         "\n\tor you may have a wraparound failure.",
                         age, (int32) (MaxTransactionId >> 1) - age);
}


/****************************************************************************
 *                                                                                                                                                      *
 *                      Code common to both flavors of VACUUM                                                   *
 *                                                                                                                                                      *
 ****************************************************************************
 */


/*
 *      vacuum_rel() -- vacuum one heap relation
 *
 *              Doing one heap at a time incurs extra overhead, since we need to
 *              check that the heap exists again just before we vacuum it.      The
 *              reason that we do this is so that vacuuming can be spread across
 *              many small transactions.  Otherwise, two-phase locking would require
 *              us to lock the entire database during one pass of the vacuum cleaner.
 *
 *              At entry and exit, we are not inside a transaction.
 */
static void
vacuum_rel(Oid relid, VacuumStmt *vacstmt)
{
        LOCKMODE        lmode;
        Relation        onerel;
        LockRelId       onerelid;
        Oid                     toast_relid;

        /* Begin a transaction for vacuuming this relation */
        StartTransactionCommand();

        /*
         * Check for user-requested abort.      Note we want this to be inside a
         * transaction, so xact.c doesn't issue useless NOTICE.
         */
        CHECK_FOR_INTERRUPTS();

        /*
         * Race condition -- if the pg_class tuple has gone away since the
         * last time we saw it, we don't need to vacuum it.
         */
        if (!SearchSysCacheExists(RELOID,
                                                          ObjectIdGetDatum(relid),
                                                          0, 0, 0))
        {
                CommitTransactionCommand();
                return;
        }

        /*
         * Determine the type of lock we want --- hard exclusive lock for a
         * FULL vacuum, but just ShareUpdateExclusiveLock for concurrent
         * vacuum.      Either way, we can be sure that no other backend is
         * vacuuming the same table.
         */
        lmode = vacstmt->full ? AccessExclusiveLock : ShareUpdateExclusiveLock;

        /*
         * Open the class, get an appropriate lock on it, and check
         * permissions.
         *
         * We allow the user to vacuum a table if he is superuser, the table
         * owner, or the database owner (but in the latter case, only if it's
         * not a shared relation).      pg_ownercheck includes the superuser case.
         *
         * Note we choose to treat permissions failure as a NOTICE and keep
         * trying to vacuum the rest of the DB --- is this appropriate?
         */
        onerel = heap_open(relid, lmode);

        if (!(pg_ownercheck(GetUserId(), RelationGetRelationName(onerel),
                                                RELNAME) ||
                  (is_dbadmin(MyDatabaseId) && !onerel->rd_rel->relisshared)))
        {
                elog(NOTICE, "Skipping \"%s\" --- only table or database owner can VACUUM it",
                         RelationGetRelationName(onerel));
                heap_close(onerel, lmode);
                CommitTransactionCommand();
                return;
        }

        /*
         * Get a session-level lock too. This will protect our access to the
         * relation across multiple transactions, so that we can vacuum the
         * relation's TOAST table (if any) secure in the knowledge that no one
         * is deleting the parent relation.
         *
         * NOTE: this cannot block, even if someone else is waiting for access,
         * because the lock manager knows that both lock requests are from the
         * same process.
         */
        onerelid = onerel->rd_lockInfo.lockRelId;
        LockRelationForSession(&onerelid, lmode);

        /*
         * Remember the relation's TOAST relation for later
         */
        toast_relid = onerel->rd_rel->reltoastrelid;

        /*
         * Do the actual work --- either FULL or "lazy" vacuum
         */
        if (vacstmt->full)
                full_vacuum_rel(onerel, vacstmt);
        else
                lazy_vacuum_rel(onerel, vacstmt);

        /* all done with this class, but hold lock until commit */
        heap_close(onerel, NoLock);

        /*
         * Complete the transaction and free all temporary memory used.
         */
        CommitTransactionCommand();

        /*
         * If the relation has a secondary toast rel, vacuum that too while we
         * still hold the session lock on the master table.  Note however that
         * "analyze" will not get done on the toast table.      This is good,
         * because the toaster always uses hardcoded index access and
         * statistics are totally unimportant for toast relations.
         */
        if (toast_relid != InvalidOid)
                vacuum_rel(toast_relid, vacstmt);

        /*
         * Now release the session-level lock on the master table.
         */
        UnlockRelationForSession(&onerelid, lmode);
}


/****************************************************************************
 *                                                                                                                                                      *
 *                      Code for VACUUM FULL (only)                                                                             *
 *                                                                                                                                                      *
 ****************************************************************************
 */


/*
 *      full_vacuum_rel() -- perform FULL VACUUM for one heap relation
 *
 *              This routine vacuums a single heap, cleans out its indexes, and
 *              updates its num_pages and num_tuples statistics.
 *
 *              At entry, we have already established a transaction and opened
 *              and locked the relation.
 */
static void
full_vacuum_rel(Relation onerel, VacuumStmt *vacstmt)
{
        VacPageListData vacuum_pages;           /* List of pages to vacuum and/or
                                                                                 * clean indexes */
        VacPageListData fraged_pages;           /* List of pages with space enough
                                                                                 * for re-using */
        Relation   *Irel;
        int                     nindexes,
                                i;
        VRelStats  *vacrelstats;
        bool            reindex = false;

        if (IsIgnoringSystemIndexes() &&
                IsSystemRelationName(RelationGetRelationName(onerel)))
                reindex = true;

        vacuum_set_xid_limits(vacstmt, onerel->rd_rel->relisshared,
                                                  &OldestXmin, &FreezeLimit);

        /*
         * Set up statistics-gathering machinery.
         */
        vacrelstats = (VRelStats *) palloc(sizeof(VRelStats));
        vacrelstats->rel_pages = 0;
        vacrelstats->rel_tuples = 0;
        vacrelstats->hasindex = false;

        /* scan the heap */
        vacuum_pages.num_pages = fraged_pages.num_pages = 0;
        scan_heap(vacrelstats, onerel, &vacuum_pages, &fraged_pages);

        /* Now open all indexes of the relation */
        vac_open_indexes(onerel, &nindexes, &Irel);
        if (!Irel)
                reindex = false;
        else if (!RelationGetForm(onerel)->relhasindex)
                reindex = true;
        if (nindexes > 0)
                vacrelstats->hasindex = true;

#ifdef NOT_USED

        /*
         * reindex in VACUUM is dangerous under WAL. ifdef out until it
         * becomes safe.
         */
        if (reindex)
        {
                vac_close_indexes(nindexes, Irel);
                Irel = (Relation *) NULL;
                activate_indexes_of_a_table(RelationGetRelid(onerel), false);
        }
#endif   /* NOT_USED */

        /* Clean/scan index relation(s) */
        if (Irel != (Relation *) NULL)
        {
                if (vacuum_pages.num_pages > 0)
                {
                        for (i = 0; i < nindexes; i++)
                                vacuum_index(&vacuum_pages, Irel[i],
                                                         vacrelstats->rel_tuples, 0);
                }
                else
                {
                        /* just scan indexes to update statistics */
                        for (i = 0; i < nindexes; i++)
                                scan_index(Irel[i], vacrelstats->rel_tuples);
                }
        }

        if (fraged_pages.num_pages > 0)
        {
                /* Try to shrink heap */
                repair_frag(vacrelstats, onerel, &vacuum_pages, &fraged_pages,
                                        nindexes, Irel);
                vac_close_indexes(nindexes, Irel);
        }
        else
        {
                vac_close_indexes(nindexes, Irel);
                if (vacuum_pages.num_pages > 0)
                {
                        /* Clean pages from vacuum_pages list */
                        vacuum_heap(vacrelstats, onerel, &vacuum_pages);
                }
                else
                {
                        /*
                         * Flush dirty pages out to disk.  We must do this even if we
                         * didn't do anything else, because we want to ensure that all
                         * tuples have correct on-row commit status on disk (see
                         * bufmgr.c's comments for FlushRelationBuffers()).
                         */
                        i = FlushRelationBuffers(onerel, vacrelstats->rel_pages);
                        if (i < 0)
                                elog(ERROR, "VACUUM (full_vacuum_rel): FlushRelationBuffers returned %d",
                                         i);
                }
        }

#ifdef NOT_USED
        if (reindex)
                activate_indexes_of_a_table(RelationGetRelid(onerel), true);
#endif   /* NOT_USED */

        /* update shared free space map with final free space info */
        vac_update_fsm(onerel, &fraged_pages, vacrelstats->rel_pages);

        /* update statistics in pg_class */
        vac_update_relstats(RelationGetRelid(onerel), vacrelstats->rel_pages,
                                                vacrelstats->rel_tuples, vacrelstats->hasindex);
}


/*
 *      scan_heap() -- scan an open heap relation
 *
 *              This routine sets commit status bits, constructs vacuum_pages (list
 *              of pages we need to compact free space on and/or clean indexes of
 *              deleted tuples), constructs fraged_pages (list of pages with free
 *              space that tuples could be moved into), and calculates statistics
 *              on the number of live tuples in the heap.
 */
static void
scan_heap(VRelStats *vacrelstats, Relation onerel,
                  VacPageList vacuum_pages, VacPageList fraged_pages)
{
        BlockNumber nblocks,
                                blkno;
        ItemId          itemid;
        Buffer          buf;
        HeapTupleData tuple;
        OffsetNumber offnum,
                                maxoff;
        bool            pgchanged,
                                tupgone,
                                notup;
        char       *relname;
        VacPage         vacpage,
                                vacpagecopy;
        BlockNumber empty_pages,
                                new_pages,
                                changed_pages,
                                empty_end_pages;
        double          num_tuples,
                                tups_vacuumed,
                                nkeep,
                                nunused;
        double          free_size,
                                usable_free_size;
        Size            min_tlen = MaxTupleSize;
        Size            max_tlen = 0;
        int                     i;
        bool            do_shrinking = true;
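
        /*
         * vtlinks collects update-chain links among RECENTLY_DEAD tuples
         * (this_tid was updated to new_tid), so that repair_frag can move
         * whole chains together.  Start with room for 100 links; grown on
         * demand below.
         */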
        VTupleLink      vtlinks = (VTupleLink) palloc(100 * sizeof(VTupleLinkData));
        int                     num_vtlinks = 0;
        int                     free_vtlinks = 100;
        VacRUsage       ru0;

        vac_init_rusage(&ru0);

        relname = RelationGetRelationName(onerel);
        elog(MESSAGE_LEVEL, "--Relation %s--", relname);

        empty_pages = new_pages = changed_pages = empty_end_pages = 0;
        num_tuples = tups_vacuumed = nkeep = nunused = 0;
        free_size = 0;

        nblocks = RelationGetNumberOfBlocks(onerel);

        /*
         * We initially create each VacPage item in a maximal-sized workspace,
         * then copy the workspace into a just-large-enough copy.
         */
        vacpage = (VacPage) palloc(sizeof(VacPageData) + MaxOffsetNumber * sizeof(OffsetNumber));

        for (blkno = 0; blkno < nblocks; blkno++)
        {
                Page            page,
                                        tempPage = NULL;
                bool            do_reap,
                                        do_frag;

                buf = ReadBuffer(onerel, blkno);
                page = BufferGetPage(buf);

                vacpage->blkno = blkno;
                vacpage->offsets_used = 0;
                vacpage->offsets_free = 0;

                if (PageIsNew(page))
                {
                        elog(NOTICE, "Rel %s: Uninitialized page %u - fixing",
                                 relname, blkno);
                        PageInit(page, BufferGetPageSize(buf), 0);
                        vacpage->free = ((PageHeader) page)->pd_upper - ((PageHeader) page)->pd_lower;
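                        /* count free space, less one line pointer a stored tuple would consume */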
                        free_size += (vacpage->free - sizeof(ItemIdData));
                        new_pages++;
                        empty_end_pages++;
                        vacpagecopy = copy_vac_page(vacpage);
                        vpage_insert(vacuum_pages, vacpagecopy);
                        vpage_insert(fraged_pages, vacpagecopy);
                        WriteBuffer(buf);
                        continue;
                }

                if (PageIsEmpty(page))
                {
                        vacpage->free = ((PageHeader) page)->pd_upper - ((PageHeader) page)->pd_lower;
                        free_size += (vacpage->free - sizeof(ItemIdData));
                        empty_pages++;
                        empty_end_pages++;
                        vacpagecopy = copy_vac_page(vacpage);
                        vpage_insert(vacuum_pages, vacpagecopy);
                        vpage_insert(fraged_pages, vacpagecopy);
                        ReleaseBuffer(buf);
                        continue;
                }

                pgchanged = false;
                notup = true;
                maxoff = PageGetMaxOffsetNumber(page);
                for (offnum = FirstOffsetNumber;
                         offnum <= maxoff;
                         offnum = OffsetNumberNext(offnum))
                {
                        uint16          sv_infomask;

                        itemid = PageGetItemId(page, offnum);

                        /*
                         * Collect unused items too - it's possible to have index
                         * entries pointing here after a crash.
                         */
                        if (!ItemIdIsUsed(itemid))
                        {
                                vacpage->offsets[vacpage->offsets_free++] = offnum;
                                nunused += 1;
                                continue;
                        }

                        tuple.t_datamcxt = NULL;
                        tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
                        tuple.t_len = ItemIdGetLength(itemid);
                        ItemPointerSet(&(tuple.t_self), blkno, offnum);

                        tupgone = false;
                        sv_infomask = tuple.t_data->t_infomask;

                        switch (HeapTupleSatisfiesVacuum(tuple.t_data, OldestXmin))
                        {
                                case HEAPTUPLE_DEAD:
                                        tupgone = true;         /* we can delete the tuple */
                                        break;
                                case HEAPTUPLE_LIVE:

                                        /*
                                         * Tuple is good.  Consider whether to replace its
                                         * xmin value with FrozenTransactionId.
                                         */
                                        if (TransactionIdIsNormal(tuple.t_data->t_xmin) &&
                                                TransactionIdPrecedes(tuple.t_data->t_xmin,
                                                                                          FreezeLimit))
                                        {
                                                tuple.t_data->t_xmin = FrozenTransactionId;
                                                /* infomask should be okay already */
                                                Assert(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED);
                                                pgchanged = true;
                                        }
                                        break;
                                case HEAPTUPLE_RECENTLY_DEAD:

                                        /*
                                         * If the tuple was recently deleted, we must not
                                         * remove it from the relation.
                                         */
                                        nkeep += 1;

                                        /*
                                         * If we are shrinking and this tuple has been
                                         * updated, remember it so that we can construct
                                         * update-chain dependencies.
                                         */
                                        if (do_shrinking &&
                                                !(ItemPointerEquals(&(tuple.t_self),
                                                                                        &(tuple.t_data->t_ctid))))
                                        {
                                                if (free_vtlinks == 0)
                                                {
                                                        free_vtlinks = 1000;
                                                        vtlinks = (VTupleLink) repalloc(vtlinks,
                                                                                   (free_vtlinks + num_vtlinks) *
                                                                                                 sizeof(VTupleLinkData));
                                                }
                                                vtlinks[num_vtlinks].new_tid = tuple.t_data->t_ctid;
                                                vtlinks[num_vtlinks].this_tid = tuple.t_self;
                                                free_vtlinks--;
                                                num_vtlinks++;
                                        }
                                        break;
                                case HEAPTUPLE_INSERT_IN_PROGRESS:

                                        /*
                                         * This should not happen, since we hold exclusive
                                         * lock on the relation; shouldn't we raise an error?
                                         */
                                        elog(NOTICE, "Rel %s: TID %u/%u: InsertTransactionInProgress %u - can't shrink relation",
                                                 relname, blkno, offnum, tuple.t_data->t_xmin);
                                        do_shrinking = false;
                                        break;
                                case HEAPTUPLE_DELETE_IN_PROGRESS:

                                        /*
                                         * This should not happen, since we hold exclusive
                                         * lock on the relation; shouldn't we raise an error?
                                         */
                                        elog(NOTICE, "Rel %s: TID %u/%u: DeleteTransactionInProgress %u - can't shrink relation",
                                                 relname, blkno, offnum, tuple.t_data->t_xmax);
                                        do_shrinking = false;
                                        break;
                                default:
                                        elog(ERROR, "Unexpected HeapTupleSatisfiesVacuum result");
                                        break;
                        }

                        /* check for hint-bit update by HeapTupleSatisfiesVacuum */
                        if (sv_infomask != tuple.t_data->t_infomask)
                                pgchanged = true;

                        /*
                         * Other checks...
                         */
                        if (!OidIsValid(tuple.t_data->t_oid) &&
                                onerel->rd_rel->relhasoids)
                                elog(NOTICE, "Rel %s: TID %u/%u: OID IS INVALID. TUPGONE %d.",
                                         relname, blkno, offnum, (int) tupgone);

                        if (tupgone)
                        {
                                ItemId          lpp;

                                /*
                                 * Here we are building a temporary copy of the page with
                                 * dead tuples removed.  Below we will apply
                                 * PageRepairFragmentation to the copy, so that we can
                                 * determine how much space will be available after
                                 * removal of dead tuples.      But note we are NOT changing
                                 * the real page yet...
                                 */
                                if (tempPage == (Page) NULL)
                                {
                                        Size            pageSize;

                                        pageSize = PageGetPageSize(page);
                                        tempPage = (Page) palloc(pageSize);
                                        memcpy(tempPage, page, pageSize);
                                }

                                /* mark it unused on the temp page */
                                lpp = PageGetItemId(tempPage, offnum);
                                lpp->lp_flags &= ~LP_USED;

                                vacpage->offsets[vacpage->offsets_free++] = offnum;
                                tups_vacuumed += 1;
                        }
                        else
                        {
                                num_tuples += 1;
                                notup = false;
                                if (tuple.t_len < min_tlen)
                                        min_tlen = tuple.t_len;
                                if (tuple.t_len > max_tlen)
                                        max_tlen = tuple.t_len;
                        }
                }                                               /* scan along page */

                if (tempPage != (Page) NULL)
                {
                        /* Some tuples are removable; figure free space after removal */
                        PageRepairFragmentation(tempPage, NULL);
                        vacpage->free = ((PageHeader) tempPage)->pd_upper - ((PageHeader) tempPage)->pd_lower;
                        pfree(tempPage);
                        do_reap = true;
                }
                else
                {
                        /* Just use current available space */
                        vacpage->free = ((PageHeader) page)->pd_upper - ((PageHeader) page)->pd_lower;
                        /* Need to reap the page if it has ~LP_USED line pointers */
                        do_reap = (vacpage->offsets_free > 0);
                }

                free_size += vacpage->free;

                /*
                 * Add the page to fraged_pages if it has a useful amount of free
                 * space.  "Useful" means enough for a minimal-sized tuple. But we
                 * don't know that accurately near the start of the relation, so
                 * add pages unconditionally if they have >= BLCKSZ/10 free space.
                 */
                do_frag = (vacpage->free >= min_tlen || vacpage->free >= BLCKSZ / 10);

                if (do_reap || do_frag)
                {
                        vacpagecopy = copy_vac_page(vacpage);
                        if (do_reap)
                                vpage_insert(vacuum_pages, vacpagecopy);
                        if (do_frag)
                                vpage_insert(fraged_pages, vacpagecopy);
                }

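                /*
                 * Track the run of trailing pages with no live tuples; a run
                 * that survives to the end of the scan is what makes the
                 * relation truncatable.
                 */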
                if (notup)
                        empty_end_pages++;
                else
                        empty_end_pages = 0;

                if (pgchanged)
                {
                        WriteBuffer(buf);
                        changed_pages++;
                }
                else
                        ReleaseBuffer(buf);
        }

        pfree(vacpage);

        /* save stats in the rel list for use later */
        vacrelstats->rel_tuples = num_tuples;
        vacrelstats->rel_pages = nblocks;
        if (num_tuples == 0)
                min_tlen = max_tlen = 0;
        vacrelstats->min_tlen = min_tlen;
        vacrelstats->max_tlen = max_tlen;

        vacuum_pages->empty_end_pages = empty_end_pages;
        fraged_pages->empty_end_pages = empty_end_pages;

        /*
         * Clear the fraged_pages list if we found we couldn't shrink. Else,
         * remove any "empty" end-pages from the list, and compute usable free
         * space = free space in remaining pages.
         */
        if (do_shrinking)
        {
                Assert((BlockNumber) fraged_pages->num_pages >= empty_end_pages);
                fraged_pages->num_pages -= empty_end_pages;
                usable_free_size = 0;
                for (i = 0; i < fraged_pages->num_pages; i++)
                        usable_free_size += fraged_pages->pagedesc[i]->free;
        }
        else
        {
                fraged_pages->num_pages = 0;
                usable_free_size = 0;
        }

        if (usable_free_size > 0 && num_vtlinks > 0)
        {
1271                 qsort((char *) vtlinks, num_vtlinks, sizeof(VTupleLinkData),
1272                           vac_cmp_vtlinks);
1273                 vacrelstats->vtlinks = vtlinks;
1274                 vacrelstats->num_vtlinks = num_vtlinks;
1275         }
1276         else
1277         {
1278                 vacrelstats->vtlinks = NULL;
1279                 vacrelstats->num_vtlinks = 0;
1280                 pfree(vtlinks);
1281         }
1282
1283         elog(MESSAGE_LEVEL, "Pages %u: Changed %u, reaped %u, Empty %u, New %u; \
1284 Tup %.0f: Vac %.0f, Keep/VTL %.0f/%u, UnUsed %.0f, MinLen %lu, MaxLen %lu; \
1285 Re-using: Free/Avail. Space %.0f/%.0f; EndEmpty/Avail. Pages %u/%u.\n\t%s",
1286                  nblocks, changed_pages, vacuum_pages->num_pages, empty_pages,
1287                  new_pages, num_tuples, tups_vacuumed,
1288                  nkeep, vacrelstats->num_vtlinks,
1289                  nunused, (unsigned long) min_tlen, (unsigned long) max_tlen,
1290                  free_size, usable_free_size,
1291                  empty_end_pages, fraged_pages->num_pages,
1292                  vac_show_rusage(&ru0));
1293
1294 }
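
/*
 * A minimal sketch (a hypothetical helper, not part of the original file):
 * scan_heap() above and repair_frag() below both compute a page's free space
 * inline as pd_upper - pd_lower, i.e. the hole between the end of the
 * line-pointer array and the start of the tuple data.  Factored out, the
 * computation would look like this:
 */
static Size
example_page_free_space(Page page)
{
        PageHeader      ph = (PageHeader) page;

        /* the unused gap between the line pointers and the tuple data */
        return (Size) (ph->pd_upper - ph->pd_lower);
}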
1295
1296
1297 /*
1298  *      repair_frag() -- try to repair relation's fragmentation
1299  *
1300  *              This routine marks dead tuples as unused and tries to re-use dead
1301  *              space by moving tuples (and inserting index entries if needed). It
1302  *              constructs an Nvacpagelist list of freed pages (tuples moved off) and
1303  *              cleans their index entries after committing the current transaction
1304  *              (in a hackish manner - without releasing locks or freeing memory!).
1305  *              It truncates the relation if some end-blocks have become empty.
1306  */
1307 static void
1308 repair_frag(VRelStats *vacrelstats, Relation onerel,
1309                         VacPageList vacuum_pages, VacPageList fraged_pages,
1310                         int nindexes, Relation *Irel)
1311 {
1312         TransactionId myXID;
1313         CommandId       myCID;
1314         Buffer          buf,
1315                                 cur_buffer;
1316         BlockNumber nblocks,
1317                                 blkno;
1318         BlockNumber last_move_dest_block = 0,
1319                                 last_vacuum_block;
1320         Page            page,
1321                                 ToPage = NULL;
1322         OffsetNumber offnum,
1323                                 maxoff,
1324                                 newoff,
1325                                 max_offset;
1326         ItemId          itemid,
1327                                 newitemid;
1328         HeapTupleData tuple,
1329                                 newtup;
1330         TupleDesc       tupdesc;
1331         ResultRelInfo *resultRelInfo;
1332         EState     *estate;
1333         TupleTable      tupleTable;
1334         TupleTableSlot *slot;
1335         VacPageListData Nvacpagelist;
1336         VacPage         cur_page = NULL,
1337                                 last_vacuum_page,
1338                                 vacpage,
1339                            *curpage;
1340         int                     cur_item = 0;
1341         int                     i;
1342         Size            tuple_len;
1343         int                     num_moved,
1344                                 num_fraged_pages,
1345                                 vacuumed_pages;
1346         int                     checked_moved,
1347                                 num_tuples,
1348                                 keep_tuples = 0;
1349         bool            isempty,
1350                                 dowrite,
1351                                 chain_tuple_moved;
1352         VacRUsage       ru0;
1353
1354         vac_init_rusage(&ru0);
1355
1356         myXID = GetCurrentTransactionId();
1357         myCID = GetCurrentCommandId();
1358
1359         tupdesc = RelationGetDescr(onerel);
1360
1361         /*
1362          * We need a ResultRelInfo and an EState so we can use the regular
1363          * executor's index-entry-making machinery.
1364          */
1365         resultRelInfo = makeNode(ResultRelInfo);
1366         resultRelInfo->ri_RangeTableIndex = 1;          /* dummy */
1367         resultRelInfo->ri_RelationDesc = onerel;
1368         resultRelInfo->ri_TrigDesc = NULL;      /* we don't fire triggers */
1369
1370         ExecOpenIndices(resultRelInfo);
1371
1372         estate = CreateExecutorState();
1373         estate->es_result_relations = resultRelInfo;
1374         estate->es_num_result_relations = 1;
1375         estate->es_result_relation_info = resultRelInfo;
1376
1377         /* Set up a dummy tuple table too */
1378         tupleTable = ExecCreateTupleTable(1);
1379         slot = ExecAllocTableSlot(tupleTable);
1380         ExecSetSlotDescriptor(slot, tupdesc, false);
1381
1382         Nvacpagelist.num_pages = 0;
1383         num_fraged_pages = fraged_pages->num_pages;
1384         Assert((BlockNumber) vacuum_pages->num_pages >= vacuum_pages->empty_end_pages);
1385         vacuumed_pages = vacuum_pages->num_pages - vacuum_pages->empty_end_pages;
1386         if (vacuumed_pages > 0)
1387         {
1388                 /* get last reaped page from vacuum_pages */
1389                 last_vacuum_page = vacuum_pages->pagedesc[vacuumed_pages - 1];
1390                 last_vacuum_block = last_vacuum_page->blkno;
1391         }
1392         else
1393         {
1394                 last_vacuum_page = NULL;
1395                 last_vacuum_block = InvalidBlockNumber;
1396         }
1397         cur_buffer = InvalidBuffer;
1398         num_moved = 0;
1399
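        /*
         * The offsets array is sized for MaxOffsetNumber entries, so a single
         * VacPage can record every line pointer a page could possibly hold.
         */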
1400         vacpage = (VacPage) palloc(sizeof(VacPageData) + MaxOffsetNumber * sizeof(OffsetNumber));
1401         vacpage->offsets_used = vacpage->offsets_free = 0;
1402
1403         /*
1404          * Scan pages backwards from the last nonempty page, trying to move
1405          * tuples down to lower pages.  Quit when we reach a page that we have
1406          * moved any tuples onto, or the first page if we haven't moved
1407          * anything, or when we find a page we cannot completely empty (this
1408          * last condition is handled by "break" statements within the loop).
1409          *
1410          * NB: this code depends on the vacuum_pages and fraged_pages lists being
1411          * in order by blkno.
1412          */
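        /*
         * For example (illustrative numbers): with rel_pages = 100 and
         * empty_end_pages = 3, the scan starts at block 96 and walks downward,
         * ending when blkno comes down to last_move_dest_block (block 0 until
         * some tuple has actually been moved).
         */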
1413         nblocks = vacrelstats->rel_pages;
1414         for (blkno = nblocks - vacuum_pages->empty_end_pages - 1;
1415                  blkno > last_move_dest_block;
1416                  blkno--)
1417         {
1418                 /*
1419                  * Forget fraged_pages pages at or after this one; they're no
1420                  * longer useful as move targets, since we only want to move down.
1421                  * Note that since we stop the outer loop at last_move_dest_block,
1422                  * pages removed here cannot have had anything moved onto them
1423                  * already.
1424                  *
1425                  * Also note that we don't change the stored fraged_pages list, only
1426                  * our local variable num_fraged_pages; so the forgotten pages are
1427                  * still available to be loaded into the free space map later.
1428                  */
1429                 while (num_fraged_pages > 0 &&
1430                         fraged_pages->pagedesc[num_fraged_pages - 1]->blkno >= blkno)
1431                 {
1432                         Assert(fraged_pages->pagedesc[num_fraged_pages - 1]->offsets_used == 0);
1433                         --num_fraged_pages;
1434                 }
1435
1436                 /*
1437                  * Process this page of the relation.
1438                  */
1439                 buf = ReadBuffer(onerel, blkno);
1440                 page = BufferGetPage(buf);
1441
1442                 vacpage->offsets_free = 0;
1443
1444                 isempty = PageIsEmpty(page);
1445
1446                 dowrite = false;
1447
1448                 /* Is the page in the vacuum_pages list? */
1449                 if (blkno == last_vacuum_block)
1450                 {
1451                         if (last_vacuum_page->offsets_free > 0)
1452                         {
1453                                 /* there are dead tuples on this page - clean them */
1454                                 Assert(!isempty);
1455                                 LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
1456                                 vacuum_page(onerel, buf, last_vacuum_page);
1457                                 LockBuffer(buf, BUFFER_LOCK_UNLOCK);
1458                                 dowrite = true;
1459                         }
1460                         else
1461                                 Assert(isempty);
1462                         --vacuumed_pages;
1463                         if (vacuumed_pages > 0)
1464                         {
1465                                 /* get prev reaped page from vacuum_pages */
1466                                 last_vacuum_page = vacuum_pages->pagedesc[vacuumed_pages - 1];
1467                                 last_vacuum_block = last_vacuum_page->blkno;
1468                         }
1469                         else
1470                         {
1471                                 last_vacuum_page = NULL;
1472                                 last_vacuum_block = InvalidBlockNumber;
1473                         }
1474                         if (isempty)
1475                         {
1476                                 ReleaseBuffer(buf);
1477                                 continue;
1478                         }
1479                 }
1480                 else
1481                         Assert(!isempty);
1482
1483                 chain_tuple_moved = false;              /* no chain tuple has been
1484                                                                                  * moved off this page yet */
1485                 vacpage->blkno = blkno;
1486                 maxoff = PageGetMaxOffsetNumber(page);
1487                 for (offnum = FirstOffsetNumber;
1488                          offnum <= maxoff;
1489                          offnum = OffsetNumberNext(offnum))
1490                 {
1491                         itemid = PageGetItemId(page, offnum);
1492
1493                         if (!ItemIdIsUsed(itemid))
1494                                 continue;
1495
1496                         tuple.t_datamcxt = NULL;
1497                         tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
1498                         tuple_len = tuple.t_len = ItemIdGetLength(itemid);
1499                         ItemPointerSet(&(tuple.t_self), blkno, offnum);
1500
1501                         if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
1502                         {
1503                                 if ((TransactionId) tuple.t_data->t_cmin != myXID)
1504                                         elog(ERROR, "Invalid XID in t_cmin");
1505                                 if (tuple.t_data->t_infomask & HEAP_MOVED_IN)
1506                                         elog(ERROR, "HEAP_MOVED_IN was not expected");
1507
1508                                 /*
1509                                  * If this (chain) tuple was already moved by me, I have to
1510                                  * check whether it is in vacpage or not - i.e. whether it
1511                                  * was moved while cleaning this page or some previous one.
1512                                  */
1513                                 if (tuple.t_data->t_infomask & HEAP_MOVED_OFF)
1514                                 {
1515                                         if (keep_tuples == 0)
1516                                                 continue;
1517                                         if (chain_tuple_moved)          /* some chain tuples were
1518                                                                                                  * moved while */
1519                                         {                       /* cleaning this page */
1520                                                 Assert(vacpage->offsets_free > 0);
1521                                                 for (i = 0; i < vacpage->offsets_free; i++)
1522                                                 {
1523                                                         if (vacpage->offsets[i] == offnum)
1524                                                                 break;
1525                                                 }
1526                                                 if (i >= vacpage->offsets_free) /* not found */
1527                                                 {
1528                                                         vacpage->offsets[vacpage->offsets_free++] = offnum;
1529                                                         keep_tuples--;
1530                                                 }
1531                                         }
1532                                         else
1533                                         {
1534                                                 vacpage->offsets[vacpage->offsets_free++] = offnum;
1535                                                 keep_tuples--;
1536                                         }
1537                                         continue;
1538                                 }
1539                                 elog(ERROR, "HEAP_MOVED_OFF was expected");
1540                         }
1541
1542                         /*
1543                          * If this tuple is in the chain of tuples created in updates
1544                          * by "recent" transactions then we have to move the whole
1545                          * chain of tuples to other places.
1546                          */
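                        /*
                         * To illustrate with made-up TIDs: an update chain is linked
                         * through t_ctid, each version pointing at the next newer one,
                         * e.g. (blk 7, off 2) -> (blk 9, off 5); the newest member's
                         * t_ctid points at the tuple itself, which is the termination
                         * test used by the chain-walking loop below.
                         */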
1547                         if ((tuple.t_data->t_infomask & HEAP_UPDATED &&
1548                          !TransactionIdPrecedes(tuple.t_data->t_xmin, OldestXmin)) ||
1549                                 (!(tuple.t_data->t_infomask & HEAP_XMAX_INVALID) &&
1550                                  !(ItemPointerEquals(&(tuple.t_self),
1551                                                                          &(tuple.t_data->t_ctid)))))
1552                         {
1553                                 Buffer          Cbuf = buf;
1554                                 Page            Cpage;
1555                                 ItemId          Citemid;
1556                                 ItemPointerData Ctid;
1557                                 HeapTupleData tp = tuple;
1558                                 Size            tlen = tuple_len;
1559                                 VTupleMove      vtmove = (VTupleMove)
1560                                 palloc(100 * sizeof(VTupleMoveData));
1561                                 int                     num_vtmove = 0;
1562                                 int                     free_vtmove = 100;
1563                                 VacPage         to_vacpage = NULL;
1564                                 int                     to_item = 0;
1565                                 bool            freeCbuf = false;
1566                                 int                     ti;
1567
1568                                 if (vacrelstats->vtlinks == NULL)
1569                                         elog(ERROR, "No parent tuple was found");
1570                                 if (cur_buffer != InvalidBuffer)
1571                                 {
1572                                         WriteBuffer(cur_buffer);
1573                                         cur_buffer = InvalidBuffer;
1574                                 }
1575
1576                                 /*
1577                                  * If this tuple is at the beginning or in the middle of the
1578                                  * chain then we have to walk to the end of the chain.
1579                                  */
1580                                 while (!(tp.t_data->t_infomask & HEAP_XMAX_INVALID) &&
1581                                            !(ItemPointerEquals(&(tp.t_self),
1582                                                                                    &(tp.t_data->t_ctid))))
1583                                 {
1584                                         Ctid = tp.t_data->t_ctid;
1585                                         if (freeCbuf)
1586                                                 ReleaseBuffer(Cbuf);
1587                                         freeCbuf = true;
1588                                         Cbuf = ReadBuffer(onerel,
1589                                                                           ItemPointerGetBlockNumber(&Ctid));
1590                                         Cpage = BufferGetPage(Cbuf);
1591                                         Citemid = PageGetItemId(Cpage,
1592                                                                           ItemPointerGetOffsetNumber(&Ctid));
1593                                         if (!ItemIdIsUsed(Citemid))
1594                                         {
1595                                                 /*
1596                                                  * This means that in the middle of the chain there
1597                                                  * was a tuple updated by an xaction older than
1598                                                  * OldestXmin, and that tuple is already deleted by
1599                                                  * me. Actually, the upper part of the chain should
1600                                                  * be removed; it seems this should be handled in
1601                                                  * scan_heap(), but it's not implemented at the
1602                                                  * moment, so we just stop shrinking here.
1603                                                  */
1604                                                 ReleaseBuffer(Cbuf);
1605                                                 pfree(vtmove);
1606                                                 vtmove = NULL;
1607                                                 elog(NOTICE, "Child itemid in update-chain marked as unused - can't continue repair_frag");
1608                                                 break;
1609                                         }
1610                                         tp.t_datamcxt = NULL;
1611                                         tp.t_data = (HeapTupleHeader) PageGetItem(Cpage, Citemid);
1612                                         tp.t_self = Ctid;
1613                                         tlen = tp.t_len = ItemIdGetLength(Citemid);
1614                                 }
1615                                 if (vtmove == NULL)
1616                                         break;
1617                                 /* first, can the chain be moved? */
1618                                 for (;;)
1619                                 {
1620                                         if (to_vacpage == NULL ||
1621                                                 !enough_space(to_vacpage, tlen))
1622                                         {
1623                                                 for (i = 0; i < num_fraged_pages; i++)
1624                                                 {
1625                                                         if (enough_space(fraged_pages->pagedesc[i], tlen))
1626                                                                 break;
1627                                                 }
1628
1629                                                 if (i == num_fraged_pages)
1630                                                 {
1631                                                         /* can't move item anywhere */
1632                                                         for (i = 0; i < num_vtmove; i++)
1633                                                         {
1634                                                                 Assert(vtmove[i].vacpage->offsets_used > 0);
1635                                                                 (vtmove[i].vacpage->offsets_used)--;
1636                                                         }
1637                                                         num_vtmove = 0;
1638                                                         break;
1639                                                 }
1640                                                 to_item = i;
1641                                                 to_vacpage = fraged_pages->pagedesc[to_item];
1642                                         }
1643                                         to_vacpage->free -= MAXALIGN(tlen);
1644                                         if (to_vacpage->offsets_used >= to_vacpage->offsets_free)
1645                                                 to_vacpage->free -= MAXALIGN(sizeof(ItemIdData));
1646                                         (to_vacpage->offsets_used)++;
1647                                         if (free_vtmove == 0)
1648                                         {
1649                                                 free_vtmove = 1000;
1650                                                 vtmove = (VTupleMove) repalloc(vtmove,
1651                                                                                          (free_vtmove + num_vtmove) *
1652                                                                                                  sizeof(VTupleMoveData));
1653                                         }
1654                                         vtmove[num_vtmove].tid = tp.t_self;
1655                                         vtmove[num_vtmove].vacpage = to_vacpage;
1656                                         if (to_vacpage->offsets_used == 1)
1657                                                 vtmove[num_vtmove].cleanVpd = true;
1658                                         else
1659                                                 vtmove[num_vtmove].cleanVpd = false;
1660                                         free_vtmove--;
1661                                         num_vtmove++;
1662
1663                                         /* All done ? */
1664                                         if (!(tp.t_data->t_infomask & HEAP_UPDATED) ||
1665                                         TransactionIdPrecedes(tp.t_data->t_xmin, OldestXmin))
1666                                                 break;
1667
1668                                         /* Well, try to find the tuple with the old row version */
1669                                         for (;;)
1670                                         {
1671                                                 Buffer          Pbuf;
1672                                                 Page            Ppage;
1673                                                 ItemId          Pitemid;
1674                                                 HeapTupleData Ptp;
1675                                                 VTupleLinkData vtld,
1676                                                                    *vtlp;
1677
1678                                                 vtld.new_tid = tp.t_self;
1679                                                 vtlp = (VTupleLink)
1680                                                         vac_bsearch((void *) &vtld,
1681                                                                                 (void *) (vacrelstats->vtlinks),
1682                                                                                 vacrelstats->num_vtlinks,
1683                                                                                 sizeof(VTupleLinkData),
1684                                                                                 vac_cmp_vtlinks);
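                                                /*
                                                 * vtlinks was sorted by new_tid at the end of
                                                 * scan_heap(), so the binary search above finds
                                                 * the link whose new_tid equals this tuple's TID;
                                                 * its this_tid member then gives the location of
                                                 * the parent (older) row version.
                                                 */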
1685                                                 if (vtlp == NULL)
1686                                                         elog(ERROR, "Parent tuple was not found");
1687                                                 tp.t_self = vtlp->this_tid;
1688                                                 Pbuf = ReadBuffer(onerel,
1689                                                                 ItemPointerGetBlockNumber(&(tp.t_self)));
1690                                                 Ppage = BufferGetPage(Pbuf);
1691                                                 Pitemid = PageGetItemId(Ppage,
1692                                                            ItemPointerGetOffsetNumber(&(tp.t_self)));
1693                                                 if (!ItemIdIsUsed(Pitemid))
1694                                                         elog(ERROR, "Parent itemid marked as unused");
1695                                                 Ptp.t_datamcxt = NULL;
1696                                                 Ptp.t_data = (HeapTupleHeader) PageGetItem(Ppage, Pitemid);
1697                                                 Assert(ItemPointerEquals(&(vtld.new_tid),
1698                                                                                                  &(Ptp.t_data->t_ctid)));
1699
1700                                                 /*
1701                                                  * Read above about the cases where
1702                                                  * !ItemIdIsUsed(Citemid) (the child item has been
1703                                                  * removed)... Because at the moment we don't
1704                                                  * remove the useless part of an update-chain, it's
1705                                                  * possible to get a too-old parent row here. As in
1706                                                  * the case which caused this problem, we stop
1707                                                  * shrinking here. I could try to find the real
1708                                                  * parent row, but I don't want to, because the
1709                                                  * real solution will be implemented anyway, later,
1710                                                  * and we are too close to the 6.5 release. -
1711                                                  * vadim 06/11/99
1712                                                  */
1713                                                 if (!(TransactionIdEquals(Ptp.t_data->t_xmax,
1714                                                                                                   tp.t_data->t_xmin)))
1715                                                 {
1716                                                         if (freeCbuf)
1717                                                                 ReleaseBuffer(Cbuf);
1718                                                         freeCbuf = false;
1719                                                         ReleaseBuffer(Pbuf);
1720                                                         for (i = 0; i < num_vtmove; i++)
1721                                                         {
1722                                                                 Assert(vtmove[i].vacpage->offsets_used > 0);
1723                                                                 (vtmove[i].vacpage->offsets_used)--;
1724                                                         }
1725                                                         num_vtmove = 0;
1726                                                         elog(NOTICE, "Too old parent tuple found - can't continue repair_frag");
1727                                                         break;
1728                                                 }
1729 #ifdef NOT_USED                                 /* I'm not sure that this will work
1730                                                                  * properly... */
1731
1732                                                 /*
1733                                                  * If this tuple is an updated version of a row and it
1734                                                  * was created by the same transaction then no one
1735                                                  * is interested in this tuple - mark it as
1736                                                  * removed.
1737                                                  */
1738                                                 if (Ptp.t_data->t_infomask & HEAP_UPDATED &&
1739                                                         TransactionIdEquals(Ptp.t_data->t_xmin,
1740                                                                                                 Ptp.t_data->t_xmax))
1741                                                 {
1742                                                         TransactionIdStore(myXID,
1743                                                                 (TransactionId *) &(Ptp.t_data->t_cmin));
1744                                                         Ptp.t_data->t_infomask &=
1745                                                                 ~(HEAP_XMIN_COMMITTED | HEAP_XMIN_INVALID | HEAP_MOVED_IN);
1746                                                         Ptp.t_data->t_infomask |= HEAP_MOVED_OFF;
1747                                                         WriteBuffer(Pbuf);
1748                                                         continue;
1749                                                 }
1750 #endif
1751                                                 tp.t_datamcxt = Ptp.t_datamcxt;
1752                                                 tp.t_data = Ptp.t_data;
1753                                                 tlen = tp.t_len = ItemIdGetLength(Pitemid);
1754                                                 if (freeCbuf)
1755                                                         ReleaseBuffer(Cbuf);
1756                                                 Cbuf = Pbuf;
1757                                                 freeCbuf = true;
1758                                                 break;
1759                                         }
1760                                         if (num_vtmove == 0)
1761                                                 break;
1762                                 }
1763                                 if (freeCbuf)
1764                                         ReleaseBuffer(Cbuf);
1765                                 if (num_vtmove == 0)    /* chain can't be moved */
1766                                 {
1767                                         pfree(vtmove);
1768                                         break;
1769                                 }
1770                                 ItemPointerSetInvalid(&Ctid);
1771                                 for (ti = 0; ti < num_vtmove; ti++)
1772                                 {
1773                                         VacPage         destvacpage = vtmove[ti].vacpage;
1774
1775                                         /* Get page to move from */
1776                                         tuple.t_self = vtmove[ti].tid;
1777                                         Cbuf = ReadBuffer(onerel,
1778                                                          ItemPointerGetBlockNumber(&(tuple.t_self)));
1779
1780                                         /* Get page to move to */
1781                                         cur_buffer = ReadBuffer(onerel, destvacpage->blkno);
1782
1783                                         LockBuffer(cur_buffer, BUFFER_LOCK_EXCLUSIVE);
1784                                         if (cur_buffer != Cbuf)
1785                                                 LockBuffer(Cbuf, BUFFER_LOCK_EXCLUSIVE);
1786
1787                                         ToPage = BufferGetPage(cur_buffer);
1788                                         Cpage = BufferGetPage(Cbuf);
1789
1790                                         Citemid = PageGetItemId(Cpage,
1791                                                         ItemPointerGetOffsetNumber(&(tuple.t_self)));
1792                                         tuple.t_datamcxt = NULL;
1793                                         tuple.t_data = (HeapTupleHeader) PageGetItem(Cpage, Citemid);
1794                                         tuple_len = tuple.t_len = ItemIdGetLength(Citemid);
1795
1796                                         /*
1797                                          * make a copy of the source tuple, and then mark the
1798                                          * source tuple MOVED_OFF.
1799                                          */
1800                                         heap_copytuple_with_tuple(&tuple, &newtup);
1801
1802                                         RelationInvalidateHeapTuple(onerel, &tuple);
1803
1804                                         /* NO ELOG(ERROR) TILL CHANGES ARE LOGGED */
1805                                         START_CRIT_SECTION();
1806
1807                                         TransactionIdStore(myXID, (TransactionId *) &(tuple.t_data->t_cmin));
1808                                         tuple.t_data->t_infomask &=
1809                                                 ~(HEAP_XMIN_COMMITTED | HEAP_XMIN_INVALID | HEAP_MOVED_IN);
1810                                         tuple.t_data->t_infomask |= HEAP_MOVED_OFF;
1811
1812                                         /*
1813                                          * If this page was not used before - clean it.
1814                                          *
1815                                          * NOTE: a nasty bug used to lurk here.  It is possible
1816                                          * for the source and destination pages to be the same
1817                                          * (since this tuple-chain member can be on a page
1818                                          * lower than the one we're currently processing in
1819                                          * the outer loop).  If that's true, then after
1820                                          * vacuum_page() the source tuple will have been
1821                                          * moved, and tuple.t_data will be pointing at
1822                                          * garbage.  Therefore we must do everything that uses
1823                                          * tuple.t_data BEFORE this step!!
1824                                          *
1825                                          * This path is different from the other callers of
1826                                          * vacuum_page, because we have already incremented
1827                                          * the vacpage's offsets_used field to account for the
1828                                          * tuple(s) we expect to move onto the page. Therefore
1829                                          * vacuum_page's check for offsets_used == 0 is wrong.
1830                                          * But since that's a good debugging check for all
1831                                          * other callers, we work around it here rather than
1832                                          * remove it.
1833                                          */
1834                                         if (!PageIsEmpty(ToPage) && vtmove[ti].cleanVpd)
1835                                         {
1836                                                 int                     sv_offsets_used = destvacpage->offsets_used;
1837
1838                                                 destvacpage->offsets_used = 0;
1839                                                 vacuum_page(onerel, cur_buffer, destvacpage);
1840                                                 destvacpage->offsets_used = sv_offsets_used;
1841                                         }
1842
1843                                         /*
1844                                          * Update the state of the copied tuple, and store it
1845                                          * on the destination page.
1846                                          */
1847                                         TransactionIdStore(myXID, (TransactionId *) &(newtup.t_data->t_cmin));
1848                                         newtup.t_data->t_infomask &=
1849                                                 ~(HEAP_XMIN_COMMITTED | HEAP_XMIN_INVALID | HEAP_MOVED_OFF);
1850                                         newtup.t_data->t_infomask |= HEAP_MOVED_IN;
1851                                         newoff = PageAddItem(ToPage, (Item) newtup.t_data, tuple_len,
1852                                                                                  InvalidOffsetNumber, LP_USED);
1853                                         if (newoff == InvalidOffsetNumber)
1854                                         {
1855                                                 elog(STOP, "moving chain: failed to add item with len = %lu to page %u",
1856                                                   (unsigned long) tuple_len, destvacpage->blkno);
1857                                         }
1858                                         newitemid = PageGetItemId(ToPage, newoff);
1859                                         pfree(newtup.t_data);
1860                                         newtup.t_datamcxt = NULL;
1861                                         newtup.t_data = (HeapTupleHeader) PageGetItem(ToPage, newitemid);
1862                                         ItemPointerSet(&(newtup.t_self), destvacpage->blkno, newoff);
1863
1864                                         {
1865                                                 XLogRecPtr      recptr =
1866                                                 log_heap_move(onerel, Cbuf, tuple.t_self,
1867                                                                           cur_buffer, &newtup);
1868
1869                                                 if (Cbuf != cur_buffer)
1870                                                 {
1871                                                         PageSetLSN(Cpage, recptr);
1872                                                         PageSetSUI(Cpage, ThisStartUpID);
1873                                                 }
1874                                                 PageSetLSN(ToPage, recptr);
1875                                                 PageSetSUI(ToPage, ThisStartUpID);
1876                                         }
1877                                         END_CRIT_SECTION();
1878
1879                                         if (destvacpage->blkno > last_move_dest_block)
1880                                                 last_move_dest_block = destvacpage->blkno;
1881
1882                                         /*
1883                                          * Set the new tuple's t_ctid to point to itself for the
1884                                          * last tuple in the chain, and to the next tuple in the
1885                                          * chain otherwise.
1886                                          */
1887                                         if (!ItemPointerIsValid(&Ctid))
1888                                                 newtup.t_data->t_ctid = newtup.t_self;
1889                                         else
1890                                                 newtup.t_data->t_ctid = Ctid;
1891                                         Ctid = newtup.t_self;
1892
1893                                         num_moved++;
1894
1895                                         /*
1896                                          * Remember that we moved a tuple from the current page
1897                                          * (the corresponding index tuple will be cleaned).
1898                                          */
1899                                         if (Cbuf == buf)
1900                                                 vacpage->offsets[vacpage->offsets_free++] =
1901                                                         ItemPointerGetOffsetNumber(&(tuple.t_self));
1902                                         else
1903                                                 keep_tuples++;
1904
1905                                         LockBuffer(cur_buffer, BUFFER_LOCK_UNLOCK);
1906                                         if (cur_buffer != Cbuf)
1907                                                 LockBuffer(Cbuf, BUFFER_LOCK_UNLOCK);
1908
1909                                         /* Create index entries for the moved tuple */
1910                                         if (resultRelInfo->ri_NumIndices > 0)
1911                                         {
1912                                                 ExecStoreTuple(&newtup, slot, InvalidBuffer, false);
1913                                                 ExecInsertIndexTuples(slot, &(newtup.t_self),
1914                                                                                           estate, true);
1915                                         }
1916
1917                                         WriteBuffer(cur_buffer);
1918                                         WriteBuffer(Cbuf);
1919                                 }
1920                                 cur_buffer = InvalidBuffer;
1921                                 pfree(vtmove);
1922                                 chain_tuple_moved = true;
1923                                 continue;
1924                         }
1925
1926                         /* try to find a new page for this tuple */
1927                         if (cur_buffer == InvalidBuffer ||
1928                                 !enough_space(cur_page, tuple_len))
1929                         {
1930                                 if (cur_buffer != InvalidBuffer)
1931                                 {
1932                                         WriteBuffer(cur_buffer);
1933                                         cur_buffer = InvalidBuffer;
1934                                 }
1935                                 for (i = 0; i < num_fraged_pages; i++)
1936                                 {
1937                                         if (enough_space(fraged_pages->pagedesc[i], tuple_len))
1938                                                 break;
1939                                 }
1940                                 if (i == num_fraged_pages)
1941                                         break;          /* can't move item anywhere */
1942                                 cur_item = i;
1943                                 cur_page = fraged_pages->pagedesc[cur_item];
1944                                 cur_buffer = ReadBuffer(onerel, cur_page->blkno);
1945                                 LockBuffer(cur_buffer, BUFFER_LOCK_EXCLUSIVE);
1946                                 ToPage = BufferGetPage(cur_buffer);
1947                                 /* if this page was not used before - clean it */
1948                                 if (!PageIsEmpty(ToPage) && cur_page->offsets_used == 0)
1949                                         vacuum_page(onerel, cur_buffer, cur_page);
1950                         }
1951                         else
1952                                 LockBuffer(cur_buffer, BUFFER_LOCK_EXCLUSIVE);
1953
1954                         LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
1955
1956                         /* copy tuple */
1957                         heap_copytuple_with_tuple(&tuple, &newtup);
1958
1959                         RelationInvalidateHeapTuple(onerel, &tuple);
1960
1961                         /* NO ELOG(ERROR) TILL CHANGES ARE LOGGED */
1962                         START_CRIT_SECTION();
1963
1964                         /*
1965                          * Mark new tuple as moved_in by vacuum and store vacuum XID
1966                          * in t_cmin !!!
1967                          */
1968                         TransactionIdStore(myXID, (TransactionId *) &(newtup.t_data->t_cmin));
1969                         newtup.t_data->t_infomask &=
1970                                 ~(HEAP_XMIN_COMMITTED | HEAP_XMIN_INVALID | HEAP_MOVED_OFF);
1971                         newtup.t_data->t_infomask |= HEAP_MOVED_IN;
1972
1973                         /* add tuple to the page */
1974                         newoff = PageAddItem(ToPage, (Item) newtup.t_data, tuple_len,
1975                                                                  InvalidOffsetNumber, LP_USED);
1976                         if (newoff == InvalidOffsetNumber)
1977                         {
1978                                 elog(STOP, "failed to add item with len = %lu to page %u (free space %lu, nusd %u, noff %u)",
1979                                          (unsigned long) tuple_len,
1980                                          cur_page->blkno, (unsigned long) cur_page->free,
1981                                          cur_page->offsets_used, cur_page->offsets_free);
1982                         }
1983                         newitemid = PageGetItemId(ToPage, newoff);
1984                         pfree(newtup.t_data);
1985                         newtup.t_datamcxt = NULL;
1986                         newtup.t_data = (HeapTupleHeader) PageGetItem(ToPage, newitemid);
1987                         ItemPointerSet(&(newtup.t_data->t_ctid), cur_page->blkno, newoff);
1988                         newtup.t_self = newtup.t_data->t_ctid;
1989
1990                         /*
1991                          * Mark old tuple as moved_off by vacuum and store vacuum XID
1992                          * in t_cmin !!!
1993                          */
1994                         TransactionIdStore(myXID, (TransactionId *) &(tuple.t_data->t_cmin));
1995                         tuple.t_data->t_infomask &=
1996                                 ~(HEAP_XMIN_COMMITTED | HEAP_XMIN_INVALID | HEAP_MOVED_IN);
1997                         tuple.t_data->t_infomask |= HEAP_MOVED_OFF;
1998
1999                         {
2000                                 XLogRecPtr      recptr =
2001                                 log_heap_move(onerel, buf, tuple.t_self,
2002                                                           cur_buffer, &newtup);
2003
2004                                 PageSetLSN(page, recptr);
2005                                 PageSetSUI(page, ThisStartUpID);
2006                                 PageSetLSN(ToPage, recptr);
2007                                 PageSetSUI(ToPage, ThisStartUpID);
2008                         }
2009                         END_CRIT_SECTION();
2010
2011                         cur_page->offsets_used++;
2012                         num_moved++;
2013                         cur_page->free = ((PageHeader) ToPage)->pd_upper - ((PageHeader) ToPage)->pd_lower;
2014                         if (cur_page->blkno > last_move_dest_block)
2015                                 last_move_dest_block = cur_page->blkno;
2016
2017                         vacpage->offsets[vacpage->offsets_free++] = offnum;
2018
2019                         LockBuffer(cur_buffer, BUFFER_LOCK_UNLOCK);
2020                         LockBuffer(buf, BUFFER_LOCK_UNLOCK);
2021
2022                         /* insert index tuples if needed */
2023                         if (resultRelInfo->ri_NumIndices > 0)
2024                         {
2025                                 ExecStoreTuple(&newtup, slot, InvalidBuffer, false);
2026                                 ExecInsertIndexTuples(slot, &(newtup.t_self), estate, true);
2027                         }
2028                 }                                               /* walk along page */
2029
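                /*
                 * If the offnum loop above ended early, the rest of the page may
                 * still hold tuples that were marked MOVED_OFF by earlier chain
                 * moves but whose offsets have not yet been recorded in a
                 * vacpage; scan the remainder so they are accounted for here.
                 */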
2030                 if (offnum < maxoff && keep_tuples > 0)
2031                 {
2032                         OffsetNumber off;
2033
2034                         for (off = OffsetNumberNext(offnum);
2035                                  off <= maxoff;
2036                                  off = OffsetNumberNext(off))
2037                         {
2038                                 itemid = PageGetItemId(page, off);
2039                                 if (!ItemIdIsUsed(itemid))
2040                                         continue;
2041                                 tuple.t_datamcxt = NULL;
2042                                 tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
2043                                 if (tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED)
2044                                         continue;
2045                                 if ((TransactionId) tuple.t_data->t_cmin != myXID)
2046                                         elog(ERROR, "Invalid XID in t_cmin (4)");
2047                                 if (tuple.t_data->t_infomask & HEAP_MOVED_IN)
2048                                         elog(ERROR, "HEAP_MOVED_IN was not expected (2)");
2049                                 if (tuple.t_data->t_infomask & HEAP_MOVED_OFF)
2050                                 {
2051                                         /* some chain tuples were moved while */
2052                                         if (chain_tuple_moved)
2053                                         {                       /* cleaning this page */
2054                                                 Assert(vacpage->offsets_free > 0);
2055                                                 for (i = 0; i < vacpage->offsets_free; i++)
2056                                                 {
2057                                                         if (vacpage->offsets[i] == off)
2058                                                                 break;
2059                                                 }
2060                                                 if (i >= vacpage->offsets_free) /* not found */
2061                                                 {
2062                                                         vacpage->offsets[vacpage->offsets_free++] = off;
2063                                                         Assert(keep_tuples > 0);
2064                                                         keep_tuples--;
2065                                                 }
2066                                         }
2067                                         else
2068                                         {
2069                                                 vacpage->offsets[vacpage->offsets_free++] = off;
2070                                                 Assert(keep_tuples > 0);
2071                                                 keep_tuples--;
2072                                         }
2073                                 }
2074                         }
2075                 }
2076
2077                 if (vacpage->offsets_free > 0)  /* some tuples were moved */
2078                 {
2079                         if (chain_tuple_moved)          /* else - they are ordered */
2080                         {
2081                                 qsort((char *) (vacpage->offsets), vacpage->offsets_free,
2082                                           sizeof(OffsetNumber), vac_cmp_offno);
2083                         }
2084                         vpage_insert(&Nvacpagelist, copy_vac_page(vacpage));
2085                         WriteBuffer(buf);
2086                 }
2087                 else if (dowrite)
2088                         WriteBuffer(buf);
2089                 else
2090                         ReleaseBuffer(buf);
2091
2092                 if (offnum <= maxoff)
2093                         break;                          /* some item(s) left */
2094
2095         }                                                       /* walk along relation */
2096
2097         blkno++;                                        /* new number of blocks */
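        /*
         * In other words: the loop ends with blkno at the last block that must
         * be kept - either last_move_dest_block, or a page that could not be
         * completely emptied - so after the increment blkno is the number of
         * blocks the relation can now be truncated to.
         */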
2098
2099         if (cur_buffer != InvalidBuffer)
2100         {
2101                 Assert(num_moved > 0);
2102                 WriteBuffer(cur_buffer);
2103         }
2104
2105         if (num_moved > 0)
2106         {
2107                 /*
2108                  * We have to commit our tuple moves before we truncate the
2109                  * relation.  Ideally we should do Commit/StartTransactionCommand
2110                  * here, relying on the session-level table lock to protect our
2111                  * exclusive access to the relation.  However, that would require
2112                  * a lot of extra code to close and re-open the relation, indexes,
2113                  * etc.  For now, a quick hack: record status of current
2114                  * transaction as committed, and continue.
2115                  */
2116                 RecordTransactionCommit();
2117         }
2118
2119         /*
2120          * We are not going to move any more tuples across pages, but we still
2121          * need to apply vacuum_page to compact free space in the remaining
2122          * pages in vacuum_pages list.  Note that some of these pages may also
2123          * be in the fraged_pages list, and may have had tuples moved onto
2124          * them; if so, we already did vacuum_page and needn't do it again.
2125          */
2126         for (i = 0, curpage = vacuum_pages->pagedesc;
2127                  i < vacuumed_pages;
2128                  i++, curpage++)
2129         {
2130                 Assert((*curpage)->blkno < blkno);
2131                 if ((*curpage)->offsets_used == 0)
2132                 {
2133                         /* this page was not used as a move target, so must clean it */
2134                         buf = ReadBuffer(onerel, (*curpage)->blkno);
2135                         LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
2136                         page = BufferGetPage(buf);
2137                         if (!PageIsEmpty(page))
2138                                 vacuum_page(onerel, buf, *curpage);
2139                         LockBuffer(buf, BUFFER_LOCK_UNLOCK);
2140                         WriteBuffer(buf);
2141                 }
2142         }
2143
2144         /*
2145          * Now scan all the pages that we moved tuples onto and update tuple
2146          * status bits.  This is not really necessary, but will save time for
2147          * future transactions examining these tuples.
2148          *
2149          * XXX Notice that this code fails to clear HEAP_MOVED_OFF tuples from
2150          * pages that were move source pages but not move dest pages.  One
2151          * also wonders whether it wouldn't be better to skip this step and
2152          * let the tuple status updates happen someplace that's not holding an
2153          * exclusive lock on the relation.
2154          */
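        /*
         * In short: each HEAP_MOVED_IN tuple below gets HEAP_XMIN_COMMITTED
         * set, and each HEAP_MOVED_OFF tuple gets HEAP_XMIN_INVALID set, so
         * later readers can resolve visibility without consulting the status
         * of vacuum's transaction.
         */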
2155         checked_moved = 0;
2156         for (i = 0, curpage = fraged_pages->pagedesc;
2157                  i < num_fraged_pages;
2158                  i++, curpage++)
2159         {
2160                 Assert((*curpage)->blkno < blkno);
2161                 if ((*curpage)->blkno > last_move_dest_block)
2162                         break;                          /* no need to scan any further */
2163                 if ((*curpage)->offsets_used == 0)
2164                         continue;                       /* this page was never used as a move dest */
2165                 buf = ReadBuffer(onerel, (*curpage)->blkno);
2166                 LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
2167                 page = BufferGetPage(buf);
2168                 num_tuples = 0;
2169                 max_offset = PageGetMaxOffsetNumber(page);
2170                 for (newoff = FirstOffsetNumber;
2171                          newoff <= max_offset;
2172                          newoff = OffsetNumberNext(newoff))
2173                 {
2174                         itemid = PageGetItemId(page, newoff);
2175                         if (!ItemIdIsUsed(itemid))
2176                                 continue;
2177                         tuple.t_datamcxt = NULL;
2178                         tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
2179                         if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
2180                         {
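                                /*
                                 * We stamped t_cmin with our own XID when moving these
                                 * tuples, so anything here without XMIN_COMMITTED must be
                                 * ours: mark MOVED_IN copies as committed and MOVED_OFF
                                 * originals as dead.
                                 */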
2181                                 if ((TransactionId) tuple.t_data->t_cmin != myXID)
2182                                         elog(ERROR, "Invalid XID in t_cmin (2)");
2183                                 if (tuple.t_data->t_infomask & HEAP_MOVED_IN)
2184                                 {
2185                                         tuple.t_data->t_infomask |= HEAP_XMIN_COMMITTED;
2186                                         num_tuples++;
2187                                 }
2188                                 else if (tuple.t_data->t_infomask & HEAP_MOVED_OFF)
2189                                         tuple.t_data->t_infomask |= HEAP_XMIN_INVALID;
2190                                 else
2191                                         elog(ERROR, "HEAP_MOVED_OFF/HEAP_MOVED_IN was expected");
2192                         }
2193                 }
2194                 LockBuffer(buf, BUFFER_LOCK_UNLOCK);
2195                 WriteBuffer(buf);
2196                 Assert((*curpage)->offsets_used == num_tuples);
2197                 checked_moved += num_tuples;
2198         }
2199         Assert(num_moved == checked_moved);
2200
2201         elog(MESSAGE_LEVEL, "Rel %s: Pages: %u --> %u; Tuple(s) moved: %u.\n\t%s",
2202                  RelationGetRelationName(onerel),
2203                  nblocks, blkno, num_moved,
2204                  vac_show_rusage(&ru0));
2205
2206         /*
2207          * Make any motion of system tuples visible in the catalog caches.
2208          */
2209         CommandCounterIncrement();
2210
2211         if (Nvacpagelist.num_pages > 0)
2212         {
2213                 /* vacuum indexes again if needed */
2214                 if (Irel != (Relation *) NULL)
2215                 {
2216                         VacPage    *vpleft,
2217                                            *vpright,
2218                                                 vpsave;
2219
2220                         /* Nvacpagelist.pagedesc is in reverse block order; flip it to ascending */
2221                         for (vpleft = Nvacpagelist.pagedesc,
2222                         vpright = Nvacpagelist.pagedesc + Nvacpagelist.num_pages - 1;
2223                                  vpleft < vpright; vpleft++, vpright--)
2224                         {
2225                                 vpsave = *vpleft;
2226                                 *vpleft = *vpright;
2227                                 *vpright = vpsave;
2228                         }
2229                         Assert(keep_tuples >= 0);
2230                         for (i = 0; i < nindexes; i++)
2231                                 vacuum_index(&Nvacpagelist, Irel[i],
2232                                                          vacrelstats->rel_tuples, keep_tuples);
2233                 }
2234
2235                 /* clean moved tuples from the last page in Nvacpagelist */
2236                 if (vacpage->blkno == (blkno - 1) &&
2237                         vacpage->offsets_free > 0)
2238                 {
2239                         OffsetNumber unbuf[BLCKSZ / sizeof(OffsetNumber)];
2240                         OffsetNumber *unused = unbuf;
2241                         int                     uncnt;
2242
2243                         buf = ReadBuffer(onerel, vacpage->blkno);
2244                         LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
2245                         page = BufferGetPage(buf);
2246                         num_tuples = 0;
2247                         maxoff = PageGetMaxOffsetNumber(page);
2248                         for (offnum = FirstOffsetNumber;
2249                                  offnum <= maxoff;
2250                                  offnum = OffsetNumberNext(offnum))
2251                         {
2252                                 itemid = PageGetItemId(page, offnum);
2253                                 if (!ItemIdIsUsed(itemid))
2254                                         continue;
2255                                 tuple.t_datamcxt = NULL;
2256                                 tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
2257
2258                                 if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
2259                                 {
2260                                         if ((TransactionId) tuple.t_data->t_cmin != myXID)
2261                                                 elog(ERROR, "Invalid XID in t_cmin (3)");
2262                                         if (tuple.t_data->t_infomask & HEAP_MOVED_OFF)
2263                                         {
2264                                                 itemid->lp_flags &= ~LP_USED;
2265                                                 num_tuples++;
2266                                         }
2267                                         else
2268                                                 elog(ERROR, "HEAP_MOVED_OFF was expected (2)");
2269                                 }
2270
2271                         }
2272                         Assert(vacpage->offsets_free == num_tuples);
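                        /*
                         * Compact the page and write the WAL record inside a critical
                         * section, so an error cannot leave the page changed on disk
                         * without a matching log record.
                         */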
2273                         START_CRIT_SECTION();
2274                         uncnt = PageRepairFragmentation(page, unused);
2275                         {
2276                                 XLogRecPtr      recptr;
2277
2278                                 recptr = log_heap_clean(onerel, buf, (char *) unused,
2279                                                   (char *) (&(unused[uncnt])) - (char *) unused);
2280                                 PageSetLSN(page, recptr);
2281                                 PageSetSUI(page, ThisStartUpID);
2282                         }
2283                         END_CRIT_SECTION();
2284                         LockBuffer(buf, BUFFER_LOCK_UNLOCK);
2285                         WriteBuffer(buf);
2286                 }
2287
2288                 /* now free the new list of reaped pages */
2289                 curpage = Nvacpagelist.pagedesc;
2290                 for (i = 0; i < Nvacpagelist.num_pages; i++, curpage++)
2291                         pfree(*curpage);
2292                 pfree(Nvacpagelist.pagedesc);
2293         }
2294
2295         /*
2296          * Flush dirty pages out to disk.  We do this unconditionally, even if
2297          * we don't need to truncate, because we want to ensure that all
2298          * tuples have correct on-row commit status on disk (see bufmgr.c's
2299          * comments for FlushRelationBuffers()).
2300          */
2301         i = FlushRelationBuffers(onerel, blkno);
2302         if (i < 0)
2303                 elog(ERROR, "VACUUM (repair_frag): FlushRelationBuffers returned %d",
2304                          i);
2305
2306         /* truncate relation, if needed */
2307         if (blkno < nblocks)
2308         {
2309                 blkno = smgrtruncate(DEFAULT_SMGR, onerel, blkno);
2310                 onerel->rd_nblocks = blkno;             /* update relcache immediately */
2311                 onerel->rd_targblock = InvalidBlockNumber;
2312                 vacrelstats->rel_pages = blkno; /* set new number of blocks */
2313         }
2314
2315         /* clean up */
2316         pfree(vacpage);
2317         if (vacrelstats->vtlinks != NULL)
2318                 pfree(vacrelstats->vtlinks);
2319
2320         ExecDropTupleTable(tupleTable, true);
2321
2322         ExecCloseIndices(resultRelInfo);
2323 }
2324
2325 /*
2326  *      vacuum_heap() -- free dead tuples
2327  *
2328  *              This routine marks dead tuples as unused and truncates relation
2329  *              if there are "empty" end-blocks.
2330  */
2331 static void
2332 vacuum_heap(VRelStats *vacrelstats, Relation onerel, VacPageList vacuum_pages)
2333 {
2334         Buffer          buf;
2335         VacPage    *vacpage;
2336         BlockNumber relblocks;
2337         int                     nblocks;
2338         int                     i;
2339
2340         nblocks = vacuum_pages->num_pages;
2341         nblocks -= vacuum_pages->empty_end_pages;       /* those are truncated away below */
2342
2343         for (i = 0, vacpage = vacuum_pages->pagedesc; i < nblocks; i++, vacpage++)
2344         {
2345                 if ((*vacpage)->offsets_free > 0)
2346                 {
2347                         buf = ReadBuffer(onerel, (*vacpage)->blkno);
2348                         LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
2349                         vacuum_page(onerel, buf, *vacpage);
2350                         LockBuffer(buf, BUFFER_LOCK_UNLOCK);
2351                         WriteBuffer(buf);
2352                 }
2353         }
2354
2355         /*
2356          * Flush dirty pages out to disk.  We do this unconditionally, even if
2357          * we don't need to truncate, because we want to ensure that all
2358          * tuples have correct on-row commit status on disk (see bufmgr.c's
2359          * comments for FlushRelationBuffers()).
2360          */
2361         Assert(vacrelstats->rel_pages >= vacuum_pages->empty_end_pages);
2362         relblocks = vacrelstats->rel_pages - vacuum_pages->empty_end_pages;
2363
2364         i = FlushRelationBuffers(onerel, relblocks);
2365         if (i < 0)
2366                 elog(ERROR, "VACUUM (vacuum_heap): FlushRelationBuffers returned %d",
2367                          i);
2368
2369         /* truncate relation if there are some empty end-pages */
2370         if (vacuum_pages->empty_end_pages > 0)
2371         {
2372                 elog(MESSAGE_LEVEL, "Rel %s: Pages: %u --> %u.",
2373                          RelationGetRelationName(onerel),
2374                          vacrelstats->rel_pages, relblocks);
2375                 relblocks = smgrtruncate(DEFAULT_SMGR, onerel, relblocks);
2376                 onerel->rd_nblocks = relblocks; /* update relcache immediately */
2377                 onerel->rd_targblock = InvalidBlockNumber;
2378                 vacrelstats->rel_pages = relblocks;             /* set new number of
2379                                                                                                  * blocks */
2380         }
2381 }
2382
2383 /*
2384  *      vacuum_page() -- free dead tuples on a page
2385  *                                       and repair its fragmentation.
2386  */
2387 static void
2388 vacuum_page(Relation onerel, Buffer buffer, VacPage vacpage)
2389 {
2390         OffsetNumber unbuf[BLCKSZ / sizeof(OffsetNumber)];
2391         OffsetNumber *unused = unbuf;
2392         int                     uncnt;
2393         Page            page = BufferGetPage(buffer);
2394         ItemId          itemid;
2395         int                     i;
2396
2397         /* There shouldn't be any tuples moved onto the page yet! */
2398         Assert(vacpage->offsets_used == 0);
2399
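        /*
         * Clearing the dead line pointers, repairing fragmentation, and
         * logging the cleanup must happen atomically; an error partway
         * through would otherwise leave a modified but unlogged page.
         */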
2400         START_CRIT_SECTION();
2401         for (i = 0; i < vacpage->offsets_free; i++)
2402         {
2403                 itemid = PageGetItemId(page, vacpage->offsets[i]);
2404                 itemid->lp_flags &= ~LP_USED;
2405         }
2406         uncnt = PageRepairFragmentation(page, unused);
2407         {
2408                 XLogRecPtr      recptr;
2409
2410                 recptr = log_heap_clean(onerel, buffer, (char *) unused,
2411                                                   (char *) (&(unused[uncnt])) - (char *) unused);
2412                 PageSetLSN(page, recptr);
2413                 PageSetSUI(page, ThisStartUpID);
2414         }
2415         END_CRIT_SECTION();
2416 }
2417
2418 /*
2419  *      scan_index() -- scan one index relation to update statistics.
2420  *
2421  * We use this when we have no deletions to do.
2422  */
2423 static void
2424 scan_index(Relation indrel, double num_tuples)
2425 {
2426         IndexBulkDeleteResult *stats;
2427         VacRUsage       ru0;
2428
2429         vac_init_rusage(&ru0);
2430
2431         /*
2432          * Even though we're not planning to delete anything, use the
2433          * ambulkdelete call, so that the scan happens within the index AM for
2434          * more speed.
2435          */
2436         stats = index_bulk_delete(indrel, dummy_tid_reaped, NULL);
2437
2438         if (!stats)
2439                 return;
2440
2441         /* now update statistics in pg_class */
2442         vac_update_relstats(RelationGetRelid(indrel),
2443                                                 stats->num_pages, stats->num_index_tuples,
2444                                                 false);
2445
2446         elog(MESSAGE_LEVEL, "Index %s: Pages %u; Tuples %.0f.\n\t%s",
2447                  RelationGetRelationName(indrel),
2448                  stats->num_pages, stats->num_index_tuples,
2449                  vac_show_rusage(&ru0));
2450
2451         /*
2452          * Check for tuple count mismatch.  If the index is partial, then it's
2453          * OK for it to have fewer tuples than the heap; else we have trouble.
2454          */
2455         if (stats->num_index_tuples != num_tuples)
2456         {
2457                 if (stats->num_index_tuples > num_tuples ||
2458                         !vac_is_partial_index(indrel))
2459                         elog(NOTICE, "Index %s: NUMBER OF INDEX TUPLES (%.0f) IS NOT THE SAME AS HEAP'S (%.0f).\
2460 \n\tRecreate the index.",
2461                                  RelationGetRelationName(indrel),
2462                                  stats->num_index_tuples, num_tuples);
2463         }
2464
2465         pfree(stats);
2466 }
2467
2468 /*
2469  *      vacuum_index() -- vacuum one index relation.
2470  *
2471  *              Vacpagelist is the VacPageList of the heap we're currently vacuuming.
2472  *              It's locked. Indrel is an index relation on the vacuumed heap.
2473  *
2474  *              We don't bother to set locks on the index relation here, since
2475  *              the parent table is exclusive-locked already.
2476  *
2477  *              Finally, we arrange to update the index relation's statistics in
2478  *              pg_class.
2479  */
2480 static void
2481 vacuum_index(VacPageList vacpagelist, Relation indrel,
2482                          double num_tuples, int keep_tuples)
2483 {
2484         IndexBulkDeleteResult *stats;
2485         VacRUsage       ru0;
2486
2487         vac_init_rusage(&ru0);
2488
2489         /* Do bulk deletion */
2490         stats = index_bulk_delete(indrel, tid_reaped, (void *) vacpagelist);
2491
2492         if (!stats)
2493                 return;
2494
2495         /* now update statistics in pg_class */
2496         vac_update_relstats(RelationGetRelid(indrel),
2497                                                 stats->num_pages, stats->num_index_tuples,
2498                                                 false);
2499
2500         elog(MESSAGE_LEVEL, "Index %s: Pages %u; Tuples %.0f: Deleted %.0f.\n\t%s",
2501                  RelationGetRelationName(indrel), stats->num_pages,
2502                  stats->num_index_tuples - keep_tuples, stats->tuples_removed,
2503                  vac_show_rusage(&ru0));
2504
2505         /*
2506          * Check for tuple count mismatch.  If the index is partial, then it's
2507          * OK for it to have fewer tuples than the heap; else we have trouble.
2508          */
2509         if (stats->num_index_tuples != num_tuples + keep_tuples)
2510         {
2511                 if (stats->num_index_tuples > num_tuples + keep_tuples ||
2512                         !vac_is_partial_index(indrel))
2513                         elog(NOTICE, "Index %s: NUMBER OF INDEX TUPLES (%.0f) IS NOT THE SAME AS HEAP'S (%.0f).\
2514 \n\tRecreate the index.",
2515                                  RelationGetRelationName(indrel),
2516                                  stats->num_index_tuples, num_tuples);
2517         }
2518
2519         pfree(stats);
2520 }
2521
2522 /*
2523  *      tid_reaped() -- is a particular tid reaped?
2524  *
2525  *              This has the right signature to be an IndexBulkDeleteCallback.
2526  *
2527  *              vacpagelist->pagedesc is sorted in ascending block-number order.
2528  */
2529 static bool
2530 tid_reaped(ItemPointer itemptr, void *state)
2531 {
2532         VacPageList vacpagelist = (VacPageList) state;
2533         OffsetNumber ioffno;
2534         OffsetNumber *voff;
2535         VacPage         vp,
2536                            *vpp;
2537         VacPageData vacpage;
2538
2539         vacpage.blkno = ItemPointerGetBlockNumber(itemptr);
2540         ioffno = ItemPointerGetOffsetNumber(itemptr);
2541
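        /* First, binary-search the sorted page list for this tuple's block */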
2542         vp = &vacpage;
2543         vpp = (VacPage *) vac_bsearch((void *) &vp,
2544                                                                   (void *) (vacpagelist->pagedesc),
2545                                                                   vacpagelist->num_pages,
2546                                                                   sizeof(VacPage),
2547                                                                   vac_cmp_blk);
2548
2549         if (vpp == NULL)
2550                 return false;
2551
2552         /* ok - we are on a partially or fully reaped page */
2553         vp = *vpp;
2554
2555         if (vp->offsets_free == 0)
2556         {
2557                 /* this is an empty page, so all its tuples are reaped */
2558                 return true;
2559         }
2560
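        /* Otherwise, search the page's sorted offset array for this tuple */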
2561         voff = (OffsetNumber *) vac_bsearch((void *) &ioffno,
2562                                                                                 (void *) (vp->offsets),
2563                                                                                 vp->offsets_free,
2564                                                                                 sizeof(OffsetNumber),
2565                                                                                 vac_cmp_offno);
2566
2567         if (voff == NULL)
2568                 return false;
2569
2570         /* tid is reaped */
2571         return true;
2572 }
2573
2574 /*
2575  * Dummy version for scan_index: reports that no tuple is reaped.
2576  */
2577 static bool
2578 dummy_tid_reaped(ItemPointer itemptr, void *state)
2579 {
2580         return false;
2581 }
2582
2583 /*
2584  * Update the shared Free Space Map with the info we now have about
2585  * free space in the relation, discarding any old info the map may have.
2586  */
2587 static void
2588 vac_update_fsm(Relation onerel, VacPageList fraged_pages,
2589                            BlockNumber rel_pages)
2590 {
2591         int                     nPages = fraged_pages->num_pages;
2592         int                     i;
2593         BlockNumber *pages;
2594         Size       *spaceAvail;
2595
2596         /* +1 to avoid palloc(0) */
2597         pages = (BlockNumber *) palloc((nPages + 1) * sizeof(BlockNumber));
2598         spaceAvail = (Size *) palloc((nPages + 1) * sizeof(Size));
2599
2600         for (i = 0; i < nPages; i++)
2601         {
2602                 pages[i] = fraged_pages->pagedesc[i]->blkno;
2603                 spaceAvail[i] = fraged_pages->pagedesc[i]->free;
2604
2605                 /*
2606                  * fraged_pages may contain entries for pages that we later
2607                  * decided to truncate from the relation; don't enter them into
2608                  * the map!
2609                  */
2610                 if (pages[i] >= rel_pages)
2611                 {
2612                         nPages = i;
2613                         break;
2614                 }
2615         }
2616
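        /* Replace any existing FSM data for this relation with the new info */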
2617         MultiRecordFreeSpace(&onerel->rd_node,
2618                                                  0, MaxBlockNumber,
2619                                                  nPages, pages, spaceAvail);
2620         pfree(pages);
2621         pfree(spaceAvail);
2622 }
2623
2624 /* Copy a VacPage structure */
2625 static VacPage
2626 copy_vac_page(VacPage vacpage)
2627 {
2628         VacPage         newvacpage;
2629
2630         /* allocate a VacPageData entry */
2631         newvacpage = (VacPage) palloc(sizeof(VacPageData) +
2632                                                    vacpage->offsets_free * sizeof(OffsetNumber));
2633
2634         /* fill it in */
2635         if (vacpage->offsets_free > 0)
2636                 memcpy(newvacpage->offsets, vacpage->offsets,
2637                            vacpage->offsets_free * sizeof(OffsetNumber));
2638         newvacpage->blkno = vacpage->blkno;
2639         newvacpage->free = vacpage->free;
2640         newvacpage->offsets_used = vacpage->offsets_used;
2641         newvacpage->offsets_free = vacpage->offsets_free;
2642
2643         return newvacpage;
2644 }
2645
2646 /*
2647  * Add a VacPage pointer to a VacPageList.
2648  *
2649  *              As a side effect of the way that scan_heap works,
2650  *              higher pages come after lower pages in the array
2651  *              (and highest tid on a page is last).
2652  */
2653 static void
2654 vpage_insert(VacPageList vacpagelist, VacPage vpnew)
2655 {
2656 #define PG_NPAGEDESC 1024
2657
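        /*
         * pagedesc starts at PG_NPAGEDESC entries and doubles whenever it
         * fills, so growing to N pages costs only O(log N) repallocs.
         */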
2658         /* allocate a VacPage entry if needed */
2659         if (vacpagelist->num_pages == 0)
2660         {
2661                 vacpagelist->pagedesc = (VacPage *) palloc(PG_NPAGEDESC * sizeof(VacPage));
2662                 vacpagelist->num_allocated_pages = PG_NPAGEDESC;
2663         }
2664         else if (vacpagelist->num_pages >= vacpagelist->num_allocated_pages)
2665         {
2666                 vacpagelist->num_allocated_pages *= 2;
2667                 vacpagelist->pagedesc = (VacPage *) repalloc(vacpagelist->pagedesc, vacpagelist->num_allocated_pages * sizeof(VacPage));
2668         }
2669         vacpagelist->pagedesc[vacpagelist->num_pages] = vpnew;
2670         (vacpagelist->num_pages)++;
2671 }
2672
2673 /*
2674  * vac_bsearch: just like standard C library routine bsearch(),
2675  * except that we first test to see whether the target key is outside
2676  * the range of the table entries.  This case is handled relatively slowly
2677  * by the normal binary search algorithm (ie, no faster than any other key)
2678  * but it occurs often enough in VACUUM to be worth optimizing.
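 * For example, tid_reaped() probes the sorted pagedesc array with
 *              vac_bsearch(&vp, pagedesc, num_pages, sizeof(VacPage), vac_cmp_blk);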
2679  */
2680 static void *
2681 vac_bsearch(const void *key, const void *base,
2682                         size_t nelem, size_t size,
2683                         int (*compar) (const void *, const void *))
2684 {
2685         int                     res;
2686         const void *last;
2687
2688         if (nelem == 0)
2689                 return NULL;
2690         res = compar(key, base);
2691         if (res < 0)
2692                 return NULL;
2693         if (res == 0)
2694                 return (void *) base;
2695         if (nelem > 1)
2696         {
2697                 last = (const void *) ((const char *) base + (nelem - 1) * size);
2698                 res = compar(key, last);
2699                 if (res > 0)
2700                         return NULL;
2701                 if (res == 0)
2702                         return (void *) last;
2703         }
2704         if (nelem <= 2)
2705                 return NULL;                    /* already checked 'em all */
2706         return bsearch(key, base, nelem, size, compar);
2707 }
2708
2709 /*
2710  * Comparator routines for use with qsort() and bsearch().
2711  */
2712 static int
2713 vac_cmp_blk(const void *left, const void *right)
2714 {
2715         BlockNumber lblk,
2716                                 rblk;
2717
2718         lblk = (*((VacPage *) left))->blkno;
2719         rblk = (*((VacPage *) right))->blkno;
2720
2721         if (lblk < rblk)
2722                 return -1;
2723         if (lblk == rblk)
2724                 return 0;
2725         return 1;
2726 }
2727
2728 static int
2729 vac_cmp_offno(const void *left, const void *right)
2730 {
2731         if (*(OffsetNumber *) left < *(OffsetNumber *) right)
2732                 return -1;
2733         if (*(OffsetNumber *) left == *(OffsetNumber *) right)
2734                 return 0;
2735         return 1;
2736 }
2737
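/*
 * Order VTupleLinks by new_tid: high and low halves of the block number,
 * then the offset within the block.
 */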
2738 static int
2739 vac_cmp_vtlinks(const void *left, const void *right)
2740 {
2741         if (((VTupleLink) left)->new_tid.ip_blkid.bi_hi <
2742                 ((VTupleLink) right)->new_tid.ip_blkid.bi_hi)
2743                 return -1;
2744         if (((VTupleLink) left)->new_tid.ip_blkid.bi_hi >
2745                 ((VTupleLink) right)->new_tid.ip_blkid.bi_hi)
2746                 return 1;
2747         /* bi_hi-es are equal */
2748         if (((VTupleLink) left)->new_tid.ip_blkid.bi_lo <
2749                 ((VTupleLink) right)->new_tid.ip_blkid.bi_lo)
2750                 return -1;
2751         if (((VTupleLink) left)->new_tid.ip_blkid.bi_lo >
2752                 ((VTupleLink) right)->new_tid.ip_blkid.bi_lo)
2753                 return 1;
2754         /* bi_lo-es are equal */
2755         if (((VTupleLink) left)->new_tid.ip_posid <
2756                 ((VTupleLink) right)->new_tid.ip_posid)
2757                 return -1;
2758         if (((VTupleLink) left)->new_tid.ip_posid >
2759                 ((VTupleLink) right)->new_tid.ip_posid)
2760                 return 1;
2761         return 0;
2762 }
2763
2764
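/*
 * vac_open_indexes() -- open all indexes of the given heap relation.
 *
 *              Returns the number of indexes in *nindexes and an array of open
 *              index relations in *Irel (NULL if the heap has no indexes).
 */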
2765 void
2766 vac_open_indexes(Relation relation, int *nindexes, Relation **Irel)
2767 {
2768         List       *indexoidlist,
2769                            *indexoidscan;
2770         int                     i;
2771
2772         indexoidlist = RelationGetIndexList(relation);
2773
2774         *nindexes = length(indexoidlist);
2775
2776         if (*nindexes > 0)
2777                 *Irel = (Relation *) palloc(*nindexes * sizeof(Relation));
2778         else
2779                 *Irel = NULL;
2780
2781         i = 0;
2782         foreach(indexoidscan, indexoidlist)
2783         {
2784                 Oid                     indexoid = lfirsti(indexoidscan);
2785
2786                 (*Irel)[i] = index_open(indexoid);
2787                 i++;
2788         }
2789
2790         freeList(indexoidlist);
2791 }
2792
2793
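/*
 * vac_close_indexes() -- close the indexes opened by vac_open_indexes()
 *              and free the array.
 */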
2794 void
2795 vac_close_indexes(int nindexes, Relation *Irel)
2796 {
2797         if (Irel == (Relation *) NULL)
2798                 return;
2799
2800         while (nindexes--)
2801                 index_close(Irel[nindexes]);
2802         pfree(Irel);
2803 }
2804
2805
2806 /*
2807  * Is an index partial (ie, could it contain fewer tuples than the heap)?
2808  */
2809 bool
2810 vac_is_partial_index(Relation indrel)
2811 {
2812         bool            result;
2813         HeapTuple       cachetuple;
2814         Form_pg_index indexStruct;
2815
2816         /*
2817          * If the index's AM doesn't support nulls, it's partial for our
2818          * purposes
2819          */
2820         if (!indrel->rd_am->amindexnulls)
2821                 return true;
2822
2823         /* Otherwise, look to see if there's a partial-index predicate */
2824         cachetuple = SearchSysCache(INDEXRELID,
2825                                                           ObjectIdGetDatum(RelationGetRelid(indrel)),
2826                                                                 0, 0, 0);
2827         if (!HeapTupleIsValid(cachetuple))
2828                 elog(ERROR, "vac_is_partial_index: index %u not found",
2829                          RelationGetRelid(indrel));
2830         indexStruct = (Form_pg_index) GETSTRUCT(cachetuple);
2831
2832         result = (VARSIZE(&indexStruct->indpred) > VARHDRSZ);
2833
2834         ReleaseSysCache(cachetuple);
2835         return result;
2836 }
2837
2838
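/*
 * enough_space() -- will a (MAXALIGN'd) tuple of size len fit on this page?
 *
 *              If no freed line pointer remains to recycle, the tuple also
 *              needs room for a new ItemIdData entry.
 */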
2839 static bool
2840 enough_space(VacPage vacpage, Size len)
2841 {
2842         len = MAXALIGN(len);
2843
2844         if (len > vacpage->free)
2845                 return false;
2846
2847         /* if there are free itemid(s) and len <= free_space... */
2848         if (vacpage->offsets_used < vacpage->offsets_free)
2849                 return true;
2850
2851         /* noff_used >= noff_free, so we'll have to allocate a new itemid */
2852         if (len + sizeof(ItemIdData) <= vacpage->free)
2853                 return true;
2854
2855         return false;
2856 }
2857
2858
2859 /*
2860  * Initialize usage snapshot.
2861  */
2862 void
2863 vac_init_rusage(VacRUsage *ru0)
2864 {
2865         struct timezone tz;
2866
2867         getrusage(RUSAGE_SELF, &ru0->ru);
2868         gettimeofday(&ru0->tv, &tz);
2869 }
2870
2871 /*
2872  * Compute elapsed time since ru0 usage snapshot, and format into
2873  * a displayable string.  Result is in a static string, which is
2874  * tacky, but no one ever claimed that the Postgres backend is
2875  * threadable...
2876  */
2877 const char *
2878 vac_show_rusage(VacRUsage *ru0)
2879 {
2880         static char result[100];
2881         VacRUsage       ru1;
2882
2883         vac_init_rusage(&ru1);
2884
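        /*
         * Subtract the snapshots field by field; when a microseconds field
         * would go negative, borrow one second (1000000 usec) from seconds.
         */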
2885         if (ru1.tv.tv_usec < ru0->tv.tv_usec)
2886         {
2887                 ru1.tv.tv_sec--;
2888                 ru1.tv.tv_usec += 1000000;
2889         }
2890         if (ru1.ru.ru_stime.tv_usec < ru0->ru.ru_stime.tv_usec)
2891         {
2892                 ru1.ru.ru_stime.tv_sec--;
2893                 ru1.ru.ru_stime.tv_usec += 1000000;
2894         }
2895         if (ru1.ru.ru_utime.tv_usec < ru0->ru.ru_utime.tv_usec)
2896         {
2897                 ru1.ru.ru_utime.tv_sec--;
2898                 ru1.ru.ru_utime.tv_usec += 1000000;
2899         }
2900
2901         snprintf(result, sizeof(result),
2902                          "CPU %d.%02ds/%d.%02du sec elapsed %d.%02d sec.",
2903                          (int) (ru1.ru.ru_stime.tv_sec - ru0->ru.ru_stime.tv_sec),
2904           (int) (ru1.ru.ru_stime.tv_usec - ru0->ru.ru_stime.tv_usec) / 10000,
2905                          (int) (ru1.ru.ru_utime.tv_sec - ru0->ru.ru_utime.tv_sec),
2906           (int) (ru1.ru.ru_utime.tv_usec - ru0->ru.ru_utime.tv_usec) / 10000,
2907                          (int) (ru1.tv.tv_sec - ru0->tv.tv_sec),
2908                          (int) (ru1.tv.tv_usec - ru0->tv.tv_usec) / 10000);
2909
2910         return result;
2911 }