/*-------------------------------------------------------------------------
 *
 * vacuum.c
 *        The postgres vacuum cleaner.
 *
 * This file includes the "full" version of VACUUM, as well as control code
 * used by all three of full VACUUM, lazy VACUUM, and ANALYZE.  See
 * vacuumlazy.c and analyze.c for the rest of the code for the latter two.
 *
 *
 * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *        $Header: /cvsroot/pgsql/src/backend/commands/vacuum.c,v 1.207 2001/08/10 18:57:35 tgl Exp $
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include <unistd.h>

#include "access/genam.h"
#include "access/heapam.h"
#include "access/xlog.h"
#include "catalog/catalog.h"
#include "catalog/catname.h"
#include "catalog/pg_index.h"
#include "commands/vacuum.h"
#include "executor/executor.h"
#include "miscadmin.h"
#include "storage/freespace.h"
#include "storage/sinval.h"
#include "storage/smgr.h"
#include "tcop/pquery.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/inval.h"
#include "utils/relcache.h"
#include "utils/syscache.h"
#include "utils/temprel.h"

#include "pgstat.h"

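/* A node in the linked list of relations (by OID) to be processed */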
typedef struct VRelListData
{
        Oid                     vrl_relid;
        struct VRelListData *vrl_next;
} VRelListData;

typedef VRelListData *VRelList;

typedef struct VacPageData
{
        BlockNumber blkno;                      /* BlockNumber of this Page */
        Size            free;                   /* FreeSpace on this Page */
        uint16          offsets_used;   /* Number of OffNums used by vacuum */
        uint16          offsets_free;   /* Number of OffNums free or to be free */
        OffsetNumber offsets[1];        /* Array of free OffNums */
} VacPageData;

typedef VacPageData *VacPage;

typedef struct VacPageListData
{
        BlockNumber     empty_end_pages;        /* Number of "empty" end-pages */
        int                     num_pages;                      /* Number of pages in pagedesc */
        int                     num_allocated_pages;    /* Number of allocated pages in
                                                                                 * pagedesc */
        VacPage    *pagedesc;                   /* Descriptions of pages */
} VacPageListData;

typedef VacPageListData *VacPageList;

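/*
 * One link in an update chain: this_tid is a recently-dead tuple, and
 * new_tid is the newer version it points to (its t_ctid).
 */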
typedef struct VTupleLinkData
{
        ItemPointerData new_tid;
        ItemPointerData this_tid;
} VTupleLinkData;

typedef VTupleLinkData *VTupleLink;

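/* A planned tuple move: which tuple to relocate and the target page */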
typedef struct VTupleMoveData
{
        ItemPointerData tid;            /* tuple ID */
        VacPage         vacpage;                /* where to move */
        bool            cleanVpd;               /* clean vacpage before using */
} VTupleMoveData;

typedef VTupleMoveData *VTupleMove;

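/* Per-relation statistics gathered by scan_heap() for later use */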
typedef struct VRelStats
{
        BlockNumber     rel_pages;
        double          rel_tuples;
        Size            min_tlen;
        Size            max_tlen;
        bool            hasindex;
        int                     num_vtlinks;
        VTupleLink      vtlinks;
} VRelStats;


static MemoryContext vac_context = NULL;

static int      MESSAGE_LEVEL;          /* message level */

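/* visibility cutoff: deletes committed before this XID are dead to everyone */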
static TransactionId XmaxRecent;


/* non-export function prototypes */
static void vacuum_init(void);
static void vacuum_shutdown(void);
static VRelList getrels(Name VacRelP, const char *stmttype);
static void vacuum_rel(Oid relid, VacuumStmt *vacstmt);
static void full_vacuum_rel(Relation onerel);
static void scan_heap(VRelStats *vacrelstats, Relation onerel,
                                          VacPageList vacuum_pages, VacPageList fraged_pages);
static void repair_frag(VRelStats *vacrelstats, Relation onerel,
                                                VacPageList vacuum_pages, VacPageList fraged_pages,
                                                int nindexes, Relation *Irel);
static void vacuum_heap(VRelStats *vacrelstats, Relation onerel,
                                                VacPageList vacpagelist);
static void vacuum_page(Relation onerel, Buffer buffer, VacPage vacpage);
static void vacuum_index(VacPageList vacpagelist, Relation indrel,
                                                 double num_tuples, int keep_tuples);
static void scan_index(Relation indrel, double num_tuples);
static bool tid_reaped(ItemPointer itemptr, void *state);
static bool dummy_tid_reaped(ItemPointer itemptr, void *state);
static void vac_update_fsm(Relation onerel, VacPageList fraged_pages,
                                                   BlockNumber rel_pages);
static VacPage copy_vac_page(VacPage vacpage);
static void vpage_insert(VacPageList vacpagelist, VacPage vpnew);
static void *vac_bsearch(const void *key, const void *base,
                                                 size_t nelem, size_t size,
                                                 int (*compar) (const void *, const void *));
static int      vac_cmp_blk(const void *left, const void *right);
static int      vac_cmp_offno(const void *left, const void *right);
static int      vac_cmp_vtlinks(const void *left, const void *right);
static bool enough_space(VacPage vacpage, Size len);


/****************************************************************************
 *                                                                          *
 *                      Code common to all flavors of VACUUM and ANALYZE    *
 *                                                                          *
 ****************************************************************************
 */


/*
 * Primary entry point for VACUUM and ANALYZE commands.
 */
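/*
 * Illustrative SQL invocations that arrive here (examples only):
 *              VACUUM;                                      -- vacuum every plain relation
 *              VACUUM VERBOSE mytable;                      -- one table, NOTICE output
 *              VACUUM FULL ANALYZE mytable;                 -- full vacuum, then analyze
 */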
void
vacuum(VacuumStmt *vacstmt)
{
        const char *stmttype = vacstmt->vacuum ? "VACUUM" : "ANALYZE";
        NameData        VacRel;
        Name            VacRelName;
        VRelList        vrl,
                                cur;

        /*
         * We cannot run VACUUM inside a user transaction block; if we were
         * inside a transaction, then our commit- and
         * start-transaction-command calls would not have the intended effect!
         * Furthermore, the forced commit that occurs before truncating the
         * relation's file would have the effect of committing the rest of the
         * user's transaction too, which would certainly not be the desired
         * behavior.
         */
        if (IsTransactionBlock())
                elog(ERROR, "%s cannot run inside a BEGIN/END block", stmttype);

        /*
         * Send info about dead objects to the statistics collector
         */
        pgstat_vacuum_tabstat();

        if (vacstmt->verbose)
                MESSAGE_LEVEL = NOTICE;
        else
                MESSAGE_LEVEL = DEBUG;

        /*
         * Create special memory context for cross-transaction storage.
         *
         * Since it is a child of QueryContext, it will go away eventually even
         * if we suffer an error; there's no need for special abort cleanup
         * logic.
         */
        vac_context = AllocSetContextCreate(QueryContext,
                                                                                "Vacuum",
                                                                                ALLOCSET_DEFAULT_MINSIZE,
                                                                                ALLOCSET_DEFAULT_INITSIZE,
                                                                                ALLOCSET_DEFAULT_MAXSIZE);

        /* Convert vacrel, which is just a string, to a Name */
        if (vacstmt->vacrel)
        {
                namestrcpy(&VacRel, vacstmt->vacrel);
                VacRelName = &VacRel;
        }
        else
                VacRelName = NULL;

        /* Build list of relations to process (note this lives in vac_context) */
        vrl = getrels(VacRelName, stmttype);

        /*
         * Start up the vacuum cleaner.
         */
        vacuum_init();

        /*
         * Process each selected relation.  We are careful to process
         * each relation in a separate transaction in order to avoid holding
         * too many locks at one time.  Also, if we are doing VACUUM ANALYZE,
         * the ANALYZE part runs as a separate transaction from the VACUUM
         * to further reduce locking.
         */
        for (cur = vrl; cur != (VRelList) NULL; cur = cur->vrl_next)
        {
                if (vacstmt->vacuum)
                        vacuum_rel(cur->vrl_relid, vacstmt);
                if (vacstmt->analyze)
                        analyze_rel(cur->vrl_relid, vacstmt);
        }

        /*
         * If we did a complete vacuum, then flush the init file that relcache.c
         * uses to save startup time. The next backend startup will rebuild the
         * init file with up-to-date information from pg_class.  This lets the
         * optimizer see the stats that we've collected for certain critical
         * system indexes.  See relcache.c for more details.
         *
         * Ignore any failure to unlink the file, since it might not be there if
         * no backend has been started since the last vacuum.
         */
        if (vacstmt->vacrel == NULL)
                unlink(RELCACHE_INIT_FILENAME);

        /* clean up */
        vacuum_shutdown();
}

/*
 *      vacuum_init(), vacuum_shutdown() -- start up and shut down the vacuum cleaner.
 *
 *              Formerly, there was code here to prevent more than one VACUUM from
 *              executing concurrently in the same database.  However, there's no
 *              good reason to prevent that, and manually removing lockfiles after
 *              a vacuum crash was a pain for dbadmins.  So, forget about lockfiles,
 *              and just rely on the locks we grab on each target table
 *              to ensure that there aren't two VACUUMs running on the same table
 *              at the same time.
 *
 *              The strangeness with committing and starting transactions in the
 *              init and shutdown routines is due to the fact that the vacuum cleaner
 *              is invoked via an SQL command, and so is already executing inside
 *              a transaction.  We need to leave ourselves in a predictable state
 *              on entry and exit to the vacuum cleaner.  We commit the transaction
 *              started in PostgresMain() inside vacuum_init(), and start one in
 *              vacuum_shutdown() to match the commit waiting for us back in
 *              PostgresMain().
 */
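/*
 * Sketch of the resulting transaction nesting (illustrative only):
 *
 *              StartTransactionCommand                  -- in PostgresMain()
 *                      vacuum_init()     -> CommitTransactionCommand
 *                      ... one Start/Commit pair per relation ...
 *                      vacuum_shutdown() -> StartTransactionCommand
 *              CommitTransactionCommand                 -- back in PostgresMain()
 */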
static void
vacuum_init(void)
{
        /* matches the StartTransaction in PostgresMain() */
        CommitTransactionCommand();
}

static void
vacuum_shutdown(void)
{
        /* on entry, we are not in a transaction */

        /* matches the CommitTransaction in PostgresMain() */
        StartTransactionCommand();

        /*
         * Clean up working storage --- note we must do this after
         * StartTransactionCommand, else we might be trying to delete the
         * active context!
         */
        MemoryContextDelete(vac_context);
        vac_context = NULL;
}

/*
 * Build a list of VRelListData nodes for each relation to be processed
 *
 * The list is built in vac_context so that it will survive across our
 * per-relation transactions.
 */
static VRelList
getrels(Name VacRelP, const char *stmttype)
{
        Relation        rel;
        TupleDesc       tupdesc;
        HeapScanDesc scan;
        HeapTuple       tuple;
        VRelList        vrl,
                                cur;
        Datum           d;
        char       *rname;
        char            rkind;
        bool            n;
        ScanKeyData key;

        if (VacRelP)
        {
                /*
                 * we could use the cache here, but it is clearer to use scankeys
                 * for both vacuum cases, bjm 2000/01/19
                 */
                char       *nontemp_relname;

                /* We must re-map temp table names bjm 2000-04-06 */
                nontemp_relname = get_temp_rel_by_username(NameStr(*VacRelP));
                if (nontemp_relname == NULL)
                        nontemp_relname = NameStr(*VacRelP);

                ScanKeyEntryInitialize(&key, 0x0, Anum_pg_class_relname,
                                                           F_NAMEEQ,
                                                           PointerGetDatum(nontemp_relname));
        }
        else
        {
                /* find all plain relations listed in pg_class */
                ScanKeyEntryInitialize(&key, 0x0, Anum_pg_class_relkind,
                                                           F_CHAREQ, CharGetDatum(RELKIND_RELATION));
        }

        vrl = cur = (VRelList) NULL;

        rel = heap_openr(RelationRelationName, AccessShareLock);
        tupdesc = RelationGetDescr(rel);

        scan = heap_beginscan(rel, false, SnapshotNow, 1, &key);

        while (HeapTupleIsValid(tuple = heap_getnext(scan, 0)))
        {
                d = heap_getattr(tuple, Anum_pg_class_relname, tupdesc, &n);
                rname = (char *) DatumGetName(d);

                d = heap_getattr(tuple, Anum_pg_class_relkind, tupdesc, &n);
                rkind = DatumGetChar(d);

                if (rkind != RELKIND_RELATION)
                {
                        elog(NOTICE, "%s: cannot process indexes, views or special system tables",
                                 stmttype);
                        continue;
                }

                /* Make a relation list entry for this guy */
                if (vrl == (VRelList) NULL)
                        vrl = cur = (VRelList)
                                MemoryContextAlloc(vac_context, sizeof(VRelListData));
                else
                {
                        cur->vrl_next = (VRelList)
                                MemoryContextAlloc(vac_context, sizeof(VRelListData));
                        cur = cur->vrl_next;
                }

                cur->vrl_relid = tuple->t_data->t_oid;
                cur->vrl_next = (VRelList) NULL;
        }

        heap_endscan(scan);
        heap_close(rel, AccessShareLock);

        if (vrl == NULL)
                elog(NOTICE, "%s: table not found", stmttype);

        return vrl;
}


/*
 *      vac_update_relstats() -- update statistics for one relation
 *
 *              Update the whole-relation statistics that are kept in its pg_class
 *              row.  There are additional stats that will be updated if we are
 *              doing ANALYZE, but we always update these stats.  This routine works
 *              for both index and heap relation entries in pg_class.
 *
 *              We violate no-overwrite semantics here by storing new values for the
 *              statistics columns directly into the pg_class tuple that's already on
 *              the page.  The reason for this is that if we updated these tuples in
 *              the usual way, vacuuming pg_class itself wouldn't work very well ---
 *              by the time we got done with a vacuum cycle, most of the tuples in
 *              pg_class would've been obsoleted.  Of course, this only works for
 *              fixed-size never-null columns, but these are.
 *
 *              This routine is shared by full VACUUM, lazy VACUUM, and stand-alone
 *              ANALYZE.
 */
void
vac_update_relstats(Oid relid, BlockNumber num_pages, double num_tuples,
                                        bool hasindex)
{
        Relation        rd;
        HeapTupleData rtup;
        HeapTuple       ctup;
        Form_pg_class pgcform;
        Buffer          buffer;

        /*
         * update number of tuples and number of pages in pg_class
         */
        rd = heap_openr(RelationRelationName, RowExclusiveLock);

        ctup = SearchSysCache(RELOID,
                                                  ObjectIdGetDatum(relid),
                                                  0, 0, 0);
        if (!HeapTupleIsValid(ctup))
                elog(ERROR, "pg_class entry for relid %u vanished during vacuuming",
                         relid);

        /* get the buffer cache tuple */
        rtup.t_self = ctup->t_self;
        ReleaseSysCache(ctup);
        heap_fetch(rd, SnapshotNow, &rtup, &buffer, NULL);

        /* overwrite the existing statistics in the tuple */
        pgcform = (Form_pg_class) GETSTRUCT(&rtup);
        pgcform->relpages = (int32) num_pages;
        pgcform->reltuples = num_tuples;
        pgcform->relhasindex = hasindex;
        /*
         * If we have discovered that there are no indexes, then there's
         * no primary key either.  This could be done more thoroughly...
         */
        if (!hasindex)
                pgcform->relhaspkey = false;

        /* invalidate the tuple in the cache and write the buffer */
        RelationInvalidateHeapTuple(rd, &rtup);
        WriteBuffer(buffer);

        heap_close(rd, RowExclusiveLock);
}


/****************************************************************************
 *                                                                          *
 *                      Code common to both flavors of VACUUM               *
 *                                                                          *
 ****************************************************************************
 */


/*
 *      vacuum_rel() -- vacuum one heap relation
 *
 *              Doing one heap at a time incurs extra overhead, since we need to
 *              check that the heap exists again just before we vacuum it.  The
 *              reason that we do this is so that vacuuming can be spread across
 *              many small transactions.  Otherwise, two-phase locking would require
 *              us to lock the entire database during one pass of the vacuum cleaner.
 *
 *              At entry and exit, we are not inside a transaction.
 */
static void
vacuum_rel(Oid relid, VacuumStmt *vacstmt)
{
        LOCKMODE        lmode;
        Relation        onerel;
        LockRelId       onerelid;
        Oid                     toast_relid;

        /* Begin a transaction for vacuuming this relation */
        StartTransactionCommand();

        /*
         * Check for user-requested abort.  Note we want this to be inside a
         * transaction, so xact.c doesn't issue useless NOTICE.
         */
        CHECK_FOR_INTERRUPTS();

        /*
         * Race condition -- if the pg_class tuple has gone away since the
         * last time we saw it, we don't need to vacuum it.
         */
        if (!SearchSysCacheExists(RELOID,
                                                          ObjectIdGetDatum(relid),
                                                          0, 0, 0))
        {
                CommitTransactionCommand();
                return;
        }

        /*
         * Determine the type of lock we want --- hard exclusive lock for a
         * FULL vacuum, but just ShareUpdateExclusiveLock for concurrent
         * vacuum.  Either way, we can be sure that no other backend is vacuuming
         * the same table.
         */
        lmode = vacstmt->full ? AccessExclusiveLock : ShareUpdateExclusiveLock;

        /*
         * Open the class, get an appropriate lock on it, and check permissions.
         *
         * We allow the user to vacuum a table if he is superuser, the table
         * owner, or the database owner (but in the latter case, only if it's
         * not a shared relation).  pg_ownercheck includes the superuser case.
         *
         * Note we choose to treat permissions failure as a NOTICE and keep
         * trying to vacuum the rest of the DB --- is this appropriate?
         */
        onerel = heap_open(relid, lmode);

        if (! (pg_ownercheck(GetUserId(), RelationGetRelationName(onerel),
                                                 RELNAME) ||
                   (is_dbadmin(MyDatabaseId) && !onerel->rd_rel->relisshared)))
        {
                elog(NOTICE, "Skipping \"%s\" --- only table or database owner can VACUUM it",
                         RelationGetRelationName(onerel));
                heap_close(onerel, lmode);
                CommitTransactionCommand();
                return;
        }

        /*
         * Get a session-level lock too. This will protect our access to the
         * relation across multiple transactions, so that we can vacuum the
         * relation's TOAST table (if any) secure in the knowledge that no one
         * is deleting the parent relation.
         *
         * NOTE: this cannot block, even if someone else is waiting for access,
         * because the lock manager knows that both lock requests are from the
         * same process.
         */
        onerelid = onerel->rd_lockInfo.lockRelId;
        LockRelationForSession(&onerelid, lmode);

        /*
         * Remember the relation's TOAST relation for later
         */
        toast_relid = onerel->rd_rel->reltoastrelid;

        /*
         * Do the actual work --- either FULL or "lazy" vacuum
         */
        if (vacstmt->full)
                full_vacuum_rel(onerel);
        else
                lazy_vacuum_rel(onerel, vacstmt);

        /* all done with this class, but hold lock until commit */
        heap_close(onerel, NoLock);

        /*
         * Complete the transaction and free all temporary memory used.
         */
        CommitTransactionCommand();

        /*
         * If the relation has a secondary toast rel, vacuum that too while we
         * still hold the session lock on the master table.  Note however that
         * "analyze" will not get done on the toast table.  This is good,
         * because the toaster always uses hardcoded index access and statistics
         * are totally unimportant for toast relations.
         */
        if (toast_relid != InvalidOid)
                vacuum_rel(toast_relid, vacstmt);

        /*
         * Now release the session-level lock on the master table.
         */
        UnlockRelationForSession(&onerelid, lmode);
}


/****************************************************************************
 *                                                                          *
 *                      Code for VACUUM FULL (only)                         *
 *                                                                          *
 ****************************************************************************
 */


/*
 *      full_vacuum_rel() -- perform FULL VACUUM for one heap relation
 *
 *              This routine vacuums a single heap, cleans out its indexes, and
 *              updates its num_pages and num_tuples statistics.
 *
 *              At entry, we have already established a transaction and opened
 *              and locked the relation.
 */
static void
full_vacuum_rel(Relation onerel)
{
        VacPageListData vacuum_pages;           /* List of pages to vacuum and/or
                                                                                 * clean indexes */
        VacPageListData fraged_pages;           /* List of pages with space enough
                                                                                 * for re-using */
        Relation   *Irel;
        int                     nindexes,
                                i;
        VRelStats  *vacrelstats;
        bool            reindex = false;

        if (IsIgnoringSystemIndexes() &&
                IsSystemRelationName(RelationGetRelationName(onerel)))
                reindex = true;

        GetXmaxRecent(&XmaxRecent);

        /*
         * Set up statistics-gathering machinery.
         */
        vacrelstats = (VRelStats *) palloc(sizeof(VRelStats));
        vacrelstats->rel_pages = 0;
        vacrelstats->rel_tuples = 0;
        vacrelstats->hasindex = false;

        /* scan the heap */
        vacuum_pages.num_pages = fraged_pages.num_pages = 0;
        scan_heap(vacrelstats, onerel, &vacuum_pages, &fraged_pages);

        /* Now open all indexes of the relation */
        vac_open_indexes(onerel, &nindexes, &Irel);
        if (!Irel)
                reindex = false;
        else if (!RelationGetForm(onerel)->relhasindex)
                reindex = true;
        if (nindexes > 0)
                vacrelstats->hasindex = true;

#ifdef NOT_USED
        /*
         * reindex in VACUUM is dangerous under WAL. ifdef out until it
         * becomes safe.
         */
        if (reindex)
        {
                vac_close_indexes(nindexes, Irel);
                Irel = (Relation *) NULL;
                activate_indexes_of_a_table(RelationGetRelid(onerel), false);
        }
#endif   /* NOT_USED */

        /* Clean/scan index relation(s) */
        if (Irel != (Relation *) NULL)
        {
                if (vacuum_pages.num_pages > 0)
                {
                        for (i = 0; i < nindexes; i++)
                                vacuum_index(&vacuum_pages, Irel[i],
                                                         vacrelstats->rel_tuples, 0);
                }
                else
                {
                        /* just scan indexes to update statistics */
                        for (i = 0; i < nindexes; i++)
                                scan_index(Irel[i], vacrelstats->rel_tuples);
                }
        }

        if (fraged_pages.num_pages > 0)
        {
                /* Try to shrink heap */
                repair_frag(vacrelstats, onerel, &vacuum_pages, &fraged_pages,
                                        nindexes, Irel);
                vac_close_indexes(nindexes, Irel);
        }
        else
        {
                vac_close_indexes(nindexes, Irel);
                if (vacuum_pages.num_pages > 0)
                {
                        /* Clean pages from vacuum_pages list */
                        vacuum_heap(vacrelstats, onerel, &vacuum_pages);
                }
                else
                {
                        /*
                         * Flush dirty pages out to disk.  We must do this even if we
                         * didn't do anything else, because we want to ensure that all
                         * tuples have correct on-row commit status on disk (see
                         * bufmgr.c's comments for FlushRelationBuffers()).
                         */
                        i = FlushRelationBuffers(onerel, vacrelstats->rel_pages);
                        if (i < 0)
                                elog(ERROR, "VACUUM (full_vacuum_rel): FlushRelationBuffers returned %d",
                                         i);
                }
        }

#ifdef NOT_USED
        if (reindex)
                activate_indexes_of_a_table(RelationGetRelid(onerel), true);
#endif   /* NOT_USED */

        /* update shared free space map with final free space info */
        vac_update_fsm(onerel, &fraged_pages, vacrelstats->rel_pages);

        /* update statistics in pg_class */
        vac_update_relstats(RelationGetRelid(onerel), vacrelstats->rel_pages,
                                                vacrelstats->rel_tuples, vacrelstats->hasindex);
}


/*
 *      scan_heap() -- scan an open heap relation
 *
 *              This routine sets commit status bits, constructs vacuum_pages (list
 *              of pages we need to compact free space on and/or clean indexes of
 *              deleted tuples), constructs fraged_pages (list of pages with free
 *              space that tuples could be moved into), and calculates statistics
 *              on the number of live tuples in the heap.
 */
static void
scan_heap(VRelStats *vacrelstats, Relation onerel,
                  VacPageList vacuum_pages, VacPageList fraged_pages)
{
        BlockNumber nblocks,
                                blkno;
        ItemId          itemid;
        Buffer          buf;
        HeapTupleData tuple;
        OffsetNumber offnum,
                                maxoff;
        bool            pgchanged,
                                tupgone,
                                notup;
        char       *relname;
        VacPage         vacpage,
                                vacpagecopy;
        BlockNumber     empty_pages,
                                new_pages,
                                changed_pages,
                                empty_end_pages;
        double          num_tuples,
                                tups_vacuumed,
                                nkeep,
                                nunused;
        double          free_size,
                                usable_free_size;
        Size            min_tlen = MaxTupleSize;
        Size            max_tlen = 0;
        int                     i;
        bool            do_shrinking = true;
        VTupleLink      vtlinks = (VTupleLink) palloc(100 * sizeof(VTupleLinkData));
        int                     num_vtlinks = 0;
        int                     free_vtlinks = 100;
        VacRUsage       ru0;

        vac_init_rusage(&ru0);

        relname = RelationGetRelationName(onerel);
        elog(MESSAGE_LEVEL, "--Relation %s--", relname);

        empty_pages = new_pages = changed_pages = empty_end_pages = 0;
        num_tuples = tups_vacuumed = nkeep = nunused = 0;
        free_size = 0;

        nblocks = RelationGetNumberOfBlocks(onerel);

        /*
         * We initially create each VacPage item in a maximal-sized workspace,
         * then copy the workspace into a just-large-enough copy.
         */
        vacpage = (VacPage) palloc(sizeof(VacPageData) + MaxOffsetNumber * sizeof(OffsetNumber));

        for (blkno = 0; blkno < nblocks; blkno++)
        {
                Page            page,
                                        tempPage = NULL;
                bool            do_reap,
                                        do_frag;

                buf = ReadBuffer(onerel, blkno);
                page = BufferGetPage(buf);

                vacpage->blkno = blkno;
                vacpage->offsets_used = 0;
                vacpage->offsets_free = 0;

                if (PageIsNew(page))
                {
                        elog(NOTICE, "Rel %s: Uninitialized page %u - fixing",
                                 relname, blkno);
                        PageInit(page, BufferGetPageSize(buf), 0);
                        vacpage->free = ((PageHeader) page)->pd_upper - ((PageHeader) page)->pd_lower;
                        free_size += (vacpage->free - sizeof(ItemIdData));
                        new_pages++;
                        empty_end_pages++;
                        vacpagecopy = copy_vac_page(vacpage);
                        vpage_insert(vacuum_pages, vacpagecopy);
                        vpage_insert(fraged_pages, vacpagecopy);
                        WriteBuffer(buf);
                        continue;
                }

                if (PageIsEmpty(page))
                {
                        vacpage->free = ((PageHeader) page)->pd_upper - ((PageHeader) page)->pd_lower;
                        free_size += (vacpage->free - sizeof(ItemIdData));
                        empty_pages++;
                        empty_end_pages++;
                        vacpagecopy = copy_vac_page(vacpage);
                        vpage_insert(vacuum_pages, vacpagecopy);
                        vpage_insert(fraged_pages, vacpagecopy);
                        ReleaseBuffer(buf);
                        continue;
                }

                pgchanged = false;
                notup = true;
                maxoff = PageGetMaxOffsetNumber(page);
                for (offnum = FirstOffsetNumber;
                         offnum <= maxoff;
                         offnum = OffsetNumberNext(offnum))
                {
                        uint16          sv_infomask;

                        itemid = PageGetItemId(page, offnum);

                        /*
                         * Collect un-used items too - it's possible to have indexes
                         * pointing here after crash.
                         */
                        if (!ItemIdIsUsed(itemid))
                        {
                                vacpage->offsets[vacpage->offsets_free++] = offnum;
                                nunused += 1;
                                continue;
                        }

                        tuple.t_datamcxt = NULL;
                        tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
                        tuple.t_len = ItemIdGetLength(itemid);
                        ItemPointerSet(&(tuple.t_self), blkno, offnum);

                        tupgone = false;
                        sv_infomask = tuple.t_data->t_infomask;

                        switch (HeapTupleSatisfiesVacuum(tuple.t_data, XmaxRecent))
                        {
                                case HEAPTUPLE_DEAD:
                                        tupgone = true; /* we can delete the tuple */
                                        break;
                                case HEAPTUPLE_LIVE:
                                        break;
                                case HEAPTUPLE_RECENTLY_DEAD:
                                        /*
                                         * If the tuple was deleted only recently, it may still
                                         * be visible to some transactions, so we must not
                                         * remove it from the relation.
                                         */
                                        nkeep += 1;
                                        /*
                                         * If we are shrinking and this tuple is an updated one,
                                         * remember it so that we can reconstruct the chains of
                                         * updated tuples.
                                         */
                                        if (do_shrinking &&
                                                !(ItemPointerEquals(&(tuple.t_self),
                                                                                        &(tuple.t_data->t_ctid))))
                                        {
                                                if (free_vtlinks == 0)
                                                {
                                                        free_vtlinks = 1000;
                                                        vtlinks = (VTupleLink) repalloc(vtlinks,
                                                                                   (free_vtlinks + num_vtlinks) *
                                                                                                 sizeof(VTupleLinkData));
                                                }
                                                vtlinks[num_vtlinks].new_tid = tuple.t_data->t_ctid;
                                                vtlinks[num_vtlinks].this_tid = tuple.t_self;
                                                free_vtlinks--;
                                                num_vtlinks++;
                                        }
                                        break;
                                case HEAPTUPLE_INSERT_IN_PROGRESS:
                                        /*
                                         * This should not happen, since we hold exclusive lock
                                         * on the relation; shouldn't we raise an error?
                                         */
                                        elog(NOTICE, "Rel %s: TID %u/%u: InsertTransactionInProgress %u - can't shrink relation",
                                                 relname, blkno, offnum, tuple.t_data->t_xmin);
                                        do_shrinking = false;
                                        break;
                                case HEAPTUPLE_DELETE_IN_PROGRESS:
                                        /*
                                         * This should not happen, since we hold exclusive lock
                                         * on the relation; shouldn't we raise an error?
                                         */
                                        elog(NOTICE, "Rel %s: TID %u/%u: DeleteTransactionInProgress %u - can't shrink relation",
                                                 relname, blkno, offnum, tuple.t_data->t_xmax);
                                        do_shrinking = false;
                                        break;
                                default:
                                        elog(ERROR, "Unexpected HeapTupleSatisfiesVacuum result");
                                        break;
                        }

                        /* check for hint-bit update by HeapTupleSatisfiesVacuum */
                        if (sv_infomask != tuple.t_data->t_infomask)
                                pgchanged = true;

                        /*
                         * Other checks...
                         */
                        if (!OidIsValid(tuple.t_data->t_oid) &&
                                onerel->rd_rel->relhasoids)
                                elog(NOTICE, "Rel %s: TID %u/%u: OID IS INVALID. TUPGONE %d.",
                                         relname, blkno, offnum, (int) tupgone);

                        if (tupgone)
                        {
                                ItemId          lpp;

                                /*
                                 * Here we are building a temporary copy of the page with
                                 * dead tuples removed.  Below we will apply
                                 * PageRepairFragmentation to the copy, so that we can
                                 * determine how much space will be available after
                                 * removal of dead tuples.      But note we are NOT changing
                                 * the real page yet...
                                 */
                                if (tempPage == (Page) NULL)
                                {
                                        Size            pageSize;

                                        pageSize = PageGetPageSize(page);
                                        tempPage = (Page) palloc(pageSize);
                                        memcpy(tempPage, page, pageSize);
                                }

                                /* mark it unused on the temp page */
                                lpp = PageGetItemId(tempPage, offnum);
                                lpp->lp_flags &= ~LP_USED;

                                vacpage->offsets[vacpage->offsets_free++] = offnum;
                                tups_vacuumed += 1;
                        }
                        else
                        {
                                num_tuples += 1;
                                notup = false;
                                if (tuple.t_len < min_tlen)
                                        min_tlen = tuple.t_len;
                                if (tuple.t_len > max_tlen)
                                        max_tlen = tuple.t_len;
                        }
                } /* scan along page */

                if (tempPage != (Page) NULL)
                {
                        /* Some tuples are removable; figure free space after removal */
                        PageRepairFragmentation(tempPage, NULL);
                        vacpage->free = ((PageHeader) tempPage)->pd_upper - ((PageHeader) tempPage)->pd_lower;
                        pfree(tempPage);
                        do_reap = true;
                }
                else
                {
                        /* Just use current available space */
                        vacpage->free = ((PageHeader) page)->pd_upper - ((PageHeader) page)->pd_lower;
                        /* Need to reap the page if it has ~LP_USED line pointers */
                        do_reap = (vacpage->offsets_free > 0);
                }

                free_size += vacpage->free;
                /*
                 * Add the page to fraged_pages if it has a useful amount of free
                 * space.  "Useful" means enough for a minimal-sized tuple.
                 * But we don't know that accurately near the start of the relation,
                 * so add pages unconditionally if they have >= BLCKSZ/10 free space.
                 */
                do_frag = (vacpage->free >= min_tlen || vacpage->free >= BLCKSZ/10);

                if (do_reap || do_frag)
                {
                        vacpagecopy = copy_vac_page(vacpage);
                        if (do_reap)
                                vpage_insert(vacuum_pages, vacpagecopy);
                        if (do_frag)
                                vpage_insert(fraged_pages, vacpagecopy);
                }

                if (notup)
                        empty_end_pages++;
                else
                        empty_end_pages = 0;

                if (pgchanged)
                {
                        WriteBuffer(buf);
                        changed_pages++;
                }
                else
                        ReleaseBuffer(buf);
        }

        pfree(vacpage);

        /* save stats in the rel list for use later */
        vacrelstats->rel_tuples = num_tuples;
        vacrelstats->rel_pages = nblocks;
        if (num_tuples == 0)
                min_tlen = max_tlen = 0;
        vacrelstats->min_tlen = min_tlen;
        vacrelstats->max_tlen = max_tlen;

        vacuum_pages->empty_end_pages = empty_end_pages;
        fraged_pages->empty_end_pages = empty_end_pages;

        /*
         * Clear the fraged_pages list if we found we couldn't shrink.
         * Else, remove any "empty" end-pages from the list, and compute
         * usable free space = free space in remaining pages.
         */
        if (do_shrinking)
        {
                Assert((BlockNumber) fraged_pages->num_pages >= empty_end_pages);
                fraged_pages->num_pages -= empty_end_pages;
                usable_free_size = 0;
                for (i = 0; i < fraged_pages->num_pages; i++)
                        usable_free_size += fraged_pages->pagedesc[i]->free;
        }
        else
        {
                fraged_pages->num_pages = 0;
                usable_free_size = 0;
        }

        if (usable_free_size > 0 && num_vtlinks > 0)
        {
                qsort((char *) vtlinks, num_vtlinks, sizeof(VTupleLinkData),
                          vac_cmp_vtlinks);
                vacrelstats->vtlinks = vtlinks;
                vacrelstats->num_vtlinks = num_vtlinks;
        }
        else
        {
                vacrelstats->vtlinks = NULL;
                vacrelstats->num_vtlinks = 0;
                pfree(vtlinks);
        }

        elog(MESSAGE_LEVEL, "Pages %u: Changed %u, reaped %u, Empty %u, New %u; \
Tup %.0f: Vac %.0f, Keep/VTL %.0f/%u, UnUsed %.0f, MinLen %lu, MaxLen %lu; \
Re-using: Free/Avail. Space %.0f/%.0f; EndEmpty/Avail. Pages %u/%u.\n\t%s",
                 nblocks, changed_pages, vacuum_pages->num_pages, empty_pages,
                 new_pages, num_tuples, tups_vacuumed,
                 nkeep, vacrelstats->num_vtlinks,
                 nunused, (unsigned long) min_tlen, (unsigned long) max_tlen,
                 free_size, usable_free_size,
                 empty_end_pages, fraged_pages->num_pages,
                 vac_show_rusage(&ru0));

}


/*
 *      repair_frag() -- try to repair relation's fragmentation
 *
 *              This routine marks dead tuples as unused and tries to re-use dead
 *              space by moving tuples (making new index entries as needed).  It
 *              constructs Nvacpagelist, a list of freed pages (whose tuples were
 *              moved off), and cleans up their index entries after committing the
 *              current transaction (in hack-manner: without releasing locks or
 *              freeing memory!).  It truncates the relation if end-blocks have
 *              been emptied.
 */
static void
repair_frag(VRelStats *vacrelstats, Relation onerel,
                        VacPageList vacuum_pages, VacPageList fraged_pages,
                        int nindexes, Relation *Irel)
{
        TransactionId myXID;
        CommandId       myCID;
        Buffer          buf,
                                cur_buffer;
        BlockNumber     nblocks,
                                blkno;
        BlockNumber     last_move_dest_block = 0,
                                last_vacuum_block;
        Page            page,
                                ToPage = NULL;
        OffsetNumber offnum,
                                maxoff,
                                newoff,
                                max_offset;
        ItemId          itemid,
                                newitemid;
        HeapTupleData tuple,
                                newtup;
        TupleDesc       tupdesc;
        ResultRelInfo *resultRelInfo;
        EState     *estate;
        TupleTable      tupleTable;
        TupleTableSlot *slot;
        VacPageListData Nvacpagelist;
        VacPage         cur_page = NULL,
                                last_vacuum_page,
                                vacpage,
                           *curpage;
        int                     cur_item = 0;
        int                     i;
        Size            tuple_len;
        int                     num_moved,
                                num_fraged_pages,
                                vacuumed_pages;
        int                     checked_moved,
                                num_tuples,
                                keep_tuples = 0;
        bool            isempty,
                                dowrite,
                                chain_tuple_moved;
        VacRUsage       ru0;

        vac_init_rusage(&ru0);

        myXID = GetCurrentTransactionId();
        myCID = GetCurrentCommandId();

        tupdesc = RelationGetDescr(onerel);

        /*
         * We need a ResultRelInfo and an EState so we can use the regular
         * executor's index-entry-making machinery.
         */
        resultRelInfo = makeNode(ResultRelInfo);
        resultRelInfo->ri_RangeTableIndex = 1;          /* dummy */
        resultRelInfo->ri_RelationDesc = onerel;
        resultRelInfo->ri_TrigDesc = NULL;                      /* we don't fire triggers */

        ExecOpenIndices(resultRelInfo);

        estate = CreateExecutorState();
        estate->es_result_relations = resultRelInfo;
        estate->es_num_result_relations = 1;
        estate->es_result_relation_info = resultRelInfo;

        /* Set up a dummy tuple table too */
        tupleTable = ExecCreateTupleTable(1);
        slot = ExecAllocTableSlot(tupleTable);
        ExecSetSlotDescriptor(slot, tupdesc, false);

        Nvacpagelist.num_pages = 0;
        num_fraged_pages = fraged_pages->num_pages;
        Assert((BlockNumber) vacuum_pages->num_pages >= vacuum_pages->empty_end_pages);
        vacuumed_pages = vacuum_pages->num_pages - vacuum_pages->empty_end_pages;
        if (vacuumed_pages > 0)
        {
                /* get last reaped page from vacuum_pages */
                last_vacuum_page = vacuum_pages->pagedesc[vacuumed_pages - 1];
                last_vacuum_block = last_vacuum_page->blkno;
        }
        else
        {
                last_vacuum_page = NULL;
                last_vacuum_block = InvalidBlockNumber;
        }
        cur_buffer = InvalidBuffer;
        num_moved = 0;

        vacpage = (VacPage) palloc(sizeof(VacPageData) + MaxOffsetNumber * sizeof(OffsetNumber));
        vacpage->offsets_used = vacpage->offsets_free = 0;

        /*
         * Scan pages backwards from the last nonempty page, trying to move
         * tuples down to lower pages.  Quit when we reach a page that we have
         * moved any tuples onto, or the first page if we haven't moved anything,
         * or when we find a page we cannot completely empty (this last condition
         * is handled by "break" statements within the loop).
         *
         * NB: this code depends on the vacuum_pages and fraged_pages lists being
         * in order by blkno.
         */
        nblocks = vacrelstats->rel_pages;
        for (blkno = nblocks - vacuum_pages->empty_end_pages - 1;
                 blkno > last_move_dest_block;
                 blkno--)
        {
                /*
                 * Forget fraged_pages pages at or after this one; they're no longer
                 * useful as move targets, since we only want to move down.  Note
                 * that since we stop the outer loop at last_move_dest_block, pages
                 * removed here cannot have had anything moved onto them already.
                 *
                 * Also note that we don't change the stored fraged_pages list,
                 * only our local variable num_fraged_pages; so the forgotten pages
                 * are still available to be loaded into the free space map later.
                 */
                while (num_fraged_pages > 0 &&
                           fraged_pages->pagedesc[num_fraged_pages-1]->blkno >= blkno)
                {
                        Assert(fraged_pages->pagedesc[num_fraged_pages-1]->offsets_used == 0);
                        --num_fraged_pages;
                }

                /*
                 * Process this page of relation.
                 */
                buf = ReadBuffer(onerel, blkno);
                page = BufferGetPage(buf);

                vacpage->offsets_free = 0;

                isempty = PageIsEmpty(page);

                dowrite = false;

                /* Is the page in the vacuum_pages list? */
                if (blkno == last_vacuum_block)
                {
                        if (last_vacuum_page->offsets_free > 0)
                        {
                                /* there are dead tuples on this page - clean them */
                                Assert(!isempty);
                                LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
                                vacuum_page(onerel, buf, last_vacuum_page);
                                LockBuffer(buf, BUFFER_LOCK_UNLOCK);
                                dowrite = true;
                        }
                        else
                                Assert(isempty);
                        --vacuumed_pages;
                        if (vacuumed_pages > 0)
                        {
                                /* get prev reaped page from vacuum_pages */
                                last_vacuum_page = vacuum_pages->pagedesc[vacuumed_pages - 1];
                                last_vacuum_block = last_vacuum_page->blkno;
                        }
                        else
                        {
                                last_vacuum_page = NULL;
                                last_vacuum_block = InvalidBlockNumber;
                        }
                        if (isempty)
                        {
                                ReleaseBuffer(buf);
                                continue;
                        }
                }
                else
                        Assert(!isempty);

1250                 chain_tuple_moved = false;              /* no chain tuple has been
1251                                                                                  * moved off this page yet */
1252                 vacpage->blkno = blkno;
1253                 maxoff = PageGetMaxOffsetNumber(page);
1254                 for (offnum = FirstOffsetNumber;
1255                          offnum <= maxoff;
1256                          offnum = OffsetNumberNext(offnum))
1257                 {
1258                         itemid = PageGetItemId(page, offnum);
1259
1260                         if (!ItemIdIsUsed(itemid))
1261                                 continue;
1262
1263                         tuple.t_datamcxt = NULL;
1264                         tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
1265                         tuple_len = tuple.t_len = ItemIdGetLength(itemid);
1266                         ItemPointerSet(&(tuple.t_self), blkno, offnum);
1267
1268                         if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
1269                         {
1270                                 if ((TransactionId) tuple.t_data->t_cmin != myXID)
1271                                         elog(ERROR, "Invalid XID in t_cmin");
1272                                 if (tuple.t_data->t_infomask & HEAP_MOVED_IN)
1273                                         elog(ERROR, "HEAP_MOVED_IN was not expected");
1274
1275                                 /*
1276                                  * If I have already moved this (chain) tuple, then I have
1277                                  * to check whether it is in vacpage or not - i.e. whether
1278                                  * it was moved while cleaning this page or some previous one.
1279                                  */
1280                                 if (tuple.t_data->t_infomask & HEAP_MOVED_OFF)
1281                                 {
1282                                         if (keep_tuples == 0)
1283                                                 continue;
1284                                         if (chain_tuple_moved)          /* some chain tuples were
1285                                                                                                  * moved while */
1286                                         {                       /* cleaning this page */
1287                                                 Assert(vacpage->offsets_free > 0);
1288                                                 for (i = 0; i < vacpage->offsets_free; i++)
1289                                                 {
1290                                                         if (vacpage->offsets[i] == offnum)
1291                                                                 break;
1292                                                 }
1293                                                 if (i >= vacpage->offsets_free) /* not found */
1294                                                 {
1295                                                         vacpage->offsets[vacpage->offsets_free++] = offnum;
1296                                                         keep_tuples--;
1297                                                 }
1298                                         }
1299                                         else
1300                                         {
1301                                                 vacpage->offsets[vacpage->offsets_free++] = offnum;
1302                                                 keep_tuples--;
1303                                         }
1304                                         continue;
1305                                 }
1306                                 elog(ERROR, "HEAP_MOVED_OFF was expected");
1307                         }
1308
1309                         /*
1310                          * If this tuple is in a chain of tuples created by updates
1311                          * from "recent" transactions, then we have to move the whole
1312                          * chain of tuples to other places.
1313                          */
1314                         if ((tuple.t_data->t_infomask & HEAP_UPDATED &&
1315                                  !TransactionIdPrecedes(tuple.t_data->t_xmin, XmaxRecent)) ||
1316                                 (!(tuple.t_data->t_infomask & HEAP_XMAX_INVALID) &&
1317                                  !(ItemPointerEquals(&(tuple.t_self),
1318                                                                          &(tuple.t_data->t_ctid)))))
1319                         {
1320                                 Buffer          Cbuf = buf;
1321                                 Page            Cpage;
1322                                 ItemId          Citemid;
1323                                 ItemPointerData Ctid;
1324                                 HeapTupleData tp = tuple;
1325                                 Size            tlen = tuple_len;
1326                                 VTupleMove      vtmove = (VTupleMove)
1327                                 palloc(100 * sizeof(VTupleMoveData));
1328                                 int                     num_vtmove = 0;
1329                                 int                     free_vtmove = 100;
1330                                 VacPage         to_vacpage = NULL;
1331                                 int                     to_item = 0;
1332                                 bool            freeCbuf = false;
1333                                 int                     ti;
1334
1335                                 if (vacrelstats->vtlinks == NULL)
1336                                         elog(ERROR, "No parent tuple was found");
1337                                 if (cur_buffer != InvalidBuffer)
1338                                 {
1339                                         WriteBuffer(cur_buffer);
1340                                         cur_buffer = InvalidBuffer;
1341                                 }
1342
1343                                 /*
1344                                  * If this tuple is at the beginning or in the middle of
1345                                  * the chain, then we have to move to the end of the chain.
1346                                  */
1347                                 while (!(tp.t_data->t_infomask & HEAP_XMAX_INVALID) &&
1348                                            !(ItemPointerEquals(&(tp.t_self),
1349                                                                                    &(tp.t_data->t_ctid))))
1350                                 {
1351                                         Ctid = tp.t_data->t_ctid;
1352                                         if (freeCbuf)
1353                                                 ReleaseBuffer(Cbuf);
1354                                         freeCbuf = true;
1355                                         Cbuf = ReadBuffer(onerel,
1356                                                                           ItemPointerGetBlockNumber(&Ctid));
1357                                         Cpage = BufferGetPage(Cbuf);
1358                                         Citemid = PageGetItemId(Cpage,
1359                                                                           ItemPointerGetOffsetNumber(&Ctid));
1360                                         if (!ItemIdIsUsed(Citemid))
1361                                         {
1362
1363                                                 /*
1364                                                  * This means that in the middle of the chain there
1365                                                  * was a tuple updated by an xaction older than
1366                                                  * XmaxRecent, and I have already deleted that
1367                                                  * tuple. Actually, the upper part of the chain
1368                                                  * should be removed; it seems this ought to be
1369                                                  * handled in scan_heap(), but that's not implemented
1370                                                  * at the moment, so we just stop shrinking here.
1371                                                  */
1372                                                 ReleaseBuffer(Cbuf);
1373                                                 pfree(vtmove);
1374                                                 vtmove = NULL;
1375                                                 elog(NOTICE, "Child itemid in update-chain marked as unused - can't continue repair_frag");
1376                                                 break;
1377                                         }
1378                                         tp.t_datamcxt = NULL;
1379                                         tp.t_data = (HeapTupleHeader) PageGetItem(Cpage, Citemid);
1380                                         tp.t_self = Ctid;
1381                                         tlen = tp.t_len = ItemIdGetLength(Citemid);
1382                                 }
1383                                 if (vtmove == NULL)
1384                                         break;
1385                                 /* first, can the chain be moved? */
1386                                 for (;;)
1387                                 {
1388                                         if (to_vacpage == NULL ||
1389                                                 !enough_space(to_vacpage, tlen))
1390                                         {
1391                                                 for (i = 0; i < num_fraged_pages; i++)
1392                                                 {
1393                                                         if (enough_space(fraged_pages->pagedesc[i], tlen))
1394                                                                 break;
1395                                                 }
1396
1397                                                 if (i == num_fraged_pages)
1398                                                 {
1399                                                         /* can't move item anywhere */
1400                                                         for (i = 0; i < num_vtmove; i++)
1401                                                         {
1402                                                                 Assert(vtmove[i].vacpage->offsets_used > 0);
1403                                                                 (vtmove[i].vacpage->offsets_used)--;
1404                                                         }
1405                                                         num_vtmove = 0;
1406                                                         break;
1407                                                 }
1408                                                 to_item = i;
1409                                                 to_vacpage = fraged_pages->pagedesc[to_item];
1410                                         }
1411                                         to_vacpage->free -= MAXALIGN(tlen);
1412                                         if (to_vacpage->offsets_used >= to_vacpage->offsets_free)
1413                                                 to_vacpage->free -= MAXALIGN(sizeof(ItemIdData));
1414                                         (to_vacpage->offsets_used)++;
1415                                         if (free_vtmove == 0)
1416                                         {
1417                                                 free_vtmove = 1000;
1418                                                 vtmove = (VTupleMove) repalloc(vtmove,
1419                                                                                          (free_vtmove + num_vtmove) *
1420                                                                                                  sizeof(VTupleMoveData));
1421                                         }
1422                                         vtmove[num_vtmove].tid = tp.t_self;
1423                                         vtmove[num_vtmove].vacpage = to_vacpage;
1424                                         if (to_vacpage->offsets_used == 1)
1425                                                 vtmove[num_vtmove].cleanVpd = true;
1426                                         else
1427                                                 vtmove[num_vtmove].cleanVpd = false;
1428                                         free_vtmove--;
1429                                         num_vtmove++;
1430
1431                                         /* All done? */
1432                                         if (!(tp.t_data->t_infomask & HEAP_UPDATED) ||
1433                                                 TransactionIdPrecedes(tp.t_data->t_xmin, XmaxRecent))
1434                                                 break;
1435
1436                                         /* Well, try to find the tuple with the old row version */
1437                                         for (;;)
1438                                         {
1439                                                 Buffer          Pbuf;
1440                                                 Page            Ppage;
1441                                                 ItemId          Pitemid;
1442                                                 HeapTupleData Ptp;
1443                                                 VTupleLinkData vtld,
1444                                                                    *vtlp;
1445
1446                                                 vtld.new_tid = tp.t_self;
1447                                                 vtlp = (VTupleLink)
1448                                                         vac_bsearch((void *) &vtld,
1449                                                                                 (void *) (vacrelstats->vtlinks),
1450                                                                                 vacrelstats->num_vtlinks,
1451                                                                                 sizeof(VTupleLinkData),
1452                                                                                 vac_cmp_vtlinks);
1453                                                 if (vtlp == NULL)
1454                                                         elog(ERROR, "Parent tuple was not found");
1455                                                 tp.t_self = vtlp->this_tid;
1456                                                 Pbuf = ReadBuffer(onerel,
1457                                                                 ItemPointerGetBlockNumber(&(tp.t_self)));
1458                                                 Ppage = BufferGetPage(Pbuf);
1459                                                 Pitemid = PageGetItemId(Ppage,
1460                                                            ItemPointerGetOffsetNumber(&(tp.t_self)));
1461                                                 if (!ItemIdIsUsed(Pitemid))
1462                                                         elog(ERROR, "Parent itemid marked as unused");
1463                                                 Ptp.t_datamcxt = NULL;
1464                                                 Ptp.t_data = (HeapTupleHeader) PageGetItem(Ppage, Pitemid);
1465                                                 Assert(ItemPointerEquals(&(vtld.new_tid),
1466                                                                                                  &(Ptp.t_data->t_ctid)));
1467
1468                                                 /*
1469                                                  * See above about the cases where
1470                                                  * !ItemIdIsUsed(Citemid) (the child item has been
1471                                                  * removed)...  Because at the moment we don't
1472                                                  * remove the useless part of an update-chain,
1473                                                  * it's possible to find a too-old parent row
1474                                                  * here.  As in the case that caused this
1475                                                  * problem, we stop shrinking here.  I could try
1476                                                  * to find the real parent row, but I don't want
1477                                                  * to, because the real solution will be
1478                                                  * implemented later anyway, and we are too close
1479                                                  * to the 6.5 release. - vadim 06/11/99
1480                                                  */
1481                                                 if (!(TransactionIdEquals(Ptp.t_data->t_xmax,
1482                                                                                                   tp.t_data->t_xmin)))
1483                                                 {
1484                                                         if (freeCbuf)
1485                                                                 ReleaseBuffer(Cbuf);
1486                                                         freeCbuf = false;
1487                                                         ReleaseBuffer(Pbuf);
1488                                                         for (i = 0; i < num_vtmove; i++)
1489                                                         {
1490                                                                 Assert(vtmove[i].vacpage->offsets_used > 0);
1491                                                                 (vtmove[i].vacpage->offsets_used)--;
1492                                                         }
1493                                                         num_vtmove = 0;
1494                                                         elog(NOTICE, "Too old parent tuple found - can't continue repair_frag");
1495                                                         break;
1496                                                 }
1497 #ifdef NOT_USED                                 /* I'm not sure that this will work
1498                                                                  * properly... */
1499
1500                                                 /*
1501                                                  * If this tuple is an updated version of a row
1502                                                  * and it was created by the same transaction,
1503                                                  * then no one is interested in this tuple -
1504                                                  * mark it as removed.
1505                                                  */
1506                                                 if (Ptp.t_data->t_infomask & HEAP_UPDATED &&
1507                                                         TransactionIdEquals(Ptp.t_data->t_xmin,
1508                                                                                                 Ptp.t_data->t_xmax))
1509                                                 {
1510                                                         TransactionIdStore(myXID,
1511                                                                 (TransactionId *) &(Ptp.t_data->t_cmin));
1512                                                         Ptp.t_data->t_infomask &=
1513                                                                 ~(HEAP_XMIN_COMMITTED | HEAP_XMIN_INVALID | HEAP_MOVED_IN);
1514                                                         Ptp.t_data->t_infomask |= HEAP_MOVED_OFF;
1515                                                         WriteBuffer(Pbuf);
1516                                                         continue;
1517                                                 }
1518 #endif
1519                                                 tp.t_datamcxt = Ptp.t_datamcxt;
1520                                                 tp.t_data = Ptp.t_data;
1521                                                 tlen = tp.t_len = ItemIdGetLength(Pitemid);
1522                                                 if (freeCbuf)
1523                                                         ReleaseBuffer(Cbuf);
1524                                                 Cbuf = Pbuf;
1525                                                 freeCbuf = true;
1526                                                 break;
1527                                         }
1528                                         if (num_vtmove == 0)
1529                                                 break;
1530                                 }
1531                                 if (freeCbuf)
1532                                         ReleaseBuffer(Cbuf);
1533                                 if (num_vtmove == 0)    /* chain can't be moved */
1534                                 {
1535                                         pfree(vtmove);
1536                                         break;
1537                                 }
1538                                 ItemPointerSetInvalid(&Ctid);
1539                                 for (ti = 0; ti < num_vtmove; ti++)
1540                                 {
1541                                         VacPage         destvacpage = vtmove[ti].vacpage;
1542
1543                                         /* Get page to move from */
1544                                         tuple.t_self = vtmove[ti].tid;
1545                                         Cbuf = ReadBuffer(onerel,
1546                                                          ItemPointerGetBlockNumber(&(tuple.t_self)));
1547
1548                                         /* Get page to move to */
1549                                         cur_buffer = ReadBuffer(onerel, destvacpage->blkno);
1550
1551                                         LockBuffer(cur_buffer, BUFFER_LOCK_EXCLUSIVE);
1552                                         if (cur_buffer != Cbuf)
1553                                                 LockBuffer(Cbuf, BUFFER_LOCK_EXCLUSIVE);
1554
1555                                         ToPage = BufferGetPage(cur_buffer);
1556                                         Cpage = BufferGetPage(Cbuf);
1557
1558                                         Citemid = PageGetItemId(Cpage,
1559                                                         ItemPointerGetOffsetNumber(&(tuple.t_self)));
1560                                         tuple.t_datamcxt = NULL;
1561                                         tuple.t_data = (HeapTupleHeader) PageGetItem(Cpage, Citemid);
1562                                         tuple_len = tuple.t_len = ItemIdGetLength(Citemid);
1563
1564                                         /*
1565                                          * make a copy of the source tuple, and then mark the
1566                                          * source tuple MOVED_OFF.
1567                                          */
1568                                         heap_copytuple_with_tuple(&tuple, &newtup);
1569
1570                                         RelationInvalidateHeapTuple(onerel, &tuple);
1571
1572                                         /* NO ELOG(ERROR) TILL CHANGES ARE LOGGED */
1573                                         START_CRIT_SECTION();
1574
1575                                         TransactionIdStore(myXID, (TransactionId *) &(tuple.t_data->t_cmin));
1576                                         tuple.t_data->t_infomask &=
1577                                                 ~(HEAP_XMIN_COMMITTED | HEAP_XMIN_INVALID | HEAP_MOVED_IN);
1578                                         tuple.t_data->t_infomask |= HEAP_MOVED_OFF;
1579
1580                                         /*
1581                                          * If this page was not used before - clean it.
1582                                          *
1583                                          * NOTE: a nasty bug used to lurk here.  It is possible
1584                                          * for the source and destination pages to be the same
1585                                          * (since this tuple-chain member can be on a page
1586                                          * lower than the one we're currently processing in
1587                                          * the outer loop).  If that's true, then after
1588                                          * vacuum_page() the source tuple will have been
1589                                          * moved, and tuple.t_data will be pointing at
1590                                          * garbage.  Therefore we must do everything that uses
1591                                          * tuple.t_data BEFORE this step!!
1592                                          *
1593                                          * This path is different from the other callers of
1594                                          * vacuum_page, because we have already incremented
1595                                          * the vacpage's offsets_used field to account for the
1596                                          * tuple(s) we expect to move onto the page. Therefore
1597                                          * vacuum_page's check for offsets_used == 0 is wrong.
1598                                          * But since that's a good debugging check for all
1599                                          * other callers, we work around it here rather than
1600                                          * remove it.
1601                                          */
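#ifdef NOT_USED
                                        /*
                                         * Editorial sketch (not in the original code): the
                                         * overlap the note above warns about - source page ==
                                         * destination page - could be made visible in a debug
                                         * build before vacuum_page() runs:
                                         */
                                        if (Cbuf == cur_buffer)
                                                elog(DEBUG, "chain move: source and destination are the same page %u",
                                                         destvacpage->blkno);
#endif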
1602                                         if (!PageIsEmpty(ToPage) && vtmove[ti].cleanVpd)
1603                                         {
1604                                                 int                     sv_offsets_used = destvacpage->offsets_used;
1605
1606                                                 destvacpage->offsets_used = 0;
1607                                                 vacuum_page(onerel, cur_buffer, destvacpage);
1608                                                 destvacpage->offsets_used = sv_offsets_used;
1609                                         }
1610
1611                                         /*
1612                                          * Update the state of the copied tuple, and store it
1613                                          * on the destination page.
1614                                          */
1615                                         TransactionIdStore(myXID, (TransactionId *) &(newtup.t_data->t_cmin));
1616                                         newtup.t_data->t_infomask &=
1617                                                 ~(HEAP_XMIN_COMMITTED | HEAP_XMIN_INVALID | HEAP_MOVED_OFF);
1618                                         newtup.t_data->t_infomask |= HEAP_MOVED_IN;
1619                                         newoff = PageAddItem(ToPage, (Item) newtup.t_data, tuple_len,
1620                                                                                  InvalidOffsetNumber, LP_USED);
1621                                         if (newoff == InvalidOffsetNumber)
1622                                         {
1623                                                 elog(STOP, "moving chain: failed to add item with len = %lu to page %u",
1624                                                   (unsigned long) tuple_len, destvacpage->blkno);
1625                                         }
1626                                         newitemid = PageGetItemId(ToPage, newoff);
1627                                         pfree(newtup.t_data);
1628                                         newtup.t_datamcxt = NULL;
1629                                         newtup.t_data = (HeapTupleHeader) PageGetItem(ToPage, newitemid);
1630                                         ItemPointerSet(&(newtup.t_self), destvacpage->blkno, newoff);
1631
1632                                         {
1633                                                 XLogRecPtr      recptr =
1634                                                 log_heap_move(onerel, Cbuf, tuple.t_self,
1635                                                                           cur_buffer, &newtup);
1636
1637                                                 if (Cbuf != cur_buffer)
1638                                                 {
1639                                                         PageSetLSN(Cpage, recptr);
1640                                                         PageSetSUI(Cpage, ThisStartUpID);
1641                                                 }
1642                                                 PageSetLSN(ToPage, recptr);
1643                                                 PageSetSUI(ToPage, ThisStartUpID);
1644                                         }
1645                                         END_CRIT_SECTION();
1646
1647                                         if (destvacpage->blkno > last_move_dest_block)
1648                                                 last_move_dest_block = destvacpage->blkno;
1649
1650                                         /*
1651                                          * Set the new tuple's t_ctid to point to itself for the
1652                                          * last tuple in the chain, and to the next tuple in the
1653                                          * chain otherwise.
1654                                          */
1655                                         if (!ItemPointerIsValid(&Ctid))
1656                                                 newtup.t_data->t_ctid = newtup.t_self;
1657                                         else
1658                                                 newtup.t_data->t_ctid = Ctid;
1659                                         Ctid = newtup.t_self;
1660
1661                                         num_moved++;
1662
1663                                         /*
1664                                          * Remember that we moved a tuple from the current page
1665                                          * (the corresponding index tuple will be cleaned).
1666                                          */
1667                                         if (Cbuf == buf)
1668                                                 vacpage->offsets[vacpage->offsets_free++] =
1669                                                         ItemPointerGetOffsetNumber(&(tuple.t_self));
1670                                         else
1671                                                 keep_tuples++;
1672
1673                                         LockBuffer(cur_buffer, BUFFER_LOCK_UNLOCK);
1674                                         if (cur_buffer != Cbuf)
1675                                                 LockBuffer(Cbuf, BUFFER_LOCK_UNLOCK);
1676
1677                                         /* Create index entries for the moved tuple */
1678                                         if (resultRelInfo->ri_NumIndices > 0)
1679                                         {
1680                                                 ExecStoreTuple(&newtup, slot, InvalidBuffer, false);
1681                                                 ExecInsertIndexTuples(slot, &(newtup.t_self),
1682                                                                                           estate, true);
1683                                         }
1684
1685                                         WriteBuffer(cur_buffer);
1686                                         WriteBuffer(Cbuf);
1687                                 }
1688                                 cur_buffer = InvalidBuffer;
1689                                 pfree(vtmove);
1690                                 chain_tuple_moved = true;
1691                                 continue;
1692                         }
1693
1694                         /* try to find a new page for this tuple */
1695                         if (cur_buffer == InvalidBuffer ||
1696                                 !enough_space(cur_page, tuple_len))
1697                         {
1698                                 if (cur_buffer != InvalidBuffer)
1699                                 {
1700                                         WriteBuffer(cur_buffer);
1701                                         cur_buffer = InvalidBuffer;
1702                                 }
1703                                 for (i = 0; i < num_fraged_pages; i++)
1704                                 {
1705                                         if (enough_space(fraged_pages->pagedesc[i], tuple_len))
1706                                                 break;
1707                                 }
1708                                 if (i == num_fraged_pages)
1709                                         break;          /* can't move item anywhere */
1710                                 cur_item = i;
1711                                 cur_page = fraged_pages->pagedesc[cur_item];
1712                                 cur_buffer = ReadBuffer(onerel, cur_page->blkno);
1713                                 LockBuffer(cur_buffer, BUFFER_LOCK_EXCLUSIVE);
1714                                 ToPage = BufferGetPage(cur_buffer);
1715                                 /* if this page was not used before - clean it */
1716                                 if (!PageIsEmpty(ToPage) && cur_page->offsets_used == 0)
1717                                         vacuum_page(onerel, cur_buffer, cur_page);
1718                         }
1719                         else
1720                                 LockBuffer(cur_buffer, BUFFER_LOCK_EXCLUSIVE);
1721
1722                         LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
1723
1724                         /* copy tuple */
1725                         heap_copytuple_with_tuple(&tuple, &newtup);
1726
1727                         RelationInvalidateHeapTuple(onerel, &tuple);
1728
1729                         /* NO ELOG(ERROR) TILL CHANGES ARE LOGGED */
1730                         START_CRIT_SECTION();
1731
1732                         /*
1733                          * Mark the new tuple as moved_in by vacuum and store the
1734                          * vacuum XID in t_cmin !!!
1735                          */
1736                         TransactionIdStore(myXID, (TransactionId *) &(newtup.t_data->t_cmin));
1737                         newtup.t_data->t_infomask &=
1738                                 ~(HEAP_XMIN_COMMITTED | HEAP_XMIN_INVALID | HEAP_MOVED_OFF);
1739                         newtup.t_data->t_infomask |= HEAP_MOVED_IN;
1740
1741                         /* add tuple to the page */
1742                         newoff = PageAddItem(ToPage, (Item) newtup.t_data, tuple_len,
1743                                                                  InvalidOffsetNumber, LP_USED);
1744                         if (newoff == InvalidOffsetNumber)
1745                         {
1746                                 elog(STOP, "failed to add item with len = %lu to page %u (free space %lu, offsets used %u, offsets free %u)",
1747                                          (unsigned long) tuple_len,
1748                                          cur_page->blkno, (unsigned long) cur_page->free,
1749                                          cur_page->offsets_used, cur_page->offsets_free);
1750                         }
1751                         newitemid = PageGetItemId(ToPage, newoff);
1752                         pfree(newtup.t_data);
1753                         newtup.t_datamcxt = NULL;
1754                         newtup.t_data = (HeapTupleHeader) PageGetItem(ToPage, newitemid);
1755                         ItemPointerSet(&(newtup.t_data->t_ctid), cur_page->blkno, newoff);
1756                         newtup.t_self = newtup.t_data->t_ctid;
1757
1758                         /*
1759                          * Mark the old tuple as moved_off by vacuum and store the
1760                          * vacuum XID in t_cmin !!!
1761                          */
1762                         TransactionIdStore(myXID, (TransactionId *) &(tuple.t_data->t_cmin));
1763                         tuple.t_data->t_infomask &=
1764                                 ~(HEAP_XMIN_COMMITTED | HEAP_XMIN_INVALID | HEAP_MOVED_IN);
1765                         tuple.t_data->t_infomask |= HEAP_MOVED_OFF;
1766
1767                         {
1768                                 XLogRecPtr      recptr =
1769                                 log_heap_move(onerel, buf, tuple.t_self,
1770                                                           cur_buffer, &newtup);
1771
1772                                 PageSetLSN(page, recptr);
1773                                 PageSetSUI(page, ThisStartUpID);
1774                                 PageSetLSN(ToPage, recptr);
1775                                 PageSetSUI(ToPage, ThisStartUpID);
1776                         }
1777                         END_CRIT_SECTION();
1778
1779                         cur_page->offsets_used++;
1780                         num_moved++;
1781                         cur_page->free = ((PageHeader) ToPage)->pd_upper - ((PageHeader) ToPage)->pd_lower;
1782                         if (cur_page->blkno > last_move_dest_block)
1783                                 last_move_dest_block = cur_page->blkno;
1784
1785                         vacpage->offsets[vacpage->offsets_free++] = offnum;
1786
1787                         LockBuffer(cur_buffer, BUFFER_LOCK_UNLOCK);
1788                         LockBuffer(buf, BUFFER_LOCK_UNLOCK);
1789
1790                         /* insert index tuples if needed */
1791                         if (resultRelInfo->ri_NumIndices > 0)
1792                         {
1793                                 ExecStoreTuple(&newtup, slot, InvalidBuffer, false);
1794                                 ExecInsertIndexTuples(slot, &(newtup.t_self), estate, true);
1795                         }
1796                 }                                               /* walk along page */
1797
1798                 if (offnum < maxoff && keep_tuples > 0)
1799                 {
1800                         OffsetNumber off;
1801
1802                         for (off = OffsetNumberNext(offnum);
1803                                  off <= maxoff;
1804                                  off = OffsetNumberNext(off))
1805                         {
1806                                 itemid = PageGetItemId(page, off);
1807                                 if (!ItemIdIsUsed(itemid))
1808                                         continue;
1809                                 tuple.t_datamcxt = NULL;
1810                                 tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
1811                                 if (tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED)
1812                                         continue;
1813                                 if ((TransactionId) tuple.t_data->t_cmin != myXID)
1814                                         elog(ERROR, "Invalid XID in t_cmin (4)");
1815                                 if (tuple.t_data->t_infomask & HEAP_MOVED_IN)
1816                                         elog(ERROR, "HEAP_MOVED_IN was not expected (2)");
1817                                 if (tuple.t_data->t_infomask & HEAP_MOVED_OFF)
1818                                 {
1819                                         /* some chain tuples were moved while */
1820                                         if (chain_tuple_moved)
1821                                         {                       /* cleaning this page */
1822                                                 Assert(vacpage->offsets_free > 0);
1823                                                 for (i = 0; i < vacpage->offsets_free; i++)
1824                                                 {
1825                                                         if (vacpage->offsets[i] == off)
1826                                                                 break;
1827                                                 }
1828                                                 if (i >= vacpage->offsets_free) /* not found */
1829                                                 {
1830                                                         vacpage->offsets[vacpage->offsets_free++] = off;
1831                                                         Assert(keep_tuples > 0);
1832                                                         keep_tuples--;
1833                                                 }
1834                                         }
1835                                         else
1836                                         {
1837                                                 vacpage->offsets[vacpage->offsets_free++] = off;
1838                                                 Assert(keep_tuples > 0);
1839                                                 keep_tuples--;
1840                                         }
1841                                 }
1842                         }
1843                 }
1844
1845                 if (vacpage->offsets_free > 0)  /* some tuples were moved */
1846                 {
1847                         if (chain_tuple_moved)          /* else they are already in order */
1848                         {
1849                                 qsort((char *) (vacpage->offsets), vacpage->offsets_free,
1850                                           sizeof(OffsetNumber), vac_cmp_offno);
1851                         }
1852                         vpage_insert(&Nvacpagelist, copy_vac_page(vacpage));
1853                         WriteBuffer(buf);
1854                 }
1855                 else if (dowrite)
1856                         WriteBuffer(buf);
1857                 else
1858                         ReleaseBuffer(buf);
1859
1860                 if (offnum <= maxoff)
1861                         break;                          /* some item(s) left */
1862
1863         }                                                       /* walk along relation */
1864
1865         blkno++;                                        /* new number of blocks */
1866
1867         if (cur_buffer != InvalidBuffer)
1868         {
1869                 Assert(num_moved > 0);
1870                 WriteBuffer(cur_buffer);
1871         }
1872
1873         if (num_moved > 0)
1874         {
1875                 /*
1876                  * We have to commit our tuple movements before we truncate the
1877                  * relation.  Ideally we should do Commit/StartTransactionCommand
1878                  * here, relying on the session-level table lock to protect our
1879                  * exclusive access to the relation.  However, that would require
1880                  * a lot of extra code to close and re-open the relation, indexes,
1881                  * etc.  For now, a quick hack: record status of current
1882                  * transaction as committed, and continue.
1883                  */
1884                 RecordTransactionCommit();
1885         }
1886
1887         /*
1888          * We are not going to move any more tuples across pages, but we still
1889          * need to apply vacuum_page to compact free space in the remaining
1890          * pages in vacuum_pages list.  Note that some of these pages may also
1891          * be in the fraged_pages list, and may have had tuples moved onto them;
1892          * if so, we already did vacuum_page and needn't do it again.
1893          */
1894         for (i = 0, curpage = vacuum_pages->pagedesc;
1895                  i < vacuumed_pages;
1896                  i++, curpage++)
1897         {
1898                 Assert((*curpage)->blkno < blkno);
1899                 if ((*curpage)->offsets_used == 0)
1900                 {
1901                         /* this page was not used as a move target, so must clean it */
1902                         buf = ReadBuffer(onerel, (*curpage)->blkno);
1903                         LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
1904                         page = BufferGetPage(buf);
1905                         if (!PageIsEmpty(page))
1906                                 vacuum_page(onerel, buf, *curpage);
1907                         LockBuffer(buf, BUFFER_LOCK_UNLOCK);
1908                         WriteBuffer(buf);
1909                 }
1910         }
1911
1912         /*
1913          * Now scan all the pages that we moved tuples onto and update
1914          * tuple status bits.  This is not really necessary, but will save time
1915          * for future transactions examining these tuples.
1916          *
1917          * XXX Notice that this code fails to clear HEAP_MOVED_OFF tuples from
1918          * pages that were move source pages but not move dest pages.  One also
1919          * wonders whether it wouldn't be better to skip this step and let the
1920          * tuple status updates happen someplace that's not holding an exclusive
1921          * lock on the relation.
1922          */
1923         checked_moved = 0;
1924         for (i = 0, curpage = fraged_pages->pagedesc;
1925                  i < num_fraged_pages;
1926                  i++, curpage++)
1927         {
1928                 Assert((*curpage)->blkno < blkno);
1929                 if ((*curpage)->blkno > last_move_dest_block)
1930                         break;                          /* no need to scan any further */
1931                 if ((*curpage)->offsets_used == 0)
1932                         continue;                       /* this page was never used as a move dest */
1933                 buf = ReadBuffer(onerel, (*curpage)->blkno);
1934                 LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
1935                 page = BufferGetPage(buf);
1936                 num_tuples = 0;
1937                 max_offset = PageGetMaxOffsetNumber(page);
1938                 for (newoff = FirstOffsetNumber;
1939                          newoff <= max_offset;
1940                          newoff = OffsetNumberNext(newoff))
1941                 {
1942                         itemid = PageGetItemId(page, newoff);
1943                         if (!ItemIdIsUsed(itemid))
1944                                 continue;
1945                         tuple.t_datamcxt = NULL;
1946                         tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
1947                         if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
1948                         {
1949                                 if ((TransactionId) tuple.t_data->t_cmin != myXID)
1950                                         elog(ERROR, "Invalid XID in t_cmin (2)");
1951                                 if (tuple.t_data->t_infomask & HEAP_MOVED_IN)
1952                                 {
1953                                         tuple.t_data->t_infomask |= HEAP_XMIN_COMMITTED;
1954                                         num_tuples++;
1955                                 }
1956                                 else if (tuple.t_data->t_infomask & HEAP_MOVED_OFF)
1957                                         tuple.t_data->t_infomask |= HEAP_XMIN_INVALID;
1958                                 else
1959                                         elog(ERROR, "HEAP_MOVED_OFF/HEAP_MOVED_IN was expected");
1960                         }
1961                 }
1962                 LockBuffer(buf, BUFFER_LOCK_UNLOCK);
1963                 WriteBuffer(buf);
1964                 Assert((*curpage)->offsets_used == num_tuples);
1965                 checked_moved += num_tuples;
1966         }
1967         Assert(num_moved == checked_moved);
1968
1969         elog(MESSAGE_LEVEL, "Rel %s: Pages: %u --> %u; Tuple(s) moved: %u.\n\t%s",
1970                  RelationGetRelationName(onerel),
1971                  nblocks, blkno, num_moved,
1972                  vac_show_rusage(&ru0));
1973
1974         /*
1975          * Reflect the motion of system tuples to the catalog cache here.
1976          */
1977         CommandCounterIncrement();
1978
1979         if (Nvacpagelist.num_pages > 0)
1980         {
1981                 /* vacuum indexes again if needed */
1982                 if (Irel != (Relation *) NULL)
1983                 {
1984                         VacPage    *vpleft,
1985                                            *vpright,
1986                                                 vpsave;
1987
1988                         /* re-sort Nvacpagelist.pagedesc */
1989                         for (vpleft = Nvacpagelist.pagedesc,
1990                         vpright = Nvacpagelist.pagedesc + Nvacpagelist.num_pages - 1;
1991                                  vpleft < vpright; vpleft++, vpright--)
1992                         {
1993                                 vpsave = *vpleft;
1994                                 *vpleft = *vpright;
1995                                 *vpright = vpsave;
1996                         }
1997                         Assert(keep_tuples >= 0);
1998                         for (i = 0; i < nindexes; i++)
1999                                 vacuum_index(&Nvacpagelist, Irel[i],
2000                                                          vacrelstats->rel_tuples, keep_tuples);
2001                 }
2002
2003                 /* clean moved tuples from the last page in Nvacpagelist */
2004                 if (vacpage->blkno == (blkno - 1) &&
2005                         vacpage->offsets_free > 0)
2006                 {
2007                         OffsetNumber unbuf[BLCKSZ/sizeof(OffsetNumber)];
2008                         OffsetNumber *unused = unbuf;
2009                         int                     uncnt;
2010
2011                         buf = ReadBuffer(onerel, vacpage->blkno);
2012                         LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
2013                         page = BufferGetPage(buf);
2014                         num_tuples = 0;
2015                         maxoff = PageGetMaxOffsetNumber(page);
2016                         for (offnum = FirstOffsetNumber;
2017                                  offnum <= maxoff;
2018                                  offnum = OffsetNumberNext(offnum))
2019                         {
2020                                 itemid = PageGetItemId(page, offnum);
2021                                 if (!ItemIdIsUsed(itemid))
2022                                         continue;
2023                                 tuple.t_datamcxt = NULL;
2024                                 tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
2025
2026                                 if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
2027                                 {
2028                                         if ((TransactionId) tuple.t_data->t_cmin != myXID)
2029                                                 elog(ERROR, "Invalid XID in t_cmin (3)");
2030                                         if (tuple.t_data->t_infomask & HEAP_MOVED_OFF)
2031                                         {
2032                                                 itemid->lp_flags &= ~LP_USED;
2033                                                 num_tuples++;
2034                                         }
2035                                         else
2036                                                 elog(ERROR, "HEAP_MOVED_OFF was expected (2)");
2037                                 }
2038
2039                         }
2040                         Assert(vacpage->offsets_free == num_tuples);
2041                         START_CRIT_SECTION();
2042                         uncnt = PageRepairFragmentation(page, unused);
2043                         {
2044                                 XLogRecPtr      recptr;
2045
2046                                 recptr = log_heap_clean(onerel, buf, (char *) unused,
2047                                                   (char *) (&(unused[uncnt])) - (char *) unused);
2048                                 PageSetLSN(page, recptr);
2049                                 PageSetSUI(page, ThisStartUpID);
2050                         }
2051                         END_CRIT_SECTION();
2052                         LockBuffer(buf, BUFFER_LOCK_UNLOCK);
2053                         WriteBuffer(buf);
2054                 }
2055
2056                 /* now free the new list of reaped pages */
2057                 curpage = Nvacpagelist.pagedesc;
2058                 for (i = 0; i < Nvacpagelist.num_pages; i++, curpage++)
2059                         pfree(*curpage);
2060                 pfree(Nvacpagelist.pagedesc);
2061         }
2062
2063         /*
2064          * Flush dirty pages out to disk.  We do this unconditionally, even if
2065          * we don't need to truncate, because we want to ensure that all
2066          * tuples have correct on-row commit status on disk (see bufmgr.c's
2067          * comments for FlushRelationBuffers()).
2068          */
2069         i = FlushRelationBuffers(onerel, blkno);
2070         if (i < 0)
2071                 elog(ERROR, "VACUUM (repair_frag): FlushRelationBuffers returned %d",
2072                          i);
2073
2074         /* truncate relation, if needed */
2075         if (blkno < nblocks)
2076         {
2077                 blkno = smgrtruncate(DEFAULT_SMGR, onerel, blkno);
2078                 onerel->rd_nblocks = blkno;     /* update relcache immediately */
2079                 onerel->rd_targblock = InvalidBlockNumber;
2080                 vacrelstats->rel_pages = blkno; /* set new number of blocks */
2081         }
2082
2083         /* clean up */
2084         pfree(vacpage);
2085         if (vacrelstats->vtlinks != NULL)
2086                 pfree(vacrelstats->vtlinks);
2087
2088         ExecDropTupleTable(tupleTable, true);
2089
2090         ExecCloseIndices(resultRelInfo);
2091 }
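#ifdef NOT_USED
/*
 * Editorial sketch (assumption, not the actual definition): enough_space(),
 * used repeatedly in repair_frag above and defined elsewhere in this file,
 * presumably asks whether a candidate page still has room for a MAXALIGN'd
 * tuple, charging for a new line pointer when no freed one is available -
 * mirroring the bookkeeping done when a chain move is planned.  Roughly:
 */
static bool
enough_space_sketch(VacPage vacpage, Size len)
{
        len = MAXALIGN(len);

        if (len > vacpage->free)
                return false;

        /* if no freed line pointer is available, charge for a new one too */
        if (vacpage->offsets_used >= vacpage->offsets_free &&
                vacpage->free - len < MAXALIGN(sizeof(ItemIdData)))
                return false;

        return true;
}
#endif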
2092
2093 /*
2094  *      vacuum_heap() -- free dead tuples
2095  *
2096  *              This routine marks dead tuples as unused and truncates the
2097  *              relation if there are "empty" end-blocks.
2098  */
2099 static void
2100 vacuum_heap(VRelStats *vacrelstats, Relation onerel, VacPageList vacuum_pages)
2101 {
2102         Buffer          buf;
2103         VacPage    *vacpage;
2104         BlockNumber     relblocks;
2105         int                     nblocks;
2106         int                     i;
2107
2108         nblocks = vacuum_pages->num_pages;
2109         nblocks -= vacuum_pages->empty_end_pages;       /* nothing to do with them */
2110
2111         for (i = 0, vacpage = vacuum_pages->pagedesc; i < nblocks; i++, vacpage++)
2112         {
2113                 if ((*vacpage)->offsets_free > 0)
2114                 {
2115                         buf = ReadBuffer(onerel, (*vacpage)->blkno);
2116                         LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
2117                         vacuum_page(onerel, buf, *vacpage);
2118                         LockBuffer(buf, BUFFER_LOCK_UNLOCK);
2119                         WriteBuffer(buf);
2120                 }
2121         }
2122
2123         /*
2124          * Flush dirty pages out to disk.  We do this unconditionally, even if
2125          * we don't need to truncate, because we want to ensure that all
2126          * tuples have correct on-row commit status on disk (see bufmgr.c's
2127          * comments for FlushRelationBuffers()).
2128          */
2129         Assert(vacrelstats->rel_pages >= vacuum_pages->empty_end_pages);
2130         relblocks = vacrelstats->rel_pages - vacuum_pages->empty_end_pages;
2131
2132         i = FlushRelationBuffers(onerel, relblocks);
2133         if (i < 0)
2134                 elog(ERROR, "VACUUM (vacuum_heap): FlushRelationBuffers returned %d",
2135                          i);
2136
2137         /* truncate relation if there are some empty end-pages */
2138         if (vacuum_pages->empty_end_pages > 0)
2139         {
2140                 elog(MESSAGE_LEVEL, "Rel %s: Pages: %u --> %u.",
2141                          RelationGetRelationName(onerel),
2142                          vacrelstats->rel_pages, relblocks);
2143                 relblocks = smgrtruncate(DEFAULT_SMGR, onerel, relblocks);
2144                 onerel->rd_nblocks = relblocks; /* update relcache immediately */
2145                 onerel->rd_targblock = InvalidBlockNumber;
2146                 vacrelstats->rel_pages = relblocks;             /* set new number of
2147                                                                                                  * blocks */
2148         }
2149 }
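#ifdef NOT_USED
/*
 * Editorial sketch (assumption, not the actual definition): vac_cmp_offno,
 * the comparator handed to qsort() in repair_frag above, presumably just
 * orders OffsetNumbers ascending, along these lines:
 */
static int
vac_cmp_offno_sketch(const void *left, const void *right)
{
        if (*(OffsetNumber *) left < *(OffsetNumber *) right)
                return -1;
        if (*(OffsetNumber *) left > *(OffsetNumber *) right)
                return 1;
        return 0;
}
#endif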
2150
2151 /*
2152  *      vacuum_page() -- free dead tuples on a page
2153  *                                       and repair its fragmentation.
2154  */
2155 static void
2156 vacuum_page(Relation onerel, Buffer buffer, VacPage vacpage)
2157 {
2158         OffsetNumber unbuf[BLCKSZ/sizeof(OffsetNumber)];
2159         OffsetNumber *unused = unbuf;
2160         int                     uncnt;
2161         Page            page = BufferGetPage(buffer);
2162         ItemId          itemid;
2163         int                     i;
2164
2165         /* There shouldn't be any tuples moved onto the page yet! */
2166         Assert(vacpage->offsets_used == 0);
2167
2168         START_CRIT_SECTION();
2169         for (i = 0; i < vacpage->offsets_free; i++)
2170         {
2171                 itemid = PageGetItemId(page, vacpage->offsets[i]);
2172                 itemid->lp_flags &= ~LP_USED;
2173         }
2174         uncnt = PageRepairFragmentation(page, unused);
2175         {
2176                 XLogRecPtr      recptr;
2177
2178                 recptr = log_heap_clean(onerel, buffer, (char *) unused,
2179                                                   (char *) (&(unused[uncnt])) - (char *) unused);
2180                 PageSetLSN(page, recptr);
2181                 PageSetSUI(page, ThisStartUpID);
2182         }
2183         END_CRIT_SECTION();
2184 }
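/*
 * The body above follows the standard sequence for a WAL-logged page
 * update; in sketch form (illustration only, not additional logic):
 *
 *              START_CRIT_SECTION();
 *              ... scribble on the page ...
 *              recptr = log_heap_clean(onerel, buffer, ...);
 *              PageSetLSN(page, recptr);
 *              PageSetSUI(page, ThisStartUpID);
 *              END_CRIT_SECTION();
 *
 * The critical section escalates any error during the update into a
 * PANIC, so a half-modified page cannot survive, and stamping the page
 * with the WAL record's LSN lets the buffer manager ensure the log
 * record reaches disk before the data page does.
 */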
2185
2186 /*
2187  *      scan_index() -- scan one index relation to update statistics.
2188  *
2189  * We use this when we have no deletions to do.
2190  */
2191 static void
2192 scan_index(Relation indrel, double num_tuples)
2193 {
2194         IndexBulkDeleteResult *stats;
2195         VacRUsage       ru0;
2196
2197         vac_init_rusage(&ru0);
2198
2199         /*
2200          * Even though we're not planning to delete anything, use the
2201          * ambulkdelete call, so that the scan happens within the index AM
2202          * for more speed.
2203          */
2204         stats = index_bulk_delete(indrel, dummy_tid_reaped, NULL);
2205
2206         if (!stats)
2207                 return;
2208
2209         /* now update statistics in pg_class */
2210         vac_update_relstats(RelationGetRelid(indrel),
2211                                                 stats->num_pages, stats->num_index_tuples,
2212                                                 false);
2213
2214         elog(MESSAGE_LEVEL, "Index %s: Pages %u; Tuples %.0f.\n\t%s",
2215                  RelationGetRelationName(indrel),
2216                  stats->num_pages, stats->num_index_tuples,
2217                  vac_show_rusage(&ru0));
2218
2219         /*
2220          * Check for tuple count mismatch.  If the index is partial, then
2221  * it's OK for it to have fewer tuples than the heap; otherwise we have trouble.
2222          */
2223         if (stats->num_index_tuples != num_tuples)
2224         {
2225                 if (stats->num_index_tuples > num_tuples ||
2226                         ! vac_is_partial_index(indrel))
2227                         elog(NOTICE, "Index %s: NUMBER OF INDEX TUPLES (%.0f) IS NOT THE SAME AS HEAP'S (%.0f).\
2228 \n\tRecreate the index.",
2229                                  RelationGetRelationName(indrel),
2230                                  stats->num_index_tuples, num_tuples);
2231         }
2232
2233         pfree(stats);
2234 }
2235
2236 /*
2237  *      vacuum_index() -- vacuum one index relation.
2238  *
2239  *              Vacpagelist is the VacPageList of the heap we're currently vacuuming.
2240  *              It's locked. Indrel is an index relation on the vacuumed heap.
2241  *
2242  *              We don't bother to set locks on the index relation here, since
2243  *              the parent table is exclusive-locked already.
2244  *
2245  *              Finally, we arrange to update the index relation's statistics in
2246  *              pg_class.
2247  */
2248 static void
2249 vacuum_index(VacPageList vacpagelist, Relation indrel,
2250                          double num_tuples, int keep_tuples)
2251 {
2252         IndexBulkDeleteResult *stats;
2253         VacRUsage       ru0;
2254
2255         vac_init_rusage(&ru0);
2256
2257         /* Do bulk deletion */
2258         stats = index_bulk_delete(indrel, tid_reaped, (void *) vacpagelist);
2259
2260         if (!stats)
2261                 return;
2262
2263         /* now update statistics in pg_class */
2264         vac_update_relstats(RelationGetRelid(indrel),
2265                                                 stats->num_pages, stats->num_index_tuples,
2266                                                 false);
2267
2268         elog(MESSAGE_LEVEL, "Index %s: Pages %u; Tuples %.0f: Deleted %.0f.\n\t%s",
2269                  RelationGetRelationName(indrel), stats->num_pages,
2270                  stats->num_index_tuples - keep_tuples, stats->tuples_removed,
2271                  vac_show_rusage(&ru0));
2272
2273         /*
2274          * Check for tuple count mismatch.  If the index is partial, then
2275  * it's OK for it to have fewer tuples than the heap; otherwise we have trouble.
2276          */
2277         if (stats->num_index_tuples != num_tuples + keep_tuples)
2278         {
2279                 if (stats->num_index_tuples > num_tuples + keep_tuples ||
2280                         ! vac_is_partial_index(indrel))
2281                         elog(NOTICE, "Index %s: NUMBER OF INDEX TUPLES (%.0f) IS NOT THE SAME AS HEAP'S (%.0f).\
2282 \n\tRecreate the index.",
2283                                  RelationGetRelationName(indrel),
2284                                  stats->num_index_tuples, num_tuples);
2285         }
2286
2287         pfree(stats);
2288 }
2289
2290 /*
2291  *      tid_reaped() -- is a particular tid reaped?
2292  *
2293  *              This has the right signature to be an IndexBulkDeleteCallback.
2294  *
2295  *              vacpagelist->pagedesc is sorted in ascending block-number order.
2296  */
2297 static bool
2298 tid_reaped(ItemPointer itemptr, void *state)
2299 {
2300         VacPageList     vacpagelist = (VacPageList) state;
2301         OffsetNumber ioffno;
2302         OffsetNumber *voff;
2303         VacPage         vp,
2304                            *vpp;
2305         VacPageData vacpage;
2306
2307         vacpage.blkno = ItemPointerGetBlockNumber(itemptr);
2308         ioffno = ItemPointerGetOffsetNumber(itemptr);
2309
2310         vp = &vacpage;
2311         vpp = (VacPage *) vac_bsearch((void *) &vp,
2312                                                                   (void *) (vacpagelist->pagedesc),
2313                                                                   vacpagelist->num_pages,
2314                                                                   sizeof(VacPage),
2315                                                                   vac_cmp_blk);
2316
2317         if (vpp == NULL)
2318                 return false;
2319
2320         /* ok - we are on a partially or fully reaped page */
2321         vp = *vpp;
2322
2323         if (vp->offsets_free == 0)
2324         {
2325                 /* this is EmptyPage, so claim all tuples on it are reaped!!! */
2326                 return true;
2327         }
2328
2329         voff = (OffsetNumber *) vac_bsearch((void *) &ioffno,
2330                                                                                 (void *) (vp->offsets),
2331                                                                                 vp->offsets_free,
2332                                                                                 sizeof(OffsetNumber),
2333                                                                                 vac_cmp_offno);
2334
2335         if (voff == NULL)
2336                 return false;
2337
2338         /* tid is reaped */
2339         return true;
2340 }
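/*
 * For orientation, a sketch (hypothetical loop, not code from this file)
 * of how an index AM's ambulkdelete scan consults this callback for each
 * index entry's heap TID:
 *
 *              if (callback(&itup->t_tid, callback_state))
 *                      ... remove this index entry ...
 *              else
 *                      ... keep it and count it ...
 *
 * tid_reaped() answers in two binary searches: one over the sorted page
 * descriptors to find the heap block, then one over that page's sorted
 * offsets[] array to find the line number.
 */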
2341
2342 /*
2343  * Dummy version for scan_index.
2344  */
2345 static bool
2346 dummy_tid_reaped(ItemPointer itemptr, void *state)
2347 {
2348         return false;
2349 }
2350
2351 /*
2352  * Update the shared Free Space Map with the info we now have about
2353  * free space in the relation, discarding any old info the map may have.
2354  */
2355 static void
2356 vac_update_fsm(Relation onerel, VacPageList fraged_pages,
2357                            BlockNumber rel_pages)
2358 {
2359         int                     nPages = fraged_pages->num_pages;
2360         int                     i;
2361         BlockNumber *pages;
2362         Size       *spaceAvail;
2363
2364         /* +1 to avoid palloc(0) */
2365         pages = (BlockNumber *) palloc((nPages + 1) * sizeof(BlockNumber));
2366         spaceAvail = (Size *) palloc((nPages + 1) * sizeof(Size));
2367
2368         for (i = 0; i < nPages; i++)
2369         {
2370                 pages[i] = fraged_pages->pagedesc[i]->blkno;
2371                 spaceAvail[i] = fraged_pages->pagedesc[i]->free;
2372                 /*
2373                  * fraged_pages may contain entries for pages that we later decided
2374                  * to truncate from the relation; don't enter them into the map!
2375                  */
2376                 if (pages[i] >= rel_pages)
2377                 {
2378                         nPages = i;
2379                         break;
2380                 }
2381         }
2382
2383         MultiRecordFreeSpace(&onerel->rd_node,
2384                                                  0, MaxBlockNumber,
2385                                                  nPages, pages, spaceAvail);
2386         pfree(pages);
2387         pfree(spaceAvail);
2388 }
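/*
 * A worked example with hypothetical numbers: the two parallel arrays
 * handed to MultiRecordFreeSpace might look like
 *
 *              pages[]      = { 0,   3,    7 }
 *              spaceAvail[] = { 512, 2048, 96 }
 *
 * i.e. block 0 has 512 free bytes, block 3 has 2048, block 7 has 96.
 * The 0 .. MaxBlockNumber range arguments tell the map to first discard
 * whatever it previously recorded for this relation.
 */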
2389
2390 /* Copy a VacPage structure */
2391 static VacPage
2392 copy_vac_page(VacPage vacpage)
2393 {
2394         VacPage         newvacpage;
2395
2396         /* allocate a VacPageData entry */
2397         newvacpage = (VacPage) palloc(sizeof(VacPageData) +
2398                                                                   vacpage->offsets_free * sizeof(OffsetNumber));
2399
2400         /* fill it in */
2401         if (vacpage->offsets_free > 0)
2402                 memcpy(newvacpage->offsets, vacpage->offsets,
2403                            vacpage->offsets_free * sizeof(OffsetNumber));
2404         newvacpage->blkno = vacpage->blkno;
2405         newvacpage->free = vacpage->free;
2406         newvacpage->offsets_used = vacpage->offsets_used;
2407         newvacpage->offsets_free = vacpage->offsets_free;
2408
2409         return newvacpage;
2410 }
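/*
 * Note on the allocation arithmetic: since the offsets[] member is
 * declared with length 1 (the classic C struct-hack), the palloc above
 * actually reserves offsets_free + 1 offset slots; the extra one is
 * simply never used.  E.g. with offsets_free = 10 and 2-byte
 * OffsetNumbers, the request is sizeof(VacPageData) + 20 bytes.
 */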
2411
2412 /*
2413  * Add a VacPage pointer to a VacPageList.
2414  *
2415  *              As a side effect of the way that scan_heap works,
2416  *              higher pages come after lower pages in the array
2417  *              (and highest tid on a page is last).
2418  */
2419 static void
2420 vpage_insert(VacPageList vacpagelist, VacPage vpnew)
2421 {
2422 #define PG_NPAGEDESC 1024
2423
2424         /* allocate a VacPage entry if needed */
2425         if (vacpagelist->num_pages == 0)
2426         {
2427                 vacpagelist->pagedesc = (VacPage *) palloc(PG_NPAGEDESC * sizeof(VacPage));
2428                 vacpagelist->num_allocated_pages = PG_NPAGEDESC;
2429         }
2430         else if (vacpagelist->num_pages >= vacpagelist->num_allocated_pages)
2431         {
2432                 vacpagelist->num_allocated_pages *= 2;
2433                 vacpagelist->pagedesc = (VacPage *) repalloc(vacpagelist->pagedesc, vacpagelist->num_allocated_pages * sizeof(VacPage));
2434         }
2435         vacpagelist->pagedesc[vacpagelist->num_pages] = vpnew;
2436         (vacpagelist->num_pages)++;
2437 }
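/*
 * Growth pattern, by example: the first insertion allocates capacity for
 * PG_NPAGEDESC (1024) entries, and the array doubles whenever it fills,
 * so capacity runs 1024, 2048, 4096, ... giving amortized-constant-time
 * insertion.  A hypothetical build loop:
 *
 *              VacPageListData list = { 0, 0, 0, NULL };
 *
 *              ... for each page with reclaimable space ...
 *                      vpage_insert(&list, copy_vac_page(vacpage));
 */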
2438
2439 /*
2440  * vac_bsearch: just like standard C library routine bsearch(),
2441  * except that we first test to see whether the target key is outside
2442  * the range of the table entries.  This case is handled relatively slowly
2443  * by the normal binary search algorithm (ie, no faster than any other key)
2444  * but it occurs often enough in VACUUM to be worth optimizing.
2445  */
2446 static void *
2447 vac_bsearch(const void *key, const void *base,
2448                         size_t nelem, size_t size,
2449                         int (*compar) (const void *, const void *))
2450 {
2451         int                     res;
2452         const void *last;
2453
2454         if (nelem == 0)
2455                 return NULL;
2456         res = compar(key, base);
2457         if (res < 0)
2458                 return NULL;
2459         if (res == 0)
2460                 return (void *) base;
2461         if (nelem > 1)
2462         {
2463                 last = (const void *) ((const char *) base + (nelem - 1) * size);
2464                 res = compar(key, last);
2465                 if (res > 0)
2466                         return NULL;
2467                 if (res == 0)
2468                         return (void *) last;
2469         }
2470         if (nelem <= 2)
2471                 return NULL;                    /* already checked 'em all */
2472         return bsearch(key, base, nelem, size, compar);
2473 }
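/*
 * Usage sketch (hypothetical caller, mirroring tid_reaped above):
 * looking up an offset number in a page's sorted offsets[] array:
 *
 *              OffsetNumber key = ...;
 *              OffsetNumber *hit;
 *
 *              hit = (OffsetNumber *) vac_bsearch((void *) &key,
 *                                                 (void *) vp->offsets,
 *                                                 vp->offsets_free,
 *                                                 sizeof(OffsetNumber),
 *                                                 vac_cmp_offno);
 *
 * Thanks to the endpoint probes, a key below the first entry or above
 * the last costs at most two comparisons instead of a full search.
 */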
2474
2475 /*
2476  * Comparator routines for use with qsort() and bsearch().
2477  */
2478 static int
2479 vac_cmp_blk(const void *left, const void *right)
2480 {
2481         BlockNumber lblk,
2482                                 rblk;
2483
2484         lblk = (*((VacPage *) left))->blkno;
2485         rblk = (*((VacPage *) right))->blkno;
2486
2487         if (lblk < rblk)
2488                 return -1;
2489         if (lblk == rblk)
2490                 return 0;
2491         return 1;
2492 }
2493
2494 static int
2495 vac_cmp_offno(const void *left, const void *right)
2496 {
2497         if (*(OffsetNumber *) left < *(OffsetNumber *) right)
2498                 return -1;
2499         if (*(OffsetNumber *) left == *(OffsetNumber *) right)
2500                 return 0;
2501         return 1;
2502 }
2503
2504 static int
2505 vac_cmp_vtlinks(const void *left, const void *right)
2506 {
2507         if (((VTupleLink) left)->new_tid.ip_blkid.bi_hi <
2508                 ((VTupleLink) right)->new_tid.ip_blkid.bi_hi)
2509                 return -1;
2510         if (((VTupleLink) left)->new_tid.ip_blkid.bi_hi >
2511                 ((VTupleLink) right)->new_tid.ip_blkid.bi_hi)
2512                 return 1;
2513         /* bi_hi-es are equal */
2514         if (((VTupleLink) left)->new_tid.ip_blkid.bi_lo <
2515                 ((VTupleLink) right)->new_tid.ip_blkid.bi_lo)
2516                 return -1;
2517         if (((VTupleLink) left)->new_tid.ip_blkid.bi_lo >
2518                 ((VTupleLink) right)->new_tid.ip_blkid.bi_lo)
2519                 return 1;
2520         /* bi_lo-es are equal */
2521         if (((VTupleLink) left)->new_tid.ip_posid <
2522                 ((VTupleLink) right)->new_tid.ip_posid)
2523                 return -1;
2524         if (((VTupleLink) left)->new_tid.ip_posid >
2525                 ((VTupleLink) right)->new_tid.ip_posid)
2526                 return 1;
2527         return 0;
2528 }
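/*
 * In effect this orders VTupleLinks by new_tid, compared as (block
 * number, offset): for example (block 2, offset 5) sorts before
 * (block 3, offset 1), which sorts before (block 3, offset 4).  The
 * block number must be compared via its bi_hi/bi_lo halves because
 * BlockIdData stores it as two uint16 fields.
 */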
2529
2530
2531 void
2532 vac_open_indexes(Relation relation, int *nindexes, Relation **Irel)
2533 {
2534         List       *indexoidlist,
2535                            *indexoidscan;
2536         int                     i;
2537
2538         indexoidlist = RelationGetIndexList(relation);
2539
2540         *nindexes = length(indexoidlist);
2541
2542         if (*nindexes > 0)
2543                 *Irel = (Relation *) palloc(*nindexes * sizeof(Relation));
2544         else
2545                 *Irel = NULL;
2546
2547         i = 0;
2548         foreach(indexoidscan, indexoidlist)
2549         {
2550                 Oid                     indexoid = lfirsti(indexoidscan);
2551
2552                 (*Irel)[i] = index_open(indexoid);
2553                 i++;
2554         }
2555
2556         freeList(indexoidlist);
2557 }
2558
2559
2560 void
2561 vac_close_indexes(int nindexes, Relation *Irel)
2562 {
2563         if (Irel == (Relation *) NULL)
2564                 return;
2565
2566         while (nindexes--)
2567                 index_close(Irel[nindexes]);
2568         pfree(Irel);
2569 }
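/*
 * These two are used as a bracketing pair; the typical call pattern
 * (a sketch of how this file's callers use them) is:
 *
 *              Relation   *Irel;
 *              int                     nindexes,
 *                                      i;
 *
 *              vac_open_indexes(onerel, &nindexes, &Irel);
 *              for (i = 0; i < nindexes; i++)
 *                      scan_index(Irel[i], num_tuples);
 *              vac_close_indexes(nindexes, Irel);
 *
 * vac_close_indexes() tolerates a NULL array (the no-indexes case) and
 * frees the array itself after closing each index.
 */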
2570
2571
2572 /*
2573  * Is an index partial (ie, could it contain fewer tuples than the heap)?
2574  */
2575 bool
2576 vac_is_partial_index(Relation indrel)
2577 {
2578         bool            result;
2579         HeapTuple       cachetuple;
2580         Form_pg_index indexStruct;
2581
2582         /*
2583          * If the index's AM doesn't support nulls, it's partial for our purposes
2584          */
2585         if (! indrel->rd_am->amindexnulls)
2586                 return true;
2587
2588         /* Otherwise, look to see if there's a partial-index predicate */
2589         cachetuple = SearchSysCache(INDEXRELID,
2590                                                                 ObjectIdGetDatum(RelationGetRelid(indrel)),
2591                                                                 0, 0, 0);
2592         if (!HeapTupleIsValid(cachetuple))
2593                 elog(ERROR, "vac_is_partial_index: index %u not found",
2594                          RelationGetRelid(indrel));
2595         indexStruct = (Form_pg_index) GETSTRUCT(cachetuple);
2596
2597         result = (VARSIZE(&indexStruct->indpred) > VARHDRSZ);
2598
2599         ReleaseSysCache(cachetuple);
2600         return result;
2601 }
2602
2603
2604 static bool
2605 enough_space(VacPage vacpage, Size len)
2606 {
2607         len = MAXALIGN(len);
2608
2609         if (len > vacpage->free)
2610                 return false;
2611
2612         /* if there are free itemid(s) and len <= free_space... */
2613         if (vacpage->offsets_used < vacpage->offsets_free)
2614                 return true;
2615
2616         /* offsets_used >= offsets_free, so we'll have to allocate a new itemid */
2617         if (len + sizeof(ItemIdData) <= vacpage->free)
2618                 return true;
2619
2620         return false;
2621 }
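/*
 * Worked example (assuming 8-byte MAXALIGN and 4-byte ItemIdData): a
 * tuple of len 60 is padded to MAXALIGN(60) = 64.  On a page with
 * free = 70 and a spare line pointer, it fits (64 <= 70).  Without a
 * spare line pointer we must also carve out a new ItemIdData, so the
 * test becomes 64 + 4 = 68 <= 70 -- still fits; with free = 66 it
 * would not.
 */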
2622
2623
2624 /*
2625  * Initialize usage snapshot.
2626  */
2627 void
2628 vac_init_rusage(VacRUsage *ru0)
2629 {
2630         struct timezone tz;
2631
2632         getrusage(RUSAGE_SELF, &ru0->ru);
2633         gettimeofday(&ru0->tv, &tz);
2634 }
2635
2636 /*
2637  * Compute elapsed time since ru0 usage snapshot, and format into
2638  * a displayable string.  Result is in a static string, which is
2639  * tacky, but no one ever claimed that the Postgres backend is
2640  * threadable...
2641  */
2642 const char *
2643 vac_show_rusage(VacRUsage *ru0)
2644 {
2645         static char result[100];
2646         VacRUsage       ru1;
2647
2648         vac_init_rusage(&ru1);
2649
2650         if (ru1.tv.tv_usec < ru0->tv.tv_usec)
2651         {
2652                 ru1.tv.tv_sec--;
2653                 ru1.tv.tv_usec += 1000000;
2654         }
2655         if (ru1.ru.ru_stime.tv_usec < ru0->ru.ru_stime.tv_usec)
2656         {
2657                 ru1.ru.ru_stime.tv_sec--;
2658                 ru1.ru.ru_stime.tv_usec += 1000000;
2659         }
2660         if (ru1.ru.ru_utime.tv_usec < ru0->ru.ru_utime.tv_usec)
2661         {
2662                 ru1.ru.ru_utime.tv_sec--;
2663                 ru1.ru.ru_utime.tv_usec += 1000000;
2664         }
2665
2666         snprintf(result, sizeof(result),
2667                          "CPU %d.%02ds/%d.%02du sec elapsed %d.%02d sec.",
2668                          (int) (ru1.ru.ru_stime.tv_sec - ru0->ru.ru_stime.tv_sec),
2669                          (int) (ru1.ru.ru_stime.tv_usec - ru0->ru.ru_stime.tv_usec) / 10000,
2670                          (int) (ru1.ru.ru_utime.tv_sec - ru0->ru.ru_utime.tv_sec),
2671                          (int) (ru1.ru.ru_utime.tv_usec - ru0->ru.ru_utime.tv_usec) / 10000,
2672                          (int) (ru1.tv.tv_sec - ru0->tv.tv_sec),
2673                          (int) (ru1.tv.tv_usec - ru0->tv.tv_usec) / 10000);
2674
2675         return result;
2676 }
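/*
 * Typical usage of this pair, as in scan_index() and vacuum_index()
 * above (sketch):
 *
 *              VacRUsage       ru0;
 *
 *              vac_init_rusage(&ru0);
 *              ... do the expensive work ...
 *              elog(MESSAGE_LEVEL, "done.\n\t%s", vac_show_rusage(&ru0));
 *
 * After the borrow adjustments above, each microsecond difference lies
 * in [0, 1000000), so the printed hundredths-of-a-second fields are
 * always in range.
 */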