/*-------------------------------------------------------------------------
 *
 * vacuum.c
 *	  the postgres vacuum cleaner
 *
 * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  $Header: /cvsroot/pgsql/src/backend/commands/vacuum.c,v 1.195 2001/05/25 15:45:32 momjian Exp $
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include <fcntl.h>
#include <unistd.h>
#include <time.h>
#include <sys/time.h>
#include <sys/types.h>
#include <sys/file.h>
#include <sys/stat.h>

#ifndef HAVE_GETRUSAGE
#include "rusagestub.h"
#else
#include <sys/resource.h>
#endif

#include "access/genam.h"
#include "access/heapam.h"
#include "access/xlog.h"
#include "catalog/catalog.h"
#include "catalog/catname.h"
#include "catalog/index.h"
#include "commands/vacuum.h"
#include "miscadmin.h"
#include "nodes/execnodes.h"
#include "storage/sinval.h"
#include "storage/smgr.h"
#include "tcop/tcopprot.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/inval.h"
#include "utils/relcache.h"
#include "utils/syscache.h"
#include "utils/temprel.h"

extern XLogRecPtr log_heap_clean(Relation reln, Buffer buffer,
			   char *unused, int unlen);
extern XLogRecPtr log_heap_move(Relation reln,
			  Buffer oldbuf, ItemPointerData from,
			  Buffer newbuf, HeapTuple newtup);


typedef struct VRelListData
{
	Oid			vrl_relid;
	struct VRelListData *vrl_next;
} VRelListData;

typedef VRelListData *VRelList;

typedef struct VacPageData
{
	BlockNumber blkno;			/* BlockNumber of this Page */
	Size		free;			/* FreeSpace on this Page */
	uint16		offsets_used;	/* Number of OffNums used by vacuum */
	uint16		offsets_free;	/* Number of OffNums free or to be freed */
	OffsetNumber offsets[1];	/* Array of its OffNums */
} VacPageData;

typedef VacPageData *VacPage;

typedef struct VacPageListData
{
	int			empty_end_pages;	/* Number of "empty" end-pages */
	int			num_pages;			/* Number of pages in pagedesc */
	int			num_allocated_pages;	/* Number of allocated pages in
										 * pagedesc */
	VacPage    *pagedesc;		/* Descriptions of pages */
} VacPageListData;

typedef VacPageListData *VacPageList;

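/*
 * A VTupleLink records one step in an update chain: this_tid is the TID of
 * a recently-dead updated tuple, and new_tid is the TID of its successor
 * version (copied from the old tuple's t_ctid).  scan_heap() collects
 * these so that repair_frag() can move whole update chains together.
 */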
typedef struct VTupleLinkData
{
	ItemPointerData new_tid;
	ItemPointerData this_tid;
} VTupleLinkData;

typedef VTupleLinkData *VTupleLink;

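/*
 * A VTupleMove describes one planned tuple relocation while moving an
 * update chain: the tuple to move, the destination page, and whether the
 * destination page must be cleaned before use.
 */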
typedef struct VTupleMoveData
{
	ItemPointerData tid;		/* tuple ID */
	VacPage		vacpage;		/* where to move */
	bool		cleanVpd;		/* clean vacpage before using */
} VTupleMoveData;

typedef VTupleMoveData *VTupleMove;

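/*
 * Per-relation statistics gathered by scan_heap(), consumed by
 * repair_frag() and by the final pg_class statistics update.
 */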
typedef struct VRelStats
{
	Oid			relid;
	long		num_pages;
	long		num_tuples;
	Size		min_tlen;
	Size		max_tlen;
	bool		hasindex;
	int			num_vtlinks;
	VTupleLink	vtlinks;
} VRelStats;

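/*
 * Snapshot of wall-clock time and kernel resource usage, taken by
 * init_rusage() at the start of a phase so that show_rusage() can report
 * what the phase consumed.
 */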
typedef struct VacRUsage
{
	struct timeval tv;
	struct rusage ru;
} VacRUsage;

static MemoryContext vac_context = NULL;

static int	MESSAGE_LEVEL;		/* message level */

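/*
 * Cutoff XID: tuples deleted by transactions older than this are no longer
 * visible to any running transaction and may be reclaimed; more recent
 * deletions must be kept (scan_heap counts those in nkeep).
 */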
static TransactionId XmaxRecent;


/* non-export function prototypes */
static void vacuum_init(void);
static void vacuum_shutdown(void);
static VRelList getrels(Name VacRelP, const char *stmttype);
static void vacuum_rel(Oid relid);
static void scan_heap(VRelStats *vacrelstats, Relation onerel,
		  VacPageList vacuum_pages, VacPageList fraged_pages);
static void repair_frag(VRelStats *vacrelstats, Relation onerel,
			VacPageList vacuum_pages, VacPageList fraged_pages,
			int nindices, Relation *Irel);
static void vacuum_heap(VRelStats *vacrelstats, Relation onerel,
			VacPageList vacpagelist);
static void vacuum_page(Relation onerel, Buffer buffer, VacPage vacpage);
static void vacuum_index(VacPageList vacpagelist, Relation indrel,
			 long num_tuples, int keep_tuples);
static void scan_index(Relation indrel, long num_tuples);
static VacPage tid_reaped(ItemPointer itemptr, VacPageList vacpagelist);
static void reap_page(VacPageList vacpagelist, VacPage vacpage);
static void vpage_insert(VacPageList vacpagelist, VacPage vpnew);
static void get_indices(Relation relation, int *nindices, Relation **Irel);
static void close_indices(int nindices, Relation *Irel);
static IndexInfo **get_index_desc(Relation onerel, int nindices,
			   Relation *Irel);
static void *vac_bsearch(const void *key, const void *base,
			 size_t nelem, size_t size,
			 int (*compar) (const void *, const void *));
static int	vac_cmp_blk(const void *left, const void *right);
static int	vac_cmp_offno(const void *left, const void *right);
static int	vac_cmp_vtlinks(const void *left, const void *right);
static bool enough_space(VacPage vacpage, Size len);
static void init_rusage(VacRUsage *ru0);
static char *show_rusage(VacRUsage *ru0);


/*
 * Primary entry point for VACUUM and ANALYZE commands.
 */
void
vacuum(VacuumStmt *vacstmt)
{
	const char *stmttype = vacstmt->vacuum ? "VACUUM" : "ANALYZE";
	NameData	VacRel;
	Name		VacRelName;
	VRelList	vrl,
				cur;

	/*
	 * We cannot run VACUUM inside a user transaction block; if we were
	 * inside a transaction, then our commit- and
	 * start-transaction-command calls would not have the intended effect!
	 * Furthermore, the forced commit that occurs before truncating the
	 * relation's file would have the effect of committing the rest of the
	 * user's transaction too, which would certainly not be the desired
	 * behavior.
	 */
	if (IsTransactionBlock())
		elog(ERROR, "%s cannot run inside a BEGIN/END block", stmttype);

	if (vacstmt->verbose)
		MESSAGE_LEVEL = NOTICE;
	else
		MESSAGE_LEVEL = DEBUG;

	/*
	 * Create special memory context for cross-transaction storage.
	 *
	 * Since it is a child of QueryContext, it will go away eventually even
	 * if we suffer an error; there's no need for special abort cleanup
	 * logic.
	 */
	vac_context = AllocSetContextCreate(QueryContext,
										"Vacuum",
										ALLOCSET_DEFAULT_MINSIZE,
										ALLOCSET_DEFAULT_INITSIZE,
										ALLOCSET_DEFAULT_MAXSIZE);

	/* Convert vacrel, which is just a string, to a Name */
	if (vacstmt->vacrel)
	{
		namestrcpy(&VacRel, vacstmt->vacrel);
		VacRelName = &VacRel;
	}
	else
		VacRelName = NULL;

	/* Build list of relations to process (note this lives in vac_context) */
	vrl = getrels(VacRelName, stmttype);

	/*
	 * Start up the vacuum cleaner.
	 */
	vacuum_init();

	/*
	 * Process each selected relation.  We are careful to process
	 * each relation in a separate transaction in order to avoid holding
	 * too many locks at one time.
	 */
	for (cur = vrl; cur != (VRelList) NULL; cur = cur->vrl_next)
	{
		if (vacstmt->vacuum)
			vacuum_rel(cur->vrl_relid);
		/* analyze separately so locking is minimized */
		if (vacstmt->analyze)
			analyze_rel(cur->vrl_relid, vacstmt);
	}

	/* clean up */
	vacuum_shutdown();
}

/*
 *	vacuum_init(), vacuum_shutdown() -- start up and shut down the vacuum cleaner.
 *
 *		Formerly, there was code here to prevent more than one VACUUM from
 *		executing concurrently in the same database.  However, there's no
 *		good reason to prevent that, and manually removing lockfiles after
 *		a vacuum crash was a pain for dbadmins.  So, forget about lockfiles,
 *		and just rely on the exclusive lock we grab on each target table
 *		to ensure that there aren't two VACUUMs running on the same table
 *		at the same time.
 *
 *		The strangeness with committing and starting transactions in the
 *		init and shutdown routines is due to the fact that the vacuum cleaner
 *		is invoked via an SQL command, and so is already executing inside
 *		a transaction.  We need to leave ourselves in a predictable state
 *		on entry and exit to the vacuum cleaner.  We commit the transaction
 *		started in PostgresMain() inside vacuum_init(), and start one in
 *		vacuum_shutdown() to match the commit waiting for us back in
 *		PostgresMain().
 */
static void
vacuum_init(void)
{
	/* matches the StartTransaction in PostgresMain() */
	CommitTransactionCommand();
}

static void
vacuum_shutdown(void)
{
	/* on entry, we are not in a transaction */

	/*
	 * Flush the init file that relcache.c uses to save startup time. The
	 * next backend startup will rebuild the init file with up-to-date
	 * information from pg_class.  This lets the optimizer see the stats
	 * that we've collected for certain critical system indexes.  See
	 * relcache.c for more details.
	 *
	 * Ignore any failure to unlink the file, since it might not be there if
	 * no backend has been started since the last vacuum...
	 */
	unlink(RELCACHE_INIT_FILENAME);

	/* matches the CommitTransaction in PostgresMain() */
	StartTransactionCommand();

	/*
	 * Clean up working storage --- note we must do this after
	 * StartTransactionCommand, else we might be trying to delete the
	 * active context!
	 */
	MemoryContextDelete(vac_context);
	vac_context = NULL;
}

/*
 * Build a list of VRelListData nodes for each relation to be processed
 */
static VRelList
getrels(Name VacRelP, const char *stmttype)
{
	Relation	rel;
	TupleDesc	tupdesc;
	HeapScanDesc scan;
	HeapTuple	tuple;
	VRelList	vrl,
				cur;
	Datum		d;
	char	   *rname;
	char		rkind;
	bool		n;
	ScanKeyData key;

	if (VacRelP)
	{

		/*
		 * we could use the cache here, but it is clearer to use scankeys
		 * for both vacuum cases, bjm 2000/01/19
		 */
		char	   *nontemp_relname;

		/* We must re-map temp table names bjm 2000-04-06 */
		nontemp_relname = get_temp_rel_by_username(NameStr(*VacRelP));
		if (nontemp_relname == NULL)
			nontemp_relname = NameStr(*VacRelP);

		ScanKeyEntryInitialize(&key, 0x0, Anum_pg_class_relname,
							   F_NAMEEQ,
							   PointerGetDatum(nontemp_relname));
	}
	else
	{
		/* find all relations listed in pg_class */
		ScanKeyEntryInitialize(&key, 0x0, Anum_pg_class_relkind,
							   F_CHAREQ, CharGetDatum('r'));
	}

	vrl = cur = (VRelList) NULL;

	rel = heap_openr(RelationRelationName, AccessShareLock);
	tupdesc = RelationGetDescr(rel);

	scan = heap_beginscan(rel, false, SnapshotNow, 1, &key);

	while (HeapTupleIsValid(tuple = heap_getnext(scan, 0)))
	{
		d = heap_getattr(tuple, Anum_pg_class_relname, tupdesc, &n);
		rname = (char *) DatumGetName(d);

		d = heap_getattr(tuple, Anum_pg_class_relkind, tupdesc, &n);
		rkind = DatumGetChar(d);

		if (rkind != RELKIND_RELATION)
		{
			elog(NOTICE, "%s: can not process indexes, views or special system tables",
				 stmttype);
			continue;
		}

		/* Make a relation list entry for this guy */
		if (vrl == (VRelList) NULL)
			vrl = cur = (VRelList)
				MemoryContextAlloc(vac_context, sizeof(VRelListData));
		else
		{
			cur->vrl_next = (VRelList)
				MemoryContextAlloc(vac_context, sizeof(VRelListData));
			cur = cur->vrl_next;
		}

		cur->vrl_relid = tuple->t_data->t_oid;
		cur->vrl_next = (VRelList) NULL;
	}

	heap_endscan(scan);
	heap_close(rel, AccessShareLock);

	if (vrl == NULL)
		elog(NOTICE, "%s: table not found", stmttype);

	return vrl;
}

/*
 *	vacuum_rel() -- vacuum one heap relation
 *
 *		This routine vacuums a single heap, cleans out its indices, and
 *		updates its num_pages and num_tuples statistics.
 *
 *		Doing one heap at a time incurs extra overhead, since we need to
 *		check that the heap exists again just before we vacuum it.  The
 *		reason that we do this is so that vacuuming can be spread across
 *		many small transactions.  Otherwise, two-phase locking would require
 *		us to lock the entire database during one pass of the vacuum cleaner.
 *
 *		At entry and exit, we are not inside a transaction.
 */
static void
vacuum_rel(Oid relid)
{
	Relation	onerel;
	LockRelId	onerelid;
	VacPageListData vacuum_pages;	/* List of pages to vacuum and/or
									 * clean indices */
	VacPageListData fraged_pages;	/* List of pages with space enough
									 * for re-using */
	Relation   *Irel;
	int32		nindices,
				i;
	VRelStats  *vacrelstats;
	bool		reindex = false;
	Oid			toast_relid;

	/* Begin a transaction for vacuuming this relation */
	StartTransactionCommand();

	/*
	 * Check for user-requested abort.  Note we want this to be inside a
	 * transaction, so xact.c doesn't issue useless NOTICE.
	 */
	CHECK_FOR_INTERRUPTS();

	/*
	 * Race condition -- if the pg_class tuple has gone away since the
	 * last time we saw it, we don't need to vacuum it.
	 */
	if (!SearchSysCacheExists(RELOID,
							  ObjectIdGetDatum(relid),
							  0, 0, 0))
	{
		CommitTransactionCommand();
		return;
	}

	/*
	 * Open the class, get an exclusive lock on it, and check permissions.
	 *
	 * Note we choose to treat permissions failure as a NOTICE and keep
	 * trying to vacuum the rest of the DB --- is this appropriate?
	 */
	onerel = heap_open(relid, AccessExclusiveLock);

	if (!pg_ownercheck(GetUserId(), RelationGetRelationName(onerel),
					   RELNAME))
	{
		elog(NOTICE, "Skipping \"%s\" --- only table owner can VACUUM it",
			 RelationGetRelationName(onerel));
		heap_close(onerel, AccessExclusiveLock);
		CommitTransactionCommand();
		return;
	}

	/*
	 * Get a session-level exclusive lock too.  This will protect our
	 * exclusive access to the relation across multiple transactions, so
	 * that we can vacuum the relation's TOAST table (if any) secure in
	 * the knowledge that no one is diddling the parent relation.
	 *
	 * NOTE: this cannot block, even if someone else is waiting for access,
	 * because the lock manager knows that both lock requests are from the
	 * same process.
	 */
	onerelid = onerel->rd_lockInfo.lockRelId;
	LockRelationForSession(&onerelid, AccessExclusiveLock);

	/*
	 * Remember the relation's TOAST relation for later
	 */
	toast_relid = onerel->rd_rel->reltoastrelid;

	/*
	 * Set up statistics-gathering machinery.
	 */
	vacrelstats = (VRelStats *) palloc(sizeof(VRelStats));
	vacrelstats->relid = relid;
	vacrelstats->num_pages = 0;
	vacrelstats->num_tuples = 0;
	vacrelstats->hasindex = false;

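	/*
	 * Get the cutoff XID below which deleted tuples are dead to all
	 * running transactions and can be reclaimed.
	 */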
	GetXmaxRecent(&XmaxRecent);

	/* scan it */
	reindex = false;
	vacuum_pages.num_pages = fraged_pages.num_pages = 0;
	scan_heap(vacrelstats, onerel, &vacuum_pages, &fraged_pages);
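
	/*
	 * Decide whether this relation's indexes should be rebuilt from
	 * scratch rather than vacuumed.  This applies only to system
	 * relations while system indexes are being ignored; the rebuild
	 * itself is #ifdef'd out below because it is not yet WAL-safe.
	 */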
	if (IsIgnoringSystemIndexes() &&
		IsSystemRelationName(RelationGetRelationName(onerel)))
		reindex = true;

	/* Now open indices */
	nindices = 0;
	Irel = (Relation *) NULL;
	get_indices(onerel, &nindices, &Irel);
	if (!Irel)
		reindex = false;
	else if (!RelationGetForm(onerel)->relhasindex)
		reindex = true;
	if (nindices > 0)
		vacrelstats->hasindex = true;
	else
		vacrelstats->hasindex = false;

#ifdef NOT_USED
	/*
	 * reindex in VACUUM is dangerous under WAL. ifdef out until it
	 * becomes safe.
	 */
	if (reindex)
	{
		for (i = 0; i < nindices; i++)
			index_close(Irel[i]);
		Irel = (Relation *) NULL;
		activate_indexes_of_a_table(relid, false);
	}
#endif   /* NOT_USED */

	/* Clean/scan index relation(s) */
	if (Irel != (Relation *) NULL)
	{
		if (vacuum_pages.num_pages > 0)
		{
			for (i = 0; i < nindices; i++)
				vacuum_index(&vacuum_pages, Irel[i],
							 vacrelstats->num_tuples, 0);
		}
		else
		{
			/* just scan indices to update statistics */
			for (i = 0; i < nindices; i++)
				scan_index(Irel[i], vacrelstats->num_tuples);
		}
	}

	if (fraged_pages.num_pages > 0)
	{
		/* Try to shrink heap */
		repair_frag(vacrelstats, onerel, &vacuum_pages, &fraged_pages,
					nindices, Irel);
	}
	else
	{
		if (Irel != (Relation *) NULL)
			close_indices(nindices, Irel);
		if (vacuum_pages.num_pages > 0)
		{
			/* Clean pages from vacuum_pages list */
			vacuum_heap(vacrelstats, onerel, &vacuum_pages);
		}
		else
		{
			/*
			 * Flush dirty pages out to disk.  We must do this even if we
			 * didn't do anything else, because we want to ensure that all
			 * tuples have correct on-row commit status on disk (see
			 * bufmgr.c's comments for FlushRelationBuffers()).
			 */
			i = FlushRelationBuffers(onerel, vacrelstats->num_pages);
			if (i < 0)
				elog(ERROR, "VACUUM (vacuum_rel): FlushRelationBuffers returned %d",
					 i);
		}
	}
#ifdef NOT_USED
	if (reindex)
		activate_indexes_of_a_table(relid, true);
#endif   /* NOT_USED */

	/* all done with this class, but hold lock until commit */
	heap_close(onerel, NoLock);

	/* update statistics in pg_class */
	vac_update_relstats(vacrelstats->relid, vacrelstats->num_pages,
						vacrelstats->num_tuples, vacrelstats->hasindex);

	/*
	 * Complete the transaction and free all temporary memory used.
	 */
	CommitTransactionCommand();

	/*
	 * If the relation has a secondary toast one, vacuum that too while we
	 * still hold the session lock on the master table. We don't need to
	 * propagate "analyze" to it, because the toaster always uses
	 * hardcoded index access and statistics are totally unimportant for
	 * toast relations.
	 */
	if (toast_relid != InvalidOid)
		vacuum_rel(toast_relid);

	/*
	 * Now release the session-level lock on the master table.
	 */
	UnlockRelationForSession(&onerelid, AccessExclusiveLock);
}

/*
 *	scan_heap() -- scan an open heap relation
 *
 *		This routine sets commit status hint bits, constructs the
 *		vacuum_pages list of empty/uninitialized pages and pages with
 *		dead tuples and ~LP_USED line pointers, constructs the
 *		fraged_pages list of pages with enough free space to be useful
 *		for shrinking, and maintains statistics on the number of live
 *		tuples in the heap.
 */
static void
scan_heap(VRelStats *vacrelstats, Relation onerel,
		  VacPageList vacuum_pages, VacPageList fraged_pages)
{
	BlockNumber nblocks,
				blkno;
	ItemId		itemid;
	Buffer		buf;
	HeapTupleData tuple;
	Page		page,
				tempPage = NULL;
	OffsetNumber offnum,
				maxoff;
	bool		pgchanged,
				tupgone,
				dobufrel,
				notup;
	char	   *relname;
	VacPage		vacpage,
				vp;
	long		num_tuples;
	uint32		tups_vacuumed,
				nkeep,
				nunused,
				ncrash,
				empty_pages,
				new_pages,
				changed_pages,
				empty_end_pages;
	Size		free_size,
				usable_free_size;
	Size		min_tlen = MaxTupleSize;
	Size		max_tlen = 0;
	int32		i;
	bool		do_shrinking = true;
	VTupleLink	vtlinks = (VTupleLink) palloc(100 * sizeof(VTupleLinkData));
	int			num_vtlinks = 0;
	int			free_vtlinks = 100;
	VacRUsage	ru0;

	init_rusage(&ru0);

	relname = RelationGetRelationName(onerel);
	elog(MESSAGE_LEVEL, "--Relation %s--", relname);

	tups_vacuumed = num_tuples = nkeep = nunused = ncrash = empty_pages =
		new_pages = changed_pages = empty_end_pages = 0;
	free_size = usable_free_size = 0;

	nblocks = RelationGetNumberOfBlocks(onerel);

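	/*
	 * Allocate one scratch VacPage big enough for any page; it is re-used
	 * for every heap page scanned (and freed after the scan), so
	 * reap_page() must record a copy of it in vacuum_pages.
	 */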
	vacpage = (VacPage) palloc(sizeof(VacPageData) + MaxOffsetNumber * sizeof(OffsetNumber));
	vacpage->offsets_used = 0;

	for (blkno = 0; blkno < nblocks; blkno++)
	{
		buf = ReadBuffer(onerel, blkno);
		page = BufferGetPage(buf);
		vacpage->blkno = blkno;
		vacpage->offsets_free = 0;

		if (PageIsNew(page))
		{
			elog(NOTICE, "Rel %s: Uninitialized page %u - fixing",
				 relname, blkno);
			PageInit(page, BufferGetPageSize(buf), 0);
			vacpage->free = ((PageHeader) page)->pd_upper - ((PageHeader) page)->pd_lower;
			free_size += (vacpage->free - sizeof(ItemIdData));
			new_pages++;
			empty_end_pages++;
			reap_page(vacuum_pages, vacpage);
			WriteBuffer(buf);
			continue;
		}

		if (PageIsEmpty(page))
		{
			vacpage->free = ((PageHeader) page)->pd_upper - ((PageHeader) page)->pd_lower;
			free_size += (vacpage->free - sizeof(ItemIdData));
			empty_pages++;
			empty_end_pages++;
			reap_page(vacuum_pages, vacpage);
			ReleaseBuffer(buf);
			continue;
		}

		pgchanged = false;
		notup = true;
		maxoff = PageGetMaxOffsetNumber(page);
		for (offnum = FirstOffsetNumber;
			 offnum <= maxoff;
			 offnum = OffsetNumberNext(offnum))
		{
			itemid = PageGetItemId(page, offnum);

			/*
			 * Collect un-used items too - it's possible to have indices
			 * pointing here after a crash.
			 */
			if (!ItemIdIsUsed(itemid))
			{
				vacpage->offsets[vacpage->offsets_free++] = offnum;
				nunused++;
				continue;
			}

			tuple.t_datamcxt = NULL;
			tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
			tuple.t_len = ItemIdGetLength(itemid);
			ItemPointerSet(&(tuple.t_self), blkno, offnum);
			tupgone = false;

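			/*
			 * If the inserting transaction's status is not yet hinted on
			 * the tuple, resolve it now (setting hint bits as a side
			 * effect) and decide whether the tuple is dead.  The
			 * HEAP_MOVED_OFF/HEAP_MOVED_IN cases handle tuples that a
			 * previous VACUUM was moving when it terminated; for those,
			 * t_cmin holds that VACUUM's XID.
			 */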
			if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
			{
				if (tuple.t_data->t_infomask & HEAP_XMIN_INVALID)
					tupgone = true;
				else if (tuple.t_data->t_infomask & HEAP_MOVED_OFF)
				{
					if (TransactionIdDidCommit((TransactionId)
											   tuple.t_data->t_cmin))
					{
						tuple.t_data->t_infomask |= HEAP_XMIN_INVALID;
						pgchanged = true;
						tupgone = true;
					}
					else
					{
						tuple.t_data->t_infomask |= HEAP_XMIN_COMMITTED;
						pgchanged = true;
					}
				}
				else if (tuple.t_data->t_infomask & HEAP_MOVED_IN)
				{
					if (!TransactionIdDidCommit((TransactionId)
												tuple.t_data->t_cmin))
					{
						tuple.t_data->t_infomask |= HEAP_XMIN_INVALID;
						pgchanged = true;
						tupgone = true;
					}
					else
					{
						tuple.t_data->t_infomask |= HEAP_XMIN_COMMITTED;
						pgchanged = true;
					}
				}
				else
				{
					if (TransactionIdDidAbort(tuple.t_data->t_xmin))
						tupgone = true;
					else if (TransactionIdDidCommit(tuple.t_data->t_xmin))
					{
						tuple.t_data->t_infomask |= HEAP_XMIN_COMMITTED;
						pgchanged = true;
					}
					else if (!TransactionIdIsInProgress(tuple.t_data->t_xmin))
					{
						/*
						 * Not Aborted, Not Committed, Not in Progress -
						 * so it's from a crashed process. - vadim 11/26/96
						 */
						ncrash++;
						tupgone = true;
					}
					else
					{
						elog(NOTICE, "Rel %s: TID %u/%u: InsertTransactionInProgress %u - can't shrink relation",
						   relname, blkno, offnum, tuple.t_data->t_xmin);
						do_shrinking = false;
					}
				}
			}

			/*
			 * here we are concerned about tuples with xmin committed and
			 * xmax unknown or committed
			 */
			if (tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED &&
				!(tuple.t_data->t_infomask & HEAP_XMAX_INVALID))
			{
				if (tuple.t_data->t_infomask & HEAP_XMAX_COMMITTED)
				{
					if (tuple.t_data->t_infomask & HEAP_MARKED_FOR_UPDATE)
					{
						tuple.t_data->t_infomask |= HEAP_XMAX_INVALID;
						tuple.t_data->t_infomask &=
							~(HEAP_XMAX_COMMITTED | HEAP_MARKED_FOR_UPDATE);
						pgchanged = true;
					}
					else
						tupgone = true;
				}
				else if (TransactionIdDidAbort(tuple.t_data->t_xmax))
				{
					tuple.t_data->t_infomask |= HEAP_XMAX_INVALID;
					pgchanged = true;
				}
				else if (TransactionIdDidCommit(tuple.t_data->t_xmax))
				{
					if (tuple.t_data->t_infomask & HEAP_MARKED_FOR_UPDATE)
					{
						tuple.t_data->t_infomask |= HEAP_XMAX_INVALID;
						tuple.t_data->t_infomask &=
							~(HEAP_XMAX_COMMITTED | HEAP_MARKED_FOR_UPDATE);
						pgchanged = true;
					}
					else
						tupgone = true;
				}
				else if (!TransactionIdIsInProgress(tuple.t_data->t_xmax))
				{
					/*
					 * Not Aborted, Not Committed, Not in Progress - so
					 * it's from a crashed process. - vadim 06/02/97
					 */
					tuple.t_data->t_infomask |= HEAP_XMAX_INVALID;
					tuple.t_data->t_infomask &=
						~(HEAP_XMAX_COMMITTED | HEAP_MARKED_FOR_UPDATE);
					pgchanged = true;
				}
				else
				{
					elog(NOTICE, "Rel %s: TID %u/%u: DeleteTransactionInProgress %u - can't shrink relation",
						 relname, blkno, offnum, tuple.t_data->t_xmax);
					do_shrinking = false;
				}

				/*
				 * If the tuple was recently deleted then we must not
				 * remove it from the relation.
				 */
				if (tupgone &&
					(tuple.t_data->t_infomask & HEAP_XMIN_INVALID) == 0 &&
					tuple.t_data->t_xmax >= XmaxRecent)
				{
					tupgone = false;
					nkeep++;
					if (!(tuple.t_data->t_infomask & HEAP_XMAX_COMMITTED))
					{
						tuple.t_data->t_infomask |= HEAP_XMAX_COMMITTED;
						pgchanged = true;
					}

					/*
					 * If we are shrinking and this tuple has been
					 * updated, remember it so we can construct the
					 * update-chain dependencies.
					 */
					if (do_shrinking && !(ItemPointerEquals(&(tuple.t_self),
											   &(tuple.t_data->t_ctid))))
					{
						if (free_vtlinks == 0)
						{
							free_vtlinks = 1000;
							vtlinks = (VTupleLink) repalloc(vtlinks,
										   (free_vtlinks + num_vtlinks) *
												 sizeof(VTupleLinkData));
						}
						vtlinks[num_vtlinks].new_tid = tuple.t_data->t_ctid;
						vtlinks[num_vtlinks].this_tid = tuple.t_self;
						free_vtlinks--;
						num_vtlinks++;
					}
				}
			}

			/*
			 * Other checks...
			 */
			if (!OidIsValid(tuple.t_data->t_oid))
			{
				elog(NOTICE, "Rel %s: TID %u/%u: OID IS INVALID. TUPGONE %d.",
					 relname, blkno, offnum, tupgone);
			}

			if (tupgone)
			{
				ItemId		lpp;

				/*
				 * Here we are building a temporary copy of the page with
				 * dead tuples removed.  Below we will apply
				 * PageRepairFragmentation to the copy, so that we can
				 * determine how much space will be available after
				 * removal of dead tuples.  But note we are NOT changing
				 * the real page yet...
				 */
				if (tempPage == (Page) NULL)
				{
					Size		pageSize;

					pageSize = PageGetPageSize(page);
					tempPage = (Page) palloc(pageSize);
					memmove(tempPage, page, pageSize);
				}

				/* mark it unused on the temp page */
				lpp = &(((PageHeader) tempPage)->pd_linp[offnum - 1]);
				lpp->lp_flags &= ~LP_USED;

				vacpage->offsets[vacpage->offsets_free++] = offnum;
				tups_vacuumed++;
			}
			else
			{
				num_tuples++;
				notup = false;
				if (tuple.t_len < min_tlen)
					min_tlen = tuple.t_len;
				if (tuple.t_len > max_tlen)
					max_tlen = tuple.t_len;
			}
		}

		if (pgchanged)
		{
			WriteBuffer(buf);
			dobufrel = false;
			changed_pages++;
		}
		else
			dobufrel = true;

		if (tempPage != (Page) NULL)
		{
			/* Some tuples are gone */
			PageRepairFragmentation(tempPage, NULL);
			vacpage->free = ((PageHeader) tempPage)->pd_upper - ((PageHeader) tempPage)->pd_lower;
			free_size += vacpage->free;
			reap_page(vacuum_pages, vacpage);
			pfree(tempPage);
			tempPage = (Page) NULL;
		}
		else if (vacpage->offsets_free > 0)
		{
			/* there are only ~LP_USED line pointers */
			vacpage->free = ((PageHeader) page)->pd_upper - ((PageHeader) page)->pd_lower;
			free_size += vacpage->free;
			reap_page(vacuum_pages, vacpage);
		}
		if (dobufrel)
			ReleaseBuffer(buf);
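
		/*
		 * Track the length of the run of tuple-free pages at the end of
		 * the relation; these end-pages are candidates for truncation.
		 */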
		if (notup)
			empty_end_pages++;
		else
			empty_end_pages = 0;
	}

	pfree(vacpage);

	/* save stats in the rel list for use later */
	vacrelstats->num_tuples = num_tuples;
	vacrelstats->num_pages = nblocks;
	if (num_tuples == 0)
		min_tlen = max_tlen = 0;
	vacrelstats->min_tlen = min_tlen;
	vacrelstats->max_tlen = max_tlen;

	vacuum_pages->empty_end_pages = empty_end_pages;
	fraged_pages->empty_end_pages = empty_end_pages;

	/*
	 * Try to build fraged_pages, keeping in mind that we can't use the
	 * free space of "empty" end-pages, nor of the last page if it has
	 * been reaped.
	 */
	if (do_shrinking && vacuum_pages->num_pages - empty_end_pages > 0)
	{
		int			nusf;		/* blocks useful for re-use */

		nusf = vacuum_pages->num_pages - empty_end_pages;
		if ((vacuum_pages->pagedesc[nusf - 1])->blkno == nblocks - empty_end_pages - 1)
			nusf--;

		for (i = 0; i < nusf; i++)
		{
			vp = vacuum_pages->pagedesc[i];
			if (enough_space(vp, min_tlen))
			{
				vpage_insert(fraged_pages, vp);
				usable_free_size += vp->free;
			}
		}
	}

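	/*
	 * Keep the collected update-chain links sorted (by successor TID) so
	 * that repair_frag() can binary-search them when walking chains
	 * backwards.
	 */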
	if (usable_free_size > 0 && num_vtlinks > 0)
	{
		qsort((char *) vtlinks, num_vtlinks, sizeof(VTupleLinkData),
			  vac_cmp_vtlinks);
		vacrelstats->vtlinks = vtlinks;
		vacrelstats->num_vtlinks = num_vtlinks;
	}
	else
	{
		vacrelstats->vtlinks = NULL;
		vacrelstats->num_vtlinks = 0;
		pfree(vtlinks);
	}

	elog(MESSAGE_LEVEL, "Pages %u: Changed %u, reaped %u, Empty %u, New %u; \
Tup %lu: Vac %u, Keep/VTL %u/%u, Crash %u, UnUsed %u, MinLen %lu, MaxLen %lu; \
Re-using: Free/Avail. Space %lu/%lu; EndEmpty/Avail. Pages %u/%u. %s",
		 nblocks, changed_pages, vacuum_pages->num_pages, empty_pages,
		 new_pages, num_tuples, tups_vacuumed,
		 nkeep, vacrelstats->num_vtlinks, ncrash,
		 nunused, (unsigned long) min_tlen, (unsigned long) max_tlen,
		 (unsigned long) free_size, (unsigned long) usable_free_size,
		 empty_end_pages, fraged_pages->num_pages,
		 show_rusage(&ru0));
}


/*
 *	repair_frag() -- try to repair relation's fragmentation
 *
 *		This routine marks dead tuples as unused and tries to re-use dead
 *		space by moving tuples (and inserting index entries if needed).
 *		It constructs the Nvacpagelist list of freed pages (those whose
 *		tuples were moved) and cleans their index entries after committing
 *		the current transaction (in hack manner - without losing locks
 *		or freeing memory!).  It truncates the relation if some end-blocks
 *		have become empty.
 */
static void
repair_frag(VRelStats *vacrelstats, Relation onerel,
			VacPageList vacuum_pages, VacPageList fraged_pages,
			int nindices, Relation *Irel)
{
	TransactionId myXID;
	CommandId	myCID;
	Buffer		buf,
				cur_buffer;
	int			nblocks,
				blkno;
	Page		page,
				ToPage = NULL;
	OffsetNumber offnum,
				maxoff,
				newoff,
				max_offset;
	ItemId		itemid,
				newitemid;
	HeapTupleData tuple,
				newtup;
	TupleDesc	tupdesc;
	IndexInfo **indexInfo = NULL;
	Datum		idatum[INDEX_MAX_KEYS];
	char		inulls[INDEX_MAX_KEYS];
	InsertIndexResult iresult;
	VacPageListData Nvacpagelist;
	VacPage		cur_page = NULL,
				last_vacuum_page,
				vacpage,
			   *curpage;
	int			cur_item = 0;
	int			last_move_dest_block = -1,
				last_vacuum_block,
				i = 0;
	Size		tuple_len;
	int			num_moved,
				num_fraged_pages,
				vacuumed_pages;
	int			checked_moved,
				num_tuples,
				keep_tuples = 0;
	bool		isempty,
				dowrite,
				chain_tuple_moved;
	VacRUsage	ru0;

	init_rusage(&ru0);

	myXID = GetCurrentTransactionId();
	myCID = GetCurrentCommandId();

	tupdesc = RelationGetDescr(onerel);

	if (Irel != (Relation *) NULL)	/* preparation for index inserts */
		indexInfo = get_index_desc(onerel, nindices, Irel);

	Nvacpagelist.num_pages = 0;
	num_fraged_pages = fraged_pages->num_pages;
	Assert(vacuum_pages->num_pages > vacuum_pages->empty_end_pages);
	vacuumed_pages = vacuum_pages->num_pages - vacuum_pages->empty_end_pages;
	last_vacuum_page = vacuum_pages->pagedesc[vacuumed_pages - 1];
	last_vacuum_block = last_vacuum_page->blkno;
	cur_buffer = InvalidBuffer;
	num_moved = 0;

	vacpage = (VacPage) palloc(sizeof(VacPageData) + MaxOffsetNumber * sizeof(OffsetNumber));
	vacpage->offsets_used = vacpage->offsets_free = 0;

	/*
	 * Scan pages backwards from the last nonempty page, trying to move
	 * tuples down to lower pages.  Quit when we reach a page that we have
	 * moved any tuples onto.  Note that if a page is still in the
	 * fraged_pages list (list of candidate move-target pages) when we
	 * reach it, we will remove it from the list.  This ensures we never
	 * move a tuple up to a higher page number.
	 *
	 * NB: this code depends on the vacuum_pages and fraged_pages lists being
	 * in order, and on fraged_pages being a subset of vacuum_pages.
	 */
	nblocks = vacrelstats->num_pages;
	for (blkno = nblocks - vacuum_pages->empty_end_pages - 1;
		 blkno > last_move_dest_block;
		 blkno--)
	{
		buf = ReadBuffer(onerel, blkno);
		page = BufferGetPage(buf);

		vacpage->offsets_free = 0;

		isempty = PageIsEmpty(page);

		dowrite = false;
		if (blkno == last_vacuum_block) /* it's a reaped page */
		{
			if (last_vacuum_page->offsets_free > 0)
			{
				/* there are dead tuples on this page - clean them */
				Assert(!isempty);
				LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
				vacuum_page(onerel, buf, last_vacuum_page);
				LockBuffer(buf, BUFFER_LOCK_UNLOCK);
				dowrite = true;
			}
			else
				Assert(isempty);
			--vacuumed_pages;
			if (vacuumed_pages > 0)
			{
				/* get prev reaped page from vacuum_pages */
				last_vacuum_page = vacuum_pages->pagedesc[vacuumed_pages - 1];
				last_vacuum_block = last_vacuum_page->blkno;
			}
			else
			{
				last_vacuum_page = NULL;
				last_vacuum_block = -1;
			}
			if (num_fraged_pages > 0 &&
				fraged_pages->pagedesc[num_fraged_pages - 1]->blkno ==
				(BlockNumber) blkno)
			{
				/* page is in fraged_pages too; remove it */
				--num_fraged_pages;
			}
			if (isempty)
			{
				ReleaseBuffer(buf);
				continue;
			}
		}
		else
			Assert(!isempty);

		chain_tuple_moved = false;	/* no chain tuple has been moved off
									 * this page yet */
		vacpage->blkno = blkno;
		maxoff = PageGetMaxOffsetNumber(page);
		for (offnum = FirstOffsetNumber;
			 offnum <= maxoff;
			 offnum = OffsetNumberNext(offnum))
		{
			itemid = PageGetItemId(page, offnum);

			if (!ItemIdIsUsed(itemid))
				continue;

			tuple.t_datamcxt = NULL;
			tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
			tuple_len = tuple.t_len = ItemIdGetLength(itemid);
			ItemPointerSet(&(tuple.t_self), blkno, offnum);

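			/*
			 * A tuple lacking the XMIN_COMMITTED hint at this stage can
			 * only be one that this VACUUM itself has already moved:
			 * t_cmin must carry our own XID, which the checks below
			 * enforce.
			 */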
			if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
			{
				if ((TransactionId) tuple.t_data->t_cmin != myXID)
					elog(ERROR, "Invalid XID in t_cmin");
				if (tuple.t_data->t_infomask & HEAP_MOVED_IN)
					elog(ERROR, "HEAP_MOVED_IN was not expected");

				/*
				 * If this (chain) tuple was already moved by me, then I
				 * have to check whether it is in vacpage or not - i.e.
				 * whether it was moved while cleaning this page or some
				 * previous one.
				 */
				if (tuple.t_data->t_infomask & HEAP_MOVED_OFF)
				{
					if (keep_tuples == 0)
						continue;
					if (chain_tuple_moved)
					{
						/*
						 * some chain tuples were moved while cleaning
						 * this page
						 */
						Assert(vacpage->offsets_free > 0);
						for (i = 0; i < vacpage->offsets_free; i++)
						{
							if (vacpage->offsets[i] == offnum)
								break;
						}
						if (i >= vacpage->offsets_free) /* not found */
						{
							vacpage->offsets[vacpage->offsets_free++] = offnum;
							keep_tuples--;
						}
					}
					else
					{
						vacpage->offsets[vacpage->offsets_free++] = offnum;
						keep_tuples--;
					}
					continue;
				}
				elog(ERROR, "HEAP_MOVED_OFF was expected");
			}

1220                         /*
1221                          * If this tuple is in a chain of tuples created by updates of
1222                          * "recent" transactions, then we have to move the whole chain
1223                          * of tuples to other places.
1224                          */
1225                         if ((tuple.t_data->t_infomask & HEAP_UPDATED &&
1226                                  tuple.t_data->t_xmin >= XmaxRecent) ||
1227                                 (!(tuple.t_data->t_infomask & HEAP_XMAX_INVALID) &&
1228                                  !(ItemPointerEquals(&(tuple.t_self), &(tuple.t_data->t_ctid)))))
1229                         {
1230                                 Buffer          Cbuf = buf;
1231                                 Page            Cpage;
1232                                 ItemId          Citemid;
1233                                 ItemPointerData Ctid;
1234                                 HeapTupleData tp = tuple;
1235                                 Size            tlen = tuple_len;
1236                                 VTupleMove      vtmove = (VTupleMove)
1237                                 palloc(100 * sizeof(VTupleMoveData));
1238                                 int                     num_vtmove = 0;
1239                                 int                     free_vtmove = 100;
1240                                 VacPage         to_vacpage = NULL;
1241                                 int                     to_item = 0;
1242                                 bool            freeCbuf = false;
1243                                 int                     ti;
1244
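                                /*
                                 * Moving a chain is done in two passes.  First we walk the
                                 * whole chain and plan a destination for each member,
                                 * recording the plan in the vtmove array; then, only if
                                 * every member found a home, we actually move the tuples.
                                 * Planning first lets us back out cleanly if the chain
                                 * cannot be moved in its entirety.
                                 */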
1245                                 if (vacrelstats->vtlinks == NULL)
1246                                         elog(ERROR, "No parent tuple was found");
1247                                 if (cur_buffer != InvalidBuffer)
1248                                 {
1249                                         WriteBuffer(cur_buffer);
1250                                         cur_buffer = InvalidBuffer;
1251                                 }
1252
1253                                 /*
1254                                  * If this tuple is at the beginning or middle of the
1255                                  * chain, then we have to move to the end of the chain.
1256                                  */
1257                                 while (!(tp.t_data->t_infomask & HEAP_XMAX_INVALID) &&
1258                                 !(ItemPointerEquals(&(tp.t_self), &(tp.t_data->t_ctid))))
1259                                 {
1260                                         Ctid = tp.t_data->t_ctid;
1261                                         if (freeCbuf)
1262                                                 ReleaseBuffer(Cbuf);
1263                                         freeCbuf = true;
1264                                         Cbuf = ReadBuffer(onerel,
1265                                                                           ItemPointerGetBlockNumber(&Ctid));
1266                                         Cpage = BufferGetPage(Cbuf);
1267                                         Citemid = PageGetItemId(Cpage,
1268                                                                           ItemPointerGetOffsetNumber(&Ctid));
1269                                         if (!ItemIdIsUsed(Citemid))
1270                                         {
1271
1272                                                 /*
1273                                                  * This means that somewhere in the middle of the
1274                                                  * chain there was a tuple updated by an xaction
1275                                                  * older than XmaxRecent, and we have already
1276                                                  * deleted that tuple.  The upper part of the chain
1277                                                  * really ought to be removed - seemingly in
1278                                                  * scan_heap() - but that's not implemented yet,
1279                                                  * so we just stop shrinking here.
1280                                                  */
1281                                                 ReleaseBuffer(Cbuf);
1282                                                 pfree(vtmove);
1283                                                 vtmove = NULL;
1284                                                 elog(NOTICE, "Child itemid in update-chain marked as unused - can't continue repair_frag");
1285                                                 break;
1286                                         }
1287                                         tp.t_datamcxt = NULL;
1288                                         tp.t_data = (HeapTupleHeader) PageGetItem(Cpage, Citemid);
1289                                         tp.t_self = Ctid;
1290                                         tlen = tp.t_len = ItemIdGetLength(Citemid);
1291                                 }
1292                                 if (vtmove == NULL)
1293                                         break;
1294                                 /* first pass: can the whole chain be moved? */
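                                /*
                                 * Walk the chain from its newest member back to its oldest
                                 * (via the vtlinks array), reserving space on a page from
                                 * fraged_pages for each member; to_vacpage is reused while
                                 * it still has room.  If some member cannot be placed, the
                                 * reservations (the offsets_used counts) are undone and
                                 * the chain is abandoned.
                                 */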
1295                                 for (;;)
1296                                 {
1297                                         if (to_vacpage == NULL ||
1298                                                 !enough_space(to_vacpage, tlen))
1299                                         {
1300
1301                                                 /*
1302                                                  * if to_vacpage no longer has enough free space
1303                                                  * to be useful, remove it from fraged_pages list
1304                                                  */
1305                                                 if (to_vacpage != NULL &&
1306                                                 !enough_space(to_vacpage, vacrelstats->min_tlen))
1307                                                 {
1308                                                         Assert(num_fraged_pages > to_item);
1309                                                         memmove(fraged_pages->pagedesc + to_item,
1310                                                                         fraged_pages->pagedesc + to_item + 1,
1311                                                                         sizeof(VacPage) * (num_fraged_pages - to_item - 1));
1312                                                         num_fraged_pages--;
1313                                                 }
1314                                                 for (i = 0; i < num_fraged_pages; i++)
1315                                                 {
1316                                                         if (enough_space(fraged_pages->pagedesc[i], tlen))
1317                                                                 break;
1318                                                 }
1319
1320                                                 /* can't move item anywhere */
1321                                                 if (i == num_fraged_pages)
1322                                                 {
1323                                                         for (i = 0; i < num_vtmove; i++)
1324                                                         {
1325                                                                 Assert(vtmove[i].vacpage->offsets_used > 0);
1326                                                                 (vtmove[i].vacpage->offsets_used)--;
1327                                                         }
1328                                                         num_vtmove = 0;
1329                                                         break;
1330                                                 }
1331                                                 to_item = i;
1332                                                 to_vacpage = fraged_pages->pagedesc[to_item];
1333                                         }
1334                                         to_vacpage->free -= MAXALIGN(tlen);
1335                                         if (to_vacpage->offsets_used >= to_vacpage->offsets_free)
1336                                                 to_vacpage->free -= MAXALIGN(sizeof(ItemIdData));
1337                                         (to_vacpage->offsets_used)++;
1338                                         if (free_vtmove == 0)
1339                                         {
1340                                                 free_vtmove = 1000;
1341                                                 vtmove = (VTupleMove) repalloc(vtmove,
1342                                                                                          (free_vtmove + num_vtmove) *
1343                                                                                                  sizeof(VTupleMoveData));
1344                                         }
1345                                         vtmove[num_vtmove].tid = tp.t_self;
1346                                         vtmove[num_vtmove].vacpage = to_vacpage;
1347                                         if (to_vacpage->offsets_used == 1)
1348                                                 vtmove[num_vtmove].cleanVpd = true;
1349                                         else
1350                                                 vtmove[num_vtmove].cleanVpd = false;
1351                                         free_vtmove--;
1352                                         num_vtmove++;
1353
1354                                         /* All done? */
1355                                         if (!(tp.t_data->t_infomask & HEAP_UPDATED) ||
1356                                                 tp.t_data->t_xmin < XmaxRecent)
1357                                                 break;
1358
1359                                         /* Well, try to find the tuple holding the prior row version */
1360                                         for (;;)
1361                                         {
1362                                                 Buffer          Pbuf;
1363                                                 Page            Ppage;
1364                                                 ItemId          Pitemid;
1365                                                 HeapTupleData Ptp;
1366                                                 VTupleLinkData vtld,
1367                                                                    *vtlp;
1368
1369                                                 vtld.new_tid = tp.t_self;
1370                                                 vtlp = (VTupleLink)
1371                                                         vac_bsearch((void *) &vtld,
1372                                                                                 (void *) (vacrelstats->vtlinks),
1373                                                                                 vacrelstats->num_vtlinks,
1374                                                                                 sizeof(VTupleLinkData),
1375                                                                                 vac_cmp_vtlinks);
1376                                                 if (vtlp == NULL)
1377                                                         elog(ERROR, "Parent tuple was not found");
1378                                                 tp.t_self = vtlp->this_tid;
1379                                                 Pbuf = ReadBuffer(onerel,
1380                                                                 ItemPointerGetBlockNumber(&(tp.t_self)));
1381                                                 Ppage = BufferGetPage(Pbuf);
1382                                                 Pitemid = PageGetItemId(Ppage,
1383                                                            ItemPointerGetOffsetNumber(&(tp.t_self)));
1384                                                 if (!ItemIdIsUsed(Pitemid))
1385                                                         elog(ERROR, "Parent itemid marked as unused");
1386                                                 Ptp.t_datamcxt = NULL;
1387                                                 Ptp.t_data = (HeapTupleHeader) PageGetItem(Ppage, Pitemid);
1388                                                 Assert(ItemPointerEquals(&(vtld.new_tid),
1389                                                                                                  &(Ptp.t_data->t_ctid)));
1390
1391                                                 /*
1392                                                  * Read above about the cases when
1393                                                  * !ItemIdIsUsed(Citemid) (the child item has been
1394                                                  * removed)...  Because at the moment we don't
1395                                                  * remove the useless part of an update-chain, it's
1396                                                  * possible to find a too-old parent row here.
1397                                                  * Just as in the case that caused this problem,
1398                                                  * we stop shrinking here.  We could try to find
1399                                                  * the real parent row, but it's not worth doing:
1400                                                  * a real solution will be implemented eventually,
1401                                                  * and we are too close to the 6.5 release. -
1402                                                  * vadim 06/11/99
1403                                                  */
1404                                                 if (Ptp.t_data->t_xmax != tp.t_data->t_xmin)
1405                                                 {
1406                                                         if (freeCbuf)
1407                                                                 ReleaseBuffer(Cbuf);
1408                                                         freeCbuf = false;
1409                                                         ReleaseBuffer(Pbuf);
1410                                                         for (i = 0; i < num_vtmove; i++)
1411                                                         {
1412                                                                 Assert(vtmove[i].vacpage->offsets_used > 0);
1413                                                                 (vtmove[i].vacpage->offsets_used)--;
1414                                                         }
1415                                                         num_vtmove = 0;
1416                                                         elog(NOTICE, "Too old parent tuple found - can't continue repair_frag");
1417                                                         break;
1418                                                 }
1419 #ifdef NOT_USED                                 /* I'm not sure that this will work
1420                                                                  * properly... */
1421
1422                                                 /*
1423                                                  * If this tuple is an updated version of a row
1424                                                  * and was created and deleted by one and the same
1425                                                  * transaction, then no one is interested in it -
1426                                                  * mark it as removed.
1427                                                  */
1428                                                 if (Ptp.t_data->t_infomask & HEAP_UPDATED &&
1429                                                         Ptp.t_data->t_xmin == Ptp.t_data->t_xmax)
1430                                                 {
1431                                                         TransactionIdStore(myXID,
1432                                                                 (TransactionId *) &(Ptp.t_data->t_cmin));
1433                                                         Ptp.t_data->t_infomask &=
1434                                                                 ~(HEAP_XMIN_COMMITTED | HEAP_XMIN_INVALID | HEAP_MOVED_IN);
1435                                                         Ptp.t_data->t_infomask |= HEAP_MOVED_OFF;
1436                                                         WriteBuffer(Pbuf);
1437                                                         continue;
1438                                                 }
1439 #endif
1440                                                 tp.t_datamcxt = Ptp.t_datamcxt;
1441                                                 tp.t_data = Ptp.t_data;
1442                                                 tlen = tp.t_len = ItemIdGetLength(Pitemid);
1443                                                 if (freeCbuf)
1444                                                         ReleaseBuffer(Cbuf);
1445                                                 Cbuf = Pbuf;
1446                                                 freeCbuf = true;
1447                                                 break;
1448                                         }
1449                                         if (num_vtmove == 0)
1450                                                 break;
1451                                 }
1452                                 if (freeCbuf)
1453                                         ReleaseBuffer(Cbuf);
1454                                 if (num_vtmove == 0)    /* chain can't be moved */
1455                                 {
1456                                         pfree(vtmove);
1457                                         break;
1458                                 }
1459                                 ItemPointerSetInvalid(&Ctid);
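                                /*
                                 * Second pass: move the chain members.  vtmove[] lists them
                                 * newest-first, so Ctid always holds the new location of
                                 * the previously moved (newer) member; this lets us point
                                 * each tuple's t_ctid forward link at its already-moved
                                 * successor.
                                 */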
1460                                 for (ti = 0; ti < num_vtmove; ti++)
1461                                 {
1462                                         VacPage         destvacpage = vtmove[ti].vacpage;
1463
1464                                         /* Get page to move from */
1465                                         tuple.t_self = vtmove[ti].tid;
1466                                         Cbuf = ReadBuffer(onerel,
1467                                                          ItemPointerGetBlockNumber(&(tuple.t_self)));
1468
1469                                         /* Get page to move to */
1470                                         cur_buffer = ReadBuffer(onerel, destvacpage->blkno);
1471
1472                                         LockBuffer(cur_buffer, BUFFER_LOCK_EXCLUSIVE);
1473                                         if (cur_buffer != Cbuf)
1474                                                 LockBuffer(Cbuf, BUFFER_LOCK_EXCLUSIVE);
1475
1476                                         ToPage = BufferGetPage(cur_buffer);
1477                                         Cpage = BufferGetPage(Cbuf);
1478
1479                                         Citemid = PageGetItemId(Cpage,
1480                                                         ItemPointerGetOffsetNumber(&(tuple.t_self)));
1481                                         tuple.t_datamcxt = NULL;
1482                                         tuple.t_data = (HeapTupleHeader) PageGetItem(Cpage, Citemid);
1483                                         tuple_len = tuple.t_len = ItemIdGetLength(Citemid);
1484
1485                                         /*
1486                                          * make a copy of the source tuple, and then mark the
1487                                          * source tuple MOVED_OFF.
1488                                          */
1489                                         heap_copytuple_with_tuple(&tuple, &newtup);
1490
1491                                         RelationInvalidateHeapTuple(onerel, &tuple);
1492
1493                                         /* NO ELOG(ERROR) TILL CHANGES ARE LOGGED */
1494                                         START_CRIT_SECTION();
1495
1496                                         TransactionIdStore(myXID, (TransactionId *) &(tuple.t_data->t_cmin));
1497                                         tuple.t_data->t_infomask &=
1498                                                 ~(HEAP_XMIN_COMMITTED | HEAP_XMIN_INVALID | HEAP_MOVED_IN);
1499                                         tuple.t_data->t_infomask |= HEAP_MOVED_OFF;
1500
1501                                         /*
1502                                          * If this page was not used before - clean it.
1503                                          *
1504                                          * NOTE: a nasty bug used to lurk here.  It is possible
1505                                          * for the source and destination pages to be the same
1506                                          * (since this tuple-chain member can be on a page
1507                                          * lower than the one we're currently processing in
1508                                          * the outer loop).  If that's true, then after
1509                                          * vacuum_page() the source tuple will have been
1510                                          * moved, and tuple.t_data will be pointing at
1511                                          * garbage.  Therefore we must do everything that uses
1512                                          * tuple.t_data BEFORE this step!!
1513                                          *
1514                                          * This path is different from the other callers of
1515                                          * vacuum_page, because we have already incremented
1516                                          * the vacpage's offsets_used field to account for the
1517                                          * tuple(s) we expect to move onto the page. Therefore
1518                                          * vacuum_page's check for offsets_used == 0 is wrong.
1519                                          * But since that's a good debugging check for all
1520                                          * other callers, we work around it here rather than
1521                                          * remove it.
1522                                          */
1523                                         if (!PageIsEmpty(ToPage) && vtmove[ti].cleanVpd)
1524                                         {
1525                                                 int                     sv_offsets_used = destvacpage->offsets_used;
1526
1527                                                 destvacpage->offsets_used = 0;
1528                                                 vacuum_page(onerel, cur_buffer, destvacpage);
1529                                                 destvacpage->offsets_used = sv_offsets_used;
1530                                         }
1531
1532                                         /*
1533                                          * Update the state of the copied tuple, and store it
1534                                          * on the destination page.
1535                                          */
1536                                         TransactionIdStore(myXID, (TransactionId *) &(newtup.t_data->t_cmin));
1537                                         newtup.t_data->t_infomask &=
1538                                                 ~(HEAP_XMIN_COMMITTED | HEAP_XMIN_INVALID | HEAP_MOVED_OFF);
1539                                         newtup.t_data->t_infomask |= HEAP_MOVED_IN;
1540                                         newoff = PageAddItem(ToPage, (Item) newtup.t_data, tuple_len,
1541                                                                                  InvalidOffsetNumber, LP_USED);
1542                                         if (newoff == InvalidOffsetNumber)
1543                                         {
1544                                                 elog(STOP, "moving chain: failed to add item with len = %lu to page %u",
1545                                                   (unsigned long) tuple_len, destvacpage->blkno);
1546                                         }
1547                                         newitemid = PageGetItemId(ToPage, newoff);
1548                                         pfree(newtup.t_data);
1549                                         newtup.t_datamcxt = NULL;
1550                                         newtup.t_data = (HeapTupleHeader) PageGetItem(ToPage, newitemid);
1551                                         ItemPointerSet(&(newtup.t_self), destvacpage->blkno, newoff);
1552
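                                        /*
                                         * XLOG the move, and stamp both pages with the record's
                                         * LSN so that neither buffer can be flushed to disk
                                         * ahead of the log record.  If source and destination
                                         * are the same page, it is stamped only once.
                                         */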
1553                                         {
1554                                                 XLogRecPtr      recptr =
1555                                                 log_heap_move(onerel, Cbuf, tuple.t_self,
1556                                                                           cur_buffer, &newtup);
1557
1558                                                 if (Cbuf != cur_buffer)
1559                                                 {
1560                                                         PageSetLSN(Cpage, recptr);
1561                                                         PageSetSUI(Cpage, ThisStartUpID);
1562                                                 }
1563                                                 PageSetLSN(ToPage, recptr);
1564                                                 PageSetSUI(ToPage, ThisStartUpID);
1565                                         }
1566                                         END_CRIT_SECTION();
1567
1568                                         if (((int) destvacpage->blkno) > last_move_dest_block)
1569                                                 last_move_dest_block = destvacpage->blkno;
1570
1571                                         /*
1572                                          * Set the new tuple's t_ctid to point to itself if it
1573                                          * is the last tuple in the chain, and to the next
1574                                          * tuple in the chain otherwise.
1575                                          */
1576                                         if (!ItemPointerIsValid(&Ctid))
1577                                                 newtup.t_data->t_ctid = newtup.t_self;
1578                                         else
1579                                                 newtup.t_data->t_ctid = Ctid;
1580                                         Ctid = newtup.t_self;
1581
1582                                         num_moved++;
1583
1584                                         /*
1585                                          * Remember that we moved a tuple off the current page
1586                                          * (its corresponding index tuple will be cleaned up).
1587                                          */
1588                                         if (Cbuf == buf)
1589                                                 vacpage->offsets[vacpage->offsets_free++] =
1590                                                         ItemPointerGetOffsetNumber(&(tuple.t_self));
1591                                         else
1592                                                 keep_tuples++;
1593
1594                                         LockBuffer(cur_buffer, BUFFER_LOCK_UNLOCK);
1595                                         if (cur_buffer != Cbuf)
1596                                                 LockBuffer(Cbuf, BUFFER_LOCK_UNLOCK);
1597
1598                                         if (Irel != (Relation *) NULL)
1599                                         {
1600
1601                                                 /*
1602                                                  * XXX using CurrentMemoryContext here means
1603                                                  * intra-vacuum memory leak for functional
1604                                                  * indexes. Should fix someday.
1605                                                  *
1606                                                  * XXX This code fails to handle partial indexes!
1607                                                  * Probably should change it to use
1608                                                  * ExecOpenIndices.
1609                                                  */
1610                                                 for (i = 0; i < nindices; i++)
1611                                                 {
1612                                                         FormIndexDatum(indexInfo[i],
1613                                                                                    &newtup,
1614                                                                                    tupdesc,
1615                                                                                    CurrentMemoryContext,
1616                                                                                    idatum,
1617                                                                                    inulls);
1618                                                         iresult = index_insert(Irel[i],
1619                                                                                                    idatum,
1620                                                                                                    inulls,
1621                                                                                                    &newtup.t_self,
1622                                                                                                    onerel);
1623                                                         if (iresult)
1624                                                                 pfree(iresult);
1625                                                 }
1626                                         }
1627                                         WriteBuffer(cur_buffer);
1628                                         WriteBuffer(Cbuf);
1629                                 }
1630                                 cur_buffer = InvalidBuffer;
1631                                 pfree(vtmove);
1632                                 chain_tuple_moved = true;
1633                                 continue;
1634                         }
1635
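                        /*
                         * This is an ordinary (non-chain) tuple.  The destination page
                         * is kept pinned in cur_buffer across iterations; we search
                         * fraged_pages for a new destination only when the current one
                         * cannot fit this tuple.
                         */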
1636                         /* try to find new page for this tuple */
1637                         if (cur_buffer == InvalidBuffer ||
1638                                 !enough_space(cur_page, tuple_len))
1639                         {
1640                                 if (cur_buffer != InvalidBuffer)
1641                                 {
1642                                         WriteBuffer(cur_buffer);
1643                                         cur_buffer = InvalidBuffer;
1644
1645                                         /*
1646                                          * If previous target page is now too full to add *any*
1647                                          * tuple to it, remove it from fraged_pages.
1648                                          */
1649                                         if (!enough_space(cur_page, vacrelstats->min_tlen))
1650                                         {
1651                                                 Assert(num_fraged_pages > cur_item);
1652                                                 memmove(fraged_pages->pagedesc + cur_item,
1653                                                                 fraged_pages->pagedesc + cur_item + 1,
1654                                                                 sizeof(VacPage) * (num_fraged_pages - cur_item - 1));
1655                                                 num_fraged_pages--;
1656                                         }
1657                                 }
1658                                 for (i = 0; i < num_fraged_pages; i++)
1659                                 {
1660                                         if (enough_space(fraged_pages->pagedesc[i], tuple_len))
1661                                                 break;
1662                                 }
1663                                 if (i == num_fraged_pages)
1664                                         break;          /* can't move item anywhere */
1665                                 cur_item = i;
1666                                 cur_page = fraged_pages->pagedesc[cur_item];
1667                                 cur_buffer = ReadBuffer(onerel, cur_page->blkno);
1668                                 LockBuffer(cur_buffer, BUFFER_LOCK_EXCLUSIVE);
1669                                 ToPage = BufferGetPage(cur_buffer);
1670                                 /* if this page was not used before - clean it */
1671                                 if (!PageIsEmpty(ToPage) && cur_page->offsets_used == 0)
1672                                         vacuum_page(onerel, cur_buffer, cur_page);
1673                         }
1674                         else
1675                                 LockBuffer(cur_buffer, BUFFER_LOCK_EXCLUSIVE);
1676
1677                         LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
1678
1679                         /* copy tuple */
1680                         heap_copytuple_with_tuple(&tuple, &newtup);
1681
1682                         RelationInvalidateHeapTuple(onerel, &tuple);
1683
1684                         /* NO ELOG(ERROR) TILL CHANGES ARE LOGGED */
1685                         START_CRIT_SECTION();
1686
1687                         /*
1688                          * Mark the new tuple as moved_in by vacuum and store the
1689                          * vacuum XID in t_cmin !!!
1690                          */
1691                         TransactionIdStore(myXID, (TransactionId *) &(newtup.t_data->t_cmin));
1692                         newtup.t_data->t_infomask &=
1693                                 ~(HEAP_XMIN_COMMITTED | HEAP_XMIN_INVALID | HEAP_MOVED_OFF);
1694                         newtup.t_data->t_infomask |= HEAP_MOVED_IN;
1695
1696                         /* add tuple to the page */
1697                         newoff = PageAddItem(ToPage, (Item) newtup.t_data, tuple_len,
1698                                                                  InvalidOffsetNumber, LP_USED);
1699                         if (newoff == InvalidOffsetNumber)
1700                         {
1701                                 elog(STOP, "\
1702 failed to add item with len = %lu to page %u (free space %lu, offsets_used %u, offsets_free %u)",
1703                                          (unsigned long) tuple_len, cur_page->blkno, (unsigned long) cur_page->free,
1704                                          cur_page->offsets_used, cur_page->offsets_free);
1705                         }
1706                         newitemid = PageGetItemId(ToPage, newoff);
1707                         pfree(newtup.t_data);
1708                         newtup.t_datamcxt = NULL;
1709                         newtup.t_data = (HeapTupleHeader) PageGetItem(ToPage, newitemid);
1710                         ItemPointerSet(&(newtup.t_data->t_ctid), cur_page->blkno, newoff);
1711                         newtup.t_self = newtup.t_data->t_ctid;
1712
1713                         /*
1714                          * Mark the old tuple as moved_off by vacuum and store the
1715                          * vacuum XID in t_cmin !!!
1716                          */
1717                         TransactionIdStore(myXID, (TransactionId *) &(tuple.t_data->t_cmin));
1718                         tuple.t_data->t_infomask &=
1719                                 ~(HEAP_XMIN_COMMITTED | HEAP_XMIN_INVALID | HEAP_MOVED_IN);
1720                         tuple.t_data->t_infomask |= HEAP_MOVED_OFF;
1721
1722                         {
1723                                 XLogRecPtr      recptr =
1724                                 log_heap_move(onerel, buf, tuple.t_self,
1725                                                           cur_buffer, &newtup);
1726
1727                                 PageSetLSN(page, recptr);
1728                                 PageSetSUI(page, ThisStartUpID);
1729                                 PageSetLSN(ToPage, recptr);
1730                                 PageSetSUI(ToPage, ThisStartUpID);
1731                         }
1732                         END_CRIT_SECTION();
1733
1734                         cur_page->offsets_used++;
1735                         num_moved++;
1736                         cur_page->free = ((PageHeader) ToPage)->pd_upper - ((PageHeader) ToPage)->pd_lower;
1737                         if (((int) cur_page->blkno) > last_move_dest_block)
1738                                 last_move_dest_block = cur_page->blkno;
1739
1740                         vacpage->offsets[vacpage->offsets_free++] = offnum;
1741
1742                         LockBuffer(cur_buffer, BUFFER_LOCK_UNLOCK);
1743                         LockBuffer(buf, BUFFER_LOCK_UNLOCK);
1744
1745                         /* insert index tuples if needed */
1746                         if (Irel != (Relation *) NULL)
1747                         {
1748
1749                                 /*
1750                                  * XXX using CurrentMemoryContext here means intra-vacuum
1751                                  * memory leak for functional indexes. Should fix someday.
1752                                  *
1753                                  * XXX This code fails to handle partial indexes! Probably
1754                                  * should change it to use ExecOpenIndices.
1755                                  */
1756                                 for (i = 0; i < nindices; i++)
1757                                 {
1758                                         FormIndexDatum(indexInfo[i],
1759                                                                    &newtup,
1760                                                                    tupdesc,
1761                                                                    CurrentMemoryContext,
1762                                                                    idatum,
1763                                                                    inulls);
1764                                         iresult = index_insert(Irel[i],
1765                                                                                    idatum,
1766                                                                                    inulls,
1767                                                                                    &newtup.t_self,
1768                                                                                    onerel);
1769                                         if (iresult)
1770                                                 pfree(iresult);
1771                                 }
1772                         }
1773
1774                 }                                               /* walk along page */
1775
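                /*
                 * If we stopped before reaching the end of the page (because no
                 * destination page could take another tuple), there may still be
                 * tuples here that earlier chain moves marked HEAP_MOVED_OFF but
                 * whose offsets have not yet been recorded in vacpage.  Sweep the
                 * rest of the page so their index entries get cleaned too.
                 */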
1776                 if (offnum < maxoff && keep_tuples > 0)
1777                 {
1778                         OffsetNumber off;
1779
1780                         for (off = OffsetNumberNext(offnum);
1781                                  off <= maxoff;
1782                                  off = OffsetNumberNext(off))
1783                         {
1784                                 itemid = PageGetItemId(page, off);
1785                                 if (!ItemIdIsUsed(itemid))
1786                                         continue;
1787                                 tuple.t_datamcxt = NULL;
1788                                 tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
1789                                 if (tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED)
1790                                         continue;
1791                                 if ((TransactionId) tuple.t_data->t_cmin != myXID)
1792                                         elog(ERROR, "Invalid XID in t_cmin (4)");
1793                                 if (tuple.t_data->t_infomask & HEAP_MOVED_IN)
1794                                         elog(ERROR, "HEAP_MOVED_IN was not expected (2)");
1795                                 if (tuple.t_data->t_infomask & HEAP_MOVED_OFF)
1796                                 {
1797                                         /* some chain tuples were moved while cleaning this page */
1798                                         if (chain_tuple_moved)
1799                                         {
1800                                                 Assert(vacpage->offsets_free > 0);
1801                                                 for (i = 0; i < vacpage->offsets_free; i++)
1802                                                 {
1803                                                         if (vacpage->offsets[i] == off)
1804                                                                 break;
1805                                                 }
1806                                                 if (i >= vacpage->offsets_free) /* not found */
1807                                                 {
1808                                                         vacpage->offsets[vacpage->offsets_free++] = off;
1809                                                         Assert(keep_tuples > 0);
1810                                                         keep_tuples--;
1811                                                 }
1812                                         }
1813                                         else
1814                                         {
1815                                                 vacpage->offsets[vacpage->offsets_free++] = off;
1816                                                 Assert(keep_tuples > 0);
1817                                                 keep_tuples--;
1818                                         }
1819                                 }
1820                         }
1821                 }
1822
1823                 if (vacpage->offsets_free > 0)  /* some tuples were moved */
1824                 {
1825                         if (chain_tuple_moved)          /* else they are already in order */
1826                         {
1827                                 qsort((char *) (vacpage->offsets), vacpage->offsets_free,
1828                                           sizeof(OffsetNumber), vac_cmp_offno);
1829                         }
1830                         reap_page(&Nvacpagelist, vacpage);
1831                         WriteBuffer(buf);
1832                 }
1833                 else if (dowrite)
1834                         WriteBuffer(buf);
1835                 else
1836                         ReleaseBuffer(buf);
1837
1838                 if (offnum <= maxoff)
1839                         break;                          /* some item(s) left */
1840
1841         }                                                       /* walk along relation */
1842
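        /*
         * On exit from the loop, blkno is the highest-numbered page that
         * still holds tuples; incrementing it yields the new length of the
         * relation in blocks, which the truncation below relies on.
         */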
1843         blkno++;                                        /* new number of blocks */
1844
1845         if (cur_buffer != InvalidBuffer)
1846         {
1847                 Assert(num_moved > 0);
1848                 WriteBuffer(cur_buffer);
1849         }
1850
1851         if (num_moved > 0)
1852         {
1853
1854                 /*
1855                  * We have to commit our tuple moves before we truncate the
1856                  * relation.  Ideally we should do Commit/StartTransactionCommand
1857                  * here, relying on the session-level table lock to protect our
1858                  * exclusive access to the relation.  However, that would require
1859                  * a lot of extra code to close and re-open the relation, indices,
1860                  * etc.  For now, a quick hack: record status of current
1861                  * transaction as committed, and continue.
1862                  */
1863                 RecordTransactionCommit();
1864         }
1865
1866         /*
1867          * Clean the not-yet-cleaned reaped pages from the vacuum_pages list,
1868          * and set xmin committed for the inserted tuples.
1869          */
1870         checked_moved = 0;
1871         for (i = 0, curpage = vacuum_pages->pagedesc; i < vacuumed_pages; i++, curpage++)
1872         {
1873                 Assert((*curpage)->blkno < (BlockNumber) blkno);
1874                 buf = ReadBuffer(onerel, (*curpage)->blkno);
1875                 LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
1876                 page = BufferGetPage(buf);
1877                 if ((*curpage)->offsets_used == 0)              /* this page was not used */
1878                 {
1879                         if (!PageIsEmpty(page))
1880                                 vacuum_page(onerel, buf, *curpage);
1881                 }
1882                 else
1883                 {
1884                         /* this page was used */
1885                         num_tuples = 0;
1886                         max_offset = PageGetMaxOffsetNumber(page);
1887                         for (newoff = FirstOffsetNumber;
1888                                  newoff <= max_offset;
1889                                  newoff = OffsetNumberNext(newoff))
1890                         {
1891                                 itemid = PageGetItemId(page, newoff);
1892                                 if (!ItemIdIsUsed(itemid))
1893                                         continue;
1894                                 tuple.t_datamcxt = NULL;
1895                                 tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
1896                                 if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
1897                                 {
1898                                         if ((TransactionId) tuple.t_data->t_cmin != myXID)
1899                                                 elog(ERROR, "Invalid XID in t_cmin (2)");
1900                                         if (tuple.t_data->t_infomask & HEAP_MOVED_IN)
1901                                         {
1902                                                 tuple.t_data->t_infomask |= HEAP_XMIN_COMMITTED;
1903                                                 num_tuples++;
1904                                         }
1905                                         else if (tuple.t_data->t_infomask & HEAP_MOVED_OFF)
1906                                                 tuple.t_data->t_infomask |= HEAP_XMIN_INVALID;
1907                                         else
1908                                                 elog(ERROR, "HEAP_MOVED_OFF/HEAP_MOVED_IN was expected");
1909                                 }
1910                         }
1911                         Assert((*curpage)->offsets_used == num_tuples);
1912                         checked_moved += num_tuples;
1913                 }
1914                 LockBuffer(buf, BUFFER_LOCK_UNLOCK);
1915                 WriteBuffer(buf);
1916         }
1917         Assert(num_moved == checked_moved);
1918
1919         elog(MESSAGE_LEVEL, "Rel %s: Pages: %u --> %u; Tuple(s) moved: %u. %s",
1920                  RelationGetRelationName(onerel),
1921                  nblocks, blkno, num_moved,
1922                  show_rusage(&ru0));
1923
1924         /*
1925          * Reflect the motion of system tuples to catalog cache here.
1926          */
1927         CommandCounterIncrement();
1928
1929         if (Nvacpagelist.num_pages > 0)
1930         {
1931                 /* vacuum indices again if needed */
1932                 if (Irel != (Relation *) NULL)
1933                 {
1934                         VacPage    *vpleft,
1935                                            *vpright,
1936                                                 vpsave;
1937
1938                         /* reverse Nvacpagelist.pagedesc into ascending blkno order */
1939                         for (vpleft = Nvacpagelist.pagedesc,
1940                         vpright = Nvacpagelist.pagedesc + Nvacpagelist.num_pages - 1;
1941                                  vpleft < vpright; vpleft++, vpright--)
1942                         {
1943                                 vpsave = *vpleft;
1944                                 *vpleft = *vpright;
1945                                 *vpright = vpsave;
1946                         }
1947                         Assert(keep_tuples >= 0);
1948                         for (i = 0; i < nindices; i++)
1949                                 vacuum_index(&Nvacpagelist, Irel[i],
1950                                                          vacrelstats->num_tuples, keep_tuples);
1951                 }
1952
1953                 /* clean moved tuples from the last page in Nvacpagelist */
1954                 if (vacpage->blkno == (BlockNumber) (blkno - 1) &&
1955                         vacpage->offsets_free > 0)
1956                 {
1957                         OffsetNumber unbuf[BLCKSZ/sizeof(OffsetNumber)];
1958                         OffsetNumber *unused = unbuf;
1959                         int                     uncnt;
1960
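                        /*
                         * This is the only evacuated page that survives the
                         * truncation below (all the other reaped source pages lie
                         * beyond the new end of the relation and are simply cut
                         * off), so the tuples moved off it must be expunged here.
                         */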
1961                         buf = ReadBuffer(onerel, vacpage->blkno);
1962                         LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
1963                         page = BufferGetPage(buf);
1964                         num_tuples = 0;
1965                         maxoff = PageGetMaxOffsetNumber(page);
1966                         for (offnum = FirstOffsetNumber;
1967                                  offnum <= maxoff;
1968                                  offnum = OffsetNumberNext(offnum))
1969                         {
1970                                 itemid = PageGetItemId(page, offnum);
1971                                 if (!ItemIdIsUsed(itemid))
1972                                         continue;
1973                                 tuple.t_datamcxt = NULL;
1974                                 tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
1975
1976                                 if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
1977                                 {
1978                                         if ((TransactionId) tuple.t_data->t_cmin != myXID)
1979                                                 elog(ERROR, "Invalid XID in t_cmin (3)");
1980                                         if (tuple.t_data->t_infomask & HEAP_MOVED_OFF)
1981                                         {
1982                                                 itemid->lp_flags &= ~LP_USED;
1983                                                 num_tuples++;
1984                                         }
1985                                         else
1986                                                 elog(ERROR, "HEAP_MOVED_OFF was expected (2)");
1987                                 }
1988
1989                         }
1990                         Assert(vacpage->offsets_free == num_tuples);
1991                         START_CRIT_SECTION();
1992                         uncnt = PageRepairFragmentation(page, unused);
1993                         {
1994                                 XLogRecPtr      recptr;
1995
1996                                 recptr = log_heap_clean(onerel, buf, (char *) unused,
1997                                                   (char *) (&(unused[uncnt])) - (char *) unused);
1998                                 PageSetLSN(page, recptr);
1999                                 PageSetSUI(page, ThisStartUpID);
2000                         }
2001                         END_CRIT_SECTION();
2002                         LockBuffer(buf, BUFFER_LOCK_UNLOCK);
2003                         WriteBuffer(buf);
2004                 }
2005
2006                 /* now free the new list of reaped pages */
2007                 curpage = Nvacpagelist.pagedesc;
2008                 for (i = 0; i < Nvacpagelist.num_pages; i++, curpage++)
2009                         pfree(*curpage);
2010                 pfree(Nvacpagelist.pagedesc);
2011         }
2012
2013         /*
2014          * Flush dirty pages out to disk.  We do this unconditionally, even if
2015          * we don't need to truncate, because we want to ensure that all
2016          * tuples have correct on-row commit status on disk (see bufmgr.c's
2017          * comments for FlushRelationBuffers()).
2018          */
2019         i = FlushRelationBuffers(onerel, blkno);
2020         if (i < 0)
2021                 elog(ERROR, "VACUUM (repair_frag): FlushRelationBuffers returned %d",
2022                          i);
2023
2024         /* truncate relation, if needed */
2025         if (blkno < nblocks)
2026         {
2027                 blkno = smgrtruncate(DEFAULT_SMGR, onerel, blkno);
2028                 Assert(blkno >= 0);
2029                 vacrelstats->num_pages = blkno; /* set new number of blocks */
2030         }
2031
2032         if (Irel != (Relation *) NULL)          /* pfree index allocations */
2033         {
2034                 close_indices(nindices, Irel);
2035                 pfree(indexInfo);
2036         }
2037
2038         pfree(vacpage);
2039         if (vacrelstats->vtlinks != NULL)
2040                 pfree(vacrelstats->vtlinks);
2041 }
2042
2043 /*
2044  *      vacuum_heap() -- free dead tuples
2045  *
2046  *              This routine marks dead tuples as unused and truncates the
2047  *              relation if there are "empty" end-blocks.
2048  */
2049 static void
2050 vacuum_heap(VRelStats *vacrelstats, Relation onerel, VacPageList vacuum_pages)
2051 {
2052         Buffer          buf;
2053         VacPage    *vacpage;
2054         long            nblocks;
2055         int                     i;
2056
2057         nblocks = vacuum_pages->num_pages;
2058         nblocks -= vacuum_pages->empty_end_pages;       /* nothing to do for them */
2059
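        /*
         * pagedesc is ordered by block number, so the first nblocks entries
         * cover the pages that may hold dead tuples to expunge; the trailing
         * "empty" end-pages are handled by the truncation below.
         */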
2060         for (i = 0, vacpage = vacuum_pages->pagedesc; i < nblocks; i++, vacpage++)
2061         {
2062                 if ((*vacpage)->offsets_free > 0)
2063                 {
2064                         buf = ReadBuffer(onerel, (*vacpage)->blkno);
2065                         LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
2066                         vacuum_page(onerel, buf, *vacpage);
2067                         LockBuffer(buf, BUFFER_LOCK_UNLOCK);
2068                         WriteBuffer(buf);
2069                 }
2070         }
2071
2072         /*
2073          * Flush dirty pages out to disk.  We do this unconditionally, even if
2074          * we don't need to truncate, because we want to ensure that all
2075          * tuples have correct on-row commit status on disk (see bufmgr.c's
2076          * comments for FlushRelationBuffers()).
2077          */
2078         Assert(vacrelstats->num_pages >= vacuum_pages->empty_end_pages);
2079         nblocks = vacrelstats->num_pages - vacuum_pages->empty_end_pages;
2080
2081         i = FlushRelationBuffers(onerel, nblocks);
2082         if (i < 0)
2083                 elog(ERROR, "VACUUM (vacuum_heap): FlushRelationBuffers returned %d",
2084                          i);
2085
2086         /* truncate relation if there are some empty end-pages */
2087         if (vacuum_pages->empty_end_pages > 0)
2088         {
2089                 elog(MESSAGE_LEVEL, "Rel %s: Pages: %lu --> %lu.",
2090                          RelationGetRelationName(onerel),
2091                          vacrelstats->num_pages, nblocks);
2092                 nblocks = smgrtruncate(DEFAULT_SMGR, onerel, nblocks);
2093                 Assert(nblocks >= 0);
2094                 vacrelstats->num_pages = nblocks;               /* set new number of
2095                                                                                                  * blocks */
2096         }
2097 }

/*
 *      vacuum_page() -- free dead tuples on a page
 *                                       and repair its fragmentation.
 */
static void
vacuum_page(Relation onerel, Buffer buffer, VacPage vacpage)
{
        OffsetNumber unbuf[BLCKSZ/sizeof(OffsetNumber)];
        OffsetNumber *unused = unbuf;
        int                     uncnt;
        Page            page = BufferGetPage(buffer);
        ItemId          itemid;
        int                     i;

        /* There shouldn't be any tuples moved onto the page yet! */
        Assert(vacpage->offsets_used == 0);

        START_CRIT_SECTION();
        for (i = 0; i < vacpage->offsets_free; i++)
        {
                /* offsets[] entries are 1-based, pd_linp[] is 0-based */
                itemid = &(((PageHeader) page)->pd_linp[vacpage->offsets[i] - 1]);
                itemid->lp_flags &= ~LP_USED;
        }
        uncnt = PageRepairFragmentation(page, unused);
        {
                XLogRecPtr      recptr;

                recptr = log_heap_clean(onerel, buffer, (char *) unused,
                                                  (char *) (&(unused[uncnt])) - (char *) unused);
                PageSetLSN(page, recptr);
                PageSetSUI(page, ThisStartUpID);
        }
        END_CRIT_SECTION();
}

/*
 *      scan_index() -- scan one index relation to update statistics.
 */
static void
scan_index(Relation indrel, long num_tuples)
{
        RetrieveIndexResult res;
        IndexScanDesc iscan;
        long            nitups;
        int                     nipages;
        VacRUsage       ru0;

        init_rusage(&ru0);

        /* walk through the entire index */
        iscan = index_beginscan(indrel, false, 0, (ScanKey) NULL);
        nitups = 0;

        while ((res = index_getnext(iscan, ForwardScanDirection))
                   != (RetrieveIndexResult) NULL)
        {
                nitups++;
                pfree(res);
        }

        index_endscan(iscan);

        /* now update statistics in pg_class */
        nipages = RelationGetNumberOfBlocks(indrel);
        vac_update_relstats(RelationGetRelid(indrel), nipages, nitups, false);

        elog(MESSAGE_LEVEL, "Index %s: Pages %u; Tuples %lu. %s",
                 RelationGetRelationName(indrel), nipages, nitups,
                 show_rusage(&ru0));

        if (nitups != num_tuples)
                elog(NOTICE, "Index %s: NUMBER OF INDEX' TUPLES (%lu) IS NOT THE SAME AS HEAP' (%lu).\
\n\tRecreate the index.",
                         RelationGetRelationName(indrel), nitups, num_tuples);
}

/*
 *      vacuum_index() -- vacuum one index relation.
 *
 *              Vacpagelist is the VacPageList of the heap we're currently
 *              vacuuming.  It's locked.  Indrel is an index relation on the
 *              vacuumed heap.  We don't set locks on the index relation here,
 *              since the indexed access methods support locking at different
 *              granularities.  We let them handle it.
 *
 *              Finally, we arrange to update the index relation's statistics in
 *              pg_class.
 */
static void
vacuum_index(VacPageList vacpagelist, Relation indrel,
                         long num_tuples, int keep_tuples)
{
        RetrieveIndexResult res;
        IndexScanDesc iscan;
        ItemPointer heapptr;
        int                     tups_vacuumed;
        long            num_index_tuples;
        int                     num_pages;
        VacPage         vp;
        VacRUsage       ru0;

        init_rusage(&ru0);

        /* walk through the entire index */
        iscan = index_beginscan(indrel, false, 0, (ScanKey) NULL);
        tups_vacuumed = 0;
        num_index_tuples = 0;

        while ((res = index_getnext(iscan, ForwardScanDirection))
                   != (RetrieveIndexResult) NULL)
        {
                heapptr = &res->heap_iptr;

                if ((vp = tid_reaped(heapptr, vacpagelist)) != (VacPage) NULL)
                {
#ifdef NOT_USED
                        elog(DEBUG, "<%x,%x> -> <%x,%x>",
                                 ItemPointerGetBlockNumber(&(res->index_iptr)),
                                 ItemPointerGetOffsetNumber(&(res->index_iptr)),
                                 ItemPointerGetBlockNumber(&(res->heap_iptr)),
                                 ItemPointerGetOffsetNumber(&(res->heap_iptr)));
#endif
                        if (vp->offsets_free == 0)
                        {
                                elog(NOTICE, "Index %s: pointer to EmptyPage (blk %u off %u) - fixing",
                                         RelationGetRelationName(indrel),
                                         vp->blkno, ItemPointerGetOffsetNumber(heapptr));
                        }
                        ++tups_vacuumed;
                        index_delete(indrel, &res->index_iptr);
                }
                else
                        num_index_tuples++;

                pfree(res);
        }

        index_endscan(iscan);

        /* now update statistics in pg_class */
        num_pages = RelationGetNumberOfBlocks(indrel);
        vac_update_relstats(RelationGetRelid(indrel),
                                                num_pages, num_index_tuples, false);

        elog(MESSAGE_LEVEL, "Index %s: Pages %u; Tuples %lu: Deleted %u. %s",
                 RelationGetRelationName(indrel), num_pages,
                 num_index_tuples - keep_tuples, tups_vacuumed,
                 show_rusage(&ru0));

        if (num_index_tuples != num_tuples + keep_tuples)
                elog(NOTICE, "Index %s: NUMBER OF INDEX' TUPLES (%lu) IS NOT THE SAME AS HEAP' (%lu).\
\n\tRecreate the index.",
                  RelationGetRelationName(indrel), num_index_tuples, num_tuples);
}

/*
 *      tid_reaped() -- is a particular tid reaped?
 *
 *              vacpagelist->pagedesc is assumed to be sorted by block number,
 *              and each page's offsets[] array is sorted as well.
 */
static VacPage
tid_reaped(ItemPointer itemptr, VacPageList vacpagelist)
{
        OffsetNumber ioffno;
        OffsetNumber *voff;
        VacPage         vp,
                           *vpp;
        VacPageData vacpage;

        vacpage.blkno = ItemPointerGetBlockNumber(itemptr);
        ioffno = ItemPointerGetOffsetNumber(itemptr);

        vp = &vacpage;
        vpp = (VacPage *) vac_bsearch((void *) &vp,
                                                                  (void *) (vacpagelist->pagedesc),
                                                                  vacpagelist->num_pages,
                                                                  sizeof(VacPage),
                                                                  vac_cmp_blk);

        if (vpp == (VacPage *) NULL)
                return (VacPage) NULL;

        /* ok - we are on a partially or fully reaped page */
        vp = *vpp;

        if (vp->offsets_free == 0)
        {
                /* this is an EmptyPage, so claim all tuples on it are reaped! */
                return vp;
        }

        voff = (OffsetNumber *) vac_bsearch((void *) &ioffno,
                                                                                (void *) (vp->offsets),
                                                                                vp->offsets_free,
                                                                                sizeof(OffsetNumber),
                                                                                vac_cmp_offno);

        if (voff == (OffsetNumber *) NULL)
                return (VacPage) NULL;

        /* tid is reaped */
        return vp;
}
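
/*
 * Editor's sketch (not part of the original file): tid_reaped() above is a
 * two-level lookup -- binary-search the sorted page array by block number,
 * then binary-search that page's sorted offset array -- with an empty page
 * (offsets_free == 0) meaning "every TID on it is reaped".  The same shape
 * on a toy data set, using the standard bsearch() for both levels; all
 * toy_* names are hypothetical and the block is guarded by an editor-added
 * VACUUM_EXAMPLES macro that no build defines.
 */
#ifdef VACUUM_EXAMPLES

#include <stdlib.h>

typedef struct ToyPage
{
        unsigned        blkno;
        int                     noffsets;               /* 0 means "whole page reaped" */
        unsigned        offsets[8];             /* sorted ascending */
} ToyPage;

static int
toy_cmp_blk2(const void *l, const void *r)
{
        unsigned        lb = ((const ToyPage *) l)->blkno;
        unsigned        rb = ((const ToyPage *) r)->blkno;

        return (lb < rb) ? -1 : (lb > rb) ? 1 : 0;
}

static int
toy_cmp_off2(const void *l, const void *r)
{
        unsigned        lo = *(const unsigned *) l;
        unsigned        ro = *(const unsigned *) r;

        return (lo < ro) ? -1 : (lo > ro) ? 1 : 0;
}

/* Returns nonzero if (blkno, offno) appears in the sorted page table. */
static int
toy_tid_reaped(const ToyPage *pages, size_t npages,
                           unsigned blkno, unsigned offno)
{
        ToyPage         key;
        const ToyPage *vp;

        key.blkno = blkno;
        vp = bsearch(&key, pages, npages, sizeof(ToyPage), toy_cmp_blk2);
        if (vp == NULL)
                return 0;                               /* page untouched by vacuum */
        if (vp->noffsets == 0)
                return 1;                               /* empty page: all TIDs reaped */
        return bsearch(&offno, vp->offsets, vp->noffsets,
                                   sizeof(unsigned), toy_cmp_off2) != NULL;
}

#endif   /* VACUUM_EXAMPLES */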

/*
 *      vac_update_relstats() -- update statistics for one relation
 *
 *              Update the whole-relation statistics that are kept in its pg_class
 *              row.  There are additional stats that will be updated if we are
 *              doing VACUUM ANALYZE, but we always update these stats.
 *
 *              This routine works for both index and heap relation entries in
 *              pg_class.  We violate no-overwrite semantics here by storing new
 *              values for the statistics columns directly into the pg_class
 *              tuple that's already on the page.  The reason for this is that if
 *              we updated these tuples in the usual way, vacuuming pg_class itself
 *              wouldn't work very well --- by the time we got done with a vacuum
 *              cycle, most of the tuples in pg_class would've been obsoleted.
 *              Of course, this only works for fixed-size never-null columns, but
 *              these columns are.
 */
void
vac_update_relstats(Oid relid, long num_pages, double num_tuples,
                                        bool hasindex)
{
        Relation        rd;
        HeapTupleData rtup;
        HeapTuple       ctup;
        Form_pg_class pgcform;
        Buffer          buffer;

        /*
         * update number of tuples and number of pages in pg_class
         */
        rd = heap_openr(RelationRelationName, RowExclusiveLock);

        ctup = SearchSysCache(RELOID,
                                                  ObjectIdGetDatum(relid),
                                                  0, 0, 0);
        if (!HeapTupleIsValid(ctup))
                elog(ERROR, "pg_class entry for relid %u vanished during vacuuming",
                         relid);

        /* get the buffer cache tuple */
        rtup.t_self = ctup->t_self;
        ReleaseSysCache(ctup);
        heap_fetch(rd, SnapshotNow, &rtup, &buffer);

        /* overwrite the existing statistics in the tuple */
        pgcform = (Form_pg_class) GETSTRUCT(&rtup);
        pgcform->reltuples = num_tuples;
        pgcform->relpages = num_pages;
        pgcform->relhasindex = hasindex;

        /* invalidate the tuple in the cache and write the buffer */
        RelationInvalidateHeapTuple(rd, &rtup);
        WriteBuffer(buffer);

        heap_close(rd, RowExclusiveLock);
}

/*
 *      reap_page() -- save a page in the array of reaped pages.
 *
 *              As a side effect of the way that the vacuuming loop for a given
 *              relation works, higher pages come after lower pages in the array
 *              (and the highest tid on a page is last).
 */
static void
reap_page(VacPageList vacpagelist, VacPage vacpage)
{
        VacPage         newvacpage;

        /* allocate a VacPageData entry */
        newvacpage = (VacPage) palloc(sizeof(VacPageData) + vacpage->offsets_free * sizeof(OffsetNumber));

        /* fill it in */
        if (vacpage->offsets_free > 0)
                memmove(newvacpage->offsets, vacpage->offsets, vacpage->offsets_free * sizeof(OffsetNumber));
        newvacpage->blkno = vacpage->blkno;
        newvacpage->free = vacpage->free;
        newvacpage->offsets_used = vacpage->offsets_used;
        newvacpage->offsets_free = vacpage->offsets_free;

        /* insert this page into the vacpagelist */
        vpage_insert(vacpagelist, newvacpage);
}
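
/*
 * Editor's sketch (not part of the original file): reap_page() snapshots a
 * variable-length structure -- a fixed header plus a trailing array sized
 * by offsets_free -- with a single allocation, then copies header fields
 * and the trailing array separately.  The same idiom in plain C; the
 * toy_* names are hypothetical and the guard is editor-added.
 */
#ifdef VACUUM_EXAMPLES

#include <stdlib.h>
#include <string.h>

typedef struct ToyVacPage
{
        unsigned        blkno;
        unsigned        nused;
        unsigned        nfree;
        unsigned short offsets[1];      /* really nfree entries long */
} ToyVacPage;

static ToyVacPage *
toy_copy_vacpage(const ToyVacPage *src)
{
        /* one allocation covers the header and the trailing array together */
        ToyVacPage *dst = malloc(sizeof(ToyVacPage) +
                                                         src->nfree * sizeof(unsigned short));

        if (dst == NULL)
                return NULL;
        *dst = *src;                            /* copies header plus offsets[0] */
        if (src->nfree > 0)
                memcpy(dst->offsets, src->offsets,
                           src->nfree * sizeof(unsigned short));
        return dst;
}

#endif   /* VACUUM_EXAMPLES */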

static void
vpage_insert(VacPageList vacpagelist, VacPage vpnew)
{
#define PG_NPAGEDESC 1024

        /* allocate a VacPage entry if needed */
        if (vacpagelist->num_pages == 0)
        {
                vacpagelist->pagedesc = (VacPage *) palloc(PG_NPAGEDESC * sizeof(VacPage));
                vacpagelist->num_allocated_pages = PG_NPAGEDESC;
        }
        else if (vacpagelist->num_pages >= vacpagelist->num_allocated_pages)
        {
                vacpagelist->num_allocated_pages *= 2;
                vacpagelist->pagedesc = (VacPage *) repalloc(vacpagelist->pagedesc, vacpagelist->num_allocated_pages * sizeof(VacPage));
        }
        vacpagelist->pagedesc[vacpagelist->num_pages] = vpnew;
        (vacpagelist->num_pages)++;
}
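
/*
 * Editor's sketch (not part of the original file): vpage_insert() grows
 * pagedesc geometrically -- start at a fixed capacity, double on overflow --
 * so n appends cost O(n) total work.  The same pattern with the standard
 * allocator; ToyList and toy_list_append are hypothetical names.
 */
#ifdef VACUUM_EXAMPLES

#include <stdlib.h>

typedef struct ToyList
{
        void      **items;
        int                     nitems;
        int                     nallocated;
} ToyList;

static int
toy_list_append(ToyList *list, void *item)
{
        if (list->nitems == 0)
        {
                list->nallocated = 1024;        /* cf. PG_NPAGEDESC above */
                list->items = malloc(list->nallocated * sizeof(void *));
                if (list->items == NULL)
                        return -1;
        }
        else if (list->nitems >= list->nallocated)
        {
                /* doubling keeps appends amortized O(1) */
                void      **tmp = realloc(list->items,
                                                                  2 * list->nallocated * sizeof(void *));

                if (tmp == NULL)
                        return -1;
                list->items = tmp;
                list->nallocated *= 2;
        }
        list->items[list->nitems++] = item;
        return 0;
}

#endif   /* VACUUM_EXAMPLES */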

/*
 * vac_bsearch: just like standard C library routine bsearch(),
 * except that we first test to see whether the target key is outside
 * the range of the table entries.  This case is handled relatively slowly
 * by the normal binary search algorithm (ie, no faster than any other key)
 * but it occurs often enough in VACUUM to be worth optimizing.
 */
static void *
vac_bsearch(const void *key, const void *base,
                        size_t nelem, size_t size,
                        int (*compar) (const void *, const void *))
{
        int                     res;
        const void *last;

        if (nelem == 0)
                return NULL;
        res = compar(key, base);
        if (res < 0)
                return NULL;
        if (res == 0)
                return (void *) base;
        if (nelem > 1)
        {
                last = (const void *) ((const char *) base + (nelem - 1) * size);
                res = compar(key, last);
                if (res > 0)
                        return NULL;
                if (res == 0)
                        return (void *) last;
        }
        if (nelem <= 2)
                return NULL;                    /* already checked 'em all */
        return bsearch(key, base, nelem, size, compar);
}
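
/*
 * Editor's sketch (not part of the original file): a usage example for
 * vac_bsearch() with an instrumented comparator, showing the payoff of the
 * endpoint test -- out-of-range keys (the common case in VACUUM) are
 * rejected after at most two comparisons instead of a full O(log n) probe.
 * The toy_* names and the VACUUM_EXAMPLES guard are editor-added.
 */
#ifdef VACUUM_EXAMPLES

#include <assert.h>

static int      toy_ncompares = 0;      /* counts comparator invocations */

static int
toy_cmp_int(const void *left, const void *right)
{
        int                     a = *(const int *) left;
        int                     b = *(const int *) right;

        toy_ncompares++;
        return (a < b) ? -1 : (a > b) ? 1 : 0;
}

static void
toy_vac_bsearch_demo(void)
{
        static const int table[8] = {10, 20, 30, 40, 50, 60, 70, 80};
        int                     below = 5,
                                above = 99;

        /* a key below the first entry is rejected after one comparison */
        toy_ncompares = 0;
        assert(vac_bsearch(&below, table, 8, sizeof(int), toy_cmp_int) == NULL);
        assert(toy_ncompares == 1);

        /*
         * a key above the last entry is rejected after two comparisons,
         * where plain bsearch() would probe O(log n) entries
         */
        toy_ncompares = 0;
        assert(vac_bsearch(&above, table, 8, sizeof(int), toy_cmp_int) == NULL);
        assert(toy_ncompares == 2);
}

#endif   /* VACUUM_EXAMPLES */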

/*
 * Comparator routines for use with qsort() and bsearch().
 */
static int
vac_cmp_blk(const void *left, const void *right)
{
        BlockNumber lblk,
                                rblk;

        lblk = (*((VacPage *) left))->blkno;
        rblk = (*((VacPage *) right))->blkno;

        if (lblk < rblk)
                return -1;
        if (lblk == rblk)
                return 0;
        return 1;
}

static int
vac_cmp_offno(const void *left, const void *right)
{
        if (*(OffsetNumber *) left < *(OffsetNumber *) right)
                return -1;
        if (*(OffsetNumber *) left == *(OffsetNumber *) right)
                return 0;
        return 1;
}

static int
vac_cmp_vtlinks(const void *left, const void *right)
{
        if (((VTupleLink) left)->new_tid.ip_blkid.bi_hi <
                ((VTupleLink) right)->new_tid.ip_blkid.bi_hi)
                return -1;
        if (((VTupleLink) left)->new_tid.ip_blkid.bi_hi >
                ((VTupleLink) right)->new_tid.ip_blkid.bi_hi)
                return 1;
        /* bi_hi fields are equal */
        if (((VTupleLink) left)->new_tid.ip_blkid.bi_lo <
                ((VTupleLink) right)->new_tid.ip_blkid.bi_lo)
                return -1;
        if (((VTupleLink) left)->new_tid.ip_blkid.bi_lo >
                ((VTupleLink) right)->new_tid.ip_blkid.bi_lo)
                return 1;
        /* bi_lo fields are equal */
        if (((VTupleLink) left)->new_tid.ip_posid <
                ((VTupleLink) right)->new_tid.ip_posid)
                return -1;
        if (((VTupleLink) left)->new_tid.ip_posid >
                ((VTupleLink) right)->new_tid.ip_posid)
                return 1;
        return 0;
}
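
/*
 * Editor's sketch (not part of the original file): vac_cmp_vtlinks()
 * compares TIDs lexicographically on three keys (bi_hi, bi_lo, ip_posid),
 * deciding on the first key that differs.  The same compare-and-fall-
 * through shape used with qsort() over a hypothetical three-field key:
 */
#ifdef VACUUM_EXAMPLES

#include <stdlib.h>

typedef struct ToyTid
{
        unsigned short hi;
        unsigned short lo;
        unsigned short pos;
} ToyTid;

static int
toy_cmp_tid(const void *l, const void *r)
{
        const ToyTid *a = (const ToyTid *) l;
        const ToyTid *b = (const ToyTid *) r;

        if (a->hi != b->hi)
                return (a->hi < b->hi) ? -1 : 1;
        if (a->lo != b->lo)
                return (a->lo < b->lo) ? -1 : 1;
        if (a->pos != b->pos)
                return (a->pos < b->pos) ? -1 : 1;
        return 0;
}

static void
toy_sort_tids(ToyTid *tids, size_t ntids)
{
        /* sorts by (hi, lo, pos), in the same spirit as the vtlinks sort */
        qsort(tids, ntids, sizeof(ToyTid), toy_cmp_tid);
}

#endif   /* VACUUM_EXAMPLES */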


static void
get_indices(Relation relation, int *nindices, Relation **Irel)
{
        List       *indexoidlist,
                           *indexoidscan;
        int                     i;

        indexoidlist = RelationGetIndexList(relation);

        *nindices = length(indexoidlist);

        if (*nindices > 0)
                *Irel = (Relation *) palloc(*nindices * sizeof(Relation));
        else
                *Irel = NULL;

        i = 0;
        foreach(indexoidscan, indexoidlist)
        {
                Oid                     indexoid = lfirsti(indexoidscan);

                (*Irel)[i] = index_open(indexoid);
                i++;
        }

        freeList(indexoidlist);
}


static void
close_indices(int nindices, Relation *Irel)
{
        if (Irel == (Relation *) NULL)
                return;

        while (nindices--)
                index_close(Irel[nindices]);
        pfree(Irel);
}


/*
 * Obtain IndexInfo data for each index on the rel
 */
static IndexInfo **
get_index_desc(Relation onerel, int nindices, Relation *Irel)
{
        IndexInfo **indexInfo;
        int                     i;
        HeapTuple       cachetuple;

        indexInfo = (IndexInfo **) palloc(nindices * sizeof(IndexInfo *));

        for (i = 0; i < nindices; i++)
        {
                cachetuple = SearchSysCache(INDEXRELID,
                                                         ObjectIdGetDatum(RelationGetRelid(Irel[i])),
                                                                        0, 0, 0);
                if (!HeapTupleIsValid(cachetuple))
                        elog(ERROR, "get_index_desc: index %u not found",
                                 RelationGetRelid(Irel[i]));
                indexInfo[i] = BuildIndexInfo(cachetuple);
                ReleaseSysCache(cachetuple);
        }

        return indexInfo;
}


static bool
enough_space(VacPage vacpage, Size len)
{
        len = MAXALIGN(len);

        if (len > vacpage->free)
                return false;

        if (vacpage->offsets_used < vacpage->offsets_free)      /* there are free
                                                                                                                 * itemid(s) */
                return true;                    /* and len <= free_space */

        /* offsets_used >= offsets_free, so we'd have to allocate a new itemid */
        if (len + MAXALIGN(sizeof(ItemIdData)) <= vacpage->free)
                return true;

        return false;
}
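
/*
 * Editor's sketch (not part of the original file): a worked example of the
 * accounting done by enough_space() above, with hypothetical numbers --
 * assuming 8-byte alignment and a 4-byte line pointer, so a tuple that
 * just fits with a reusable item pointer no longer fits once a new one
 * must be charged against the page's free space.
 */
#ifdef VACUUM_EXAMPLES

#include <assert.h>

#define TOY_MAXALIGN(x) (((x) + 7) & ~((unsigned long) 7))

static void
toy_enough_space_demo(void)
{
        unsigned long page_free = 64;   /* bytes reported free on the page */
        unsigned long tuple_len = TOY_MAXALIGN(61);             /* rounds up to 64 */

        /* with a reusable item pointer, the 64-byte tuple just fits... */
        assert(tuple_len <= page_free);

        /*
         * ...but if a new 4-byte line pointer (aligned to 8) must also be
         * allocated, the same tuple no longer fits
         */
        assert(tuple_len + TOY_MAXALIGN(4) > page_free);
}

#endif   /* VACUUM_EXAMPLES */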


/*
 * Initialize usage snapshot.
 */
static void
init_rusage(VacRUsage *ru0)
{
        struct timezone tz;

        getrusage(RUSAGE_SELF, &ru0->ru);
        gettimeofday(&ru0->tv, &tz);
}

/*
 * Compute elapsed time since ru0 usage snapshot, and format into
 * a displayable string.  Result is in a static string, which is
 * tacky, but no one ever claimed that the Postgres backend is
 * threadable...
 */
static char *
show_rusage(VacRUsage *ru0)
{
        static char result[100];
        VacRUsage       ru1;

        init_rusage(&ru1);

        /* borrow a second whenever a microsecond field would go negative */
        if (ru1.tv.tv_usec < ru0->tv.tv_usec)
        {
                ru1.tv.tv_sec--;
                ru1.tv.tv_usec += 1000000;
        }
        if (ru1.ru.ru_stime.tv_usec < ru0->ru.ru_stime.tv_usec)
        {
                ru1.ru.ru_stime.tv_sec--;
                ru1.ru.ru_stime.tv_usec += 1000000;
        }
        if (ru1.ru.ru_utime.tv_usec < ru0->ru.ru_utime.tv_usec)
        {
                ru1.ru.ru_utime.tv_sec--;
                ru1.ru.ru_utime.tv_usec += 1000000;
        }

        snprintf(result, sizeof(result),
                         "CPU %d.%02ds/%d.%02du sec elapsed %d.%02d sec.",
                         (int) (ru1.ru.ru_stime.tv_sec - ru0->ru.ru_stime.tv_sec),
                         (int) (ru1.ru.ru_stime.tv_usec - ru0->ru.ru_stime.tv_usec) / 10000,
                         (int) (ru1.ru.ru_utime.tv_sec - ru0->ru.ru_utime.tv_sec),
                         (int) (ru1.ru.ru_utime.tv_usec - ru0->ru.ru_utime.tv_usec) / 10000,
                         (int) (ru1.tv.tv_sec - ru0->tv.tv_sec),
                         (int) (ru1.tv.tv_usec - ru0->tv.tv_usec) / 10000);

        return result;
}
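
/*
 * Editor's sketch (not part of the original file): the stopwatch pattern
 * behind init_rusage()/show_rusage() -- snapshot getrusage() and
 * gettimeofday() before and after the work, then subtract with a manual
 * borrow from tv_sec when tv_usec would go negative.  A self-contained
 * equivalent; the toy_* names and the guard are editor-added.
 */
#ifdef VACUUM_EXAMPLES

#include <stdio.h>
#include <sys/time.h>
#include <sys/resource.h>

static void
toy_timing_demo(void)
{
        struct rusage ru0, ru1;
        struct timeval tv0, tv1;
        volatile long sink = 0;
        long            i;

        getrusage(RUSAGE_SELF, &ru0);
        gettimeofday(&tv0, NULL);

        for (i = 0; i < 10000000; i++)  /* some work to measure */
                sink += i;

        getrusage(RUSAGE_SELF, &ru1);
        gettimeofday(&tv1, NULL);

        /* borrow a second if a microsecond field would go negative */
        if (tv1.tv_usec < tv0.tv_usec)
        {
                tv1.tv_sec--;
                tv1.tv_usec += 1000000;
        }
        if (ru1.ru_utime.tv_usec < ru0.ru_utime.tv_usec)
        {
                ru1.ru_utime.tv_sec--;
                ru1.ru_utime.tv_usec += 1000000;
        }

        printf("user %ld.%06lds, elapsed %ld.%06lds\n",
                   (long) (ru1.ru_utime.tv_sec - ru0.ru_utime.tv_sec),
                   (long) (ru1.ru_utime.tv_usec - ru0.ru_utime.tv_usec),
                   (long) (tv1.tv_sec - tv0.tv_sec),
                   (long) (tv1.tv_usec - tv0.tv_usec));
}

#endif   /* VACUUM_EXAMPLES */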