]> granicus.if.org Git - postgresql/blob - src/backend/commands/vacuum.c
Rewrite of planner statistics-gathering code. ANALYZE is now available as
[postgresql] / src / backend / commands / vacuum.c
1 /*-------------------------------------------------------------------------
2  *
3  * vacuum.c
4  *        the postgres vacuum cleaner
5  *
6  * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *        $Header: /cvsroot/pgsql/src/backend/commands/vacuum.c,v 1.190 2001/05/07 00:43:18 tgl Exp $
12  *
13  *-------------------------------------------------------------------------
14  */
15 #include "postgres.h"
16
17 #include <sys/types.h>
18 #include <sys/file.h>
19 #include <sys/stat.h>
20 #include <fcntl.h>
21 #include <unistd.h>
22
23 #ifndef HAVE_GETRUSAGE
24 #include "rusagestub.h"
25 #else
26 #include <sys/time.h>
27 #include <sys/resource.h>
28 #endif
29
30 #include "access/genam.h"
31 #include "access/heapam.h"
32 #include "access/xlog.h"
33 #include "catalog/catalog.h"
34 #include "catalog/catname.h"
35 #include "catalog/index.h"
36 #include "commands/vacuum.h"
37 #include "miscadmin.h"
38 #include "nodes/execnodes.h"
39 #include "storage/sinval.h"
40 #include "storage/smgr.h"
41 #include "tcop/tcopprot.h"
42 #include "utils/acl.h"
43 #include "utils/builtins.h"
44 #include "utils/fmgroids.h"
45 #include "utils/inval.h"
46 #include "utils/relcache.h"
47 #include "utils/syscache.h"
48 #include "utils/temprel.h"
49
50 extern XLogRecPtr log_heap_clean(Relation reln, Buffer buffer,
51                            char *unused, int unlen);
52 extern XLogRecPtr log_heap_move(Relation reln,
53                           Buffer oldbuf, ItemPointerData from,
54                           Buffer newbuf, HeapTuple newtup);
55
56
57 typedef struct VRelListData     /* one node of the list of relations to process */
58 {
59         Oid                     vrl_relid;      /* pg_class OID of the relation */
60         struct VRelListData *vrl_next;  /* next node, or NULL at end of list */
61 } VRelListData;
62
63 typedef VRelListData *VRelList;
64
65 typedef struct VacPageData      /* per-heap-page bookkeeping used by vacuum */
66 {
67         BlockNumber blkno;                      /* BlockNumber of this Page */
68         Size            free;                   /* FreeSpace on this Page */
69         uint16          offsets_used;   /* Number of OffNums used by vacuum */
70         uint16          offsets_free;   /* Number of OffNums free or to be free */
71         OffsetNumber offsets[1];        /* Array of its OffNums; declared [1] but the struct is allocated with extra trailing room (see scan_heap) */
72 } VacPageData;
73
74 typedef VacPageData *VacPage;
75
76 typedef struct VacPageListData  /* a list of VacPage descriptors */
77 {
78         int                     empty_end_pages;/* Number of "empty" end-pages */
79         int                     num_pages;              /* Number of pages in pagedesc */
80         int                     num_allocated_pages;    /* Number of allocated pages in
81                                                                                  * pagedesc */
82         VacPage    *pagedesc;           /* Descriptions of pages (array of VacPage pointers) */
83 } VacPageListData;
84
85 typedef VacPageListData *VacPageList;
86
87 typedef struct VTupleLinkData   /* pair of tuple IDs; an array of these is built by scan_heap */
88 {
89         ItemPointerData new_tid;        /* NOTE(review): presumably the TID of the newer tuple version -- confirm */
90         ItemPointerData this_tid;       /* NOTE(review): presumably the TID of the tuple itself -- confirm */
91 } VTupleLinkData;
92
93 typedef VTupleLinkData *VTupleLink;
94
95 typedef struct VTupleMoveData   /* NOTE(review): looks like one planned tuple move; users are outside this chunk -- confirm */
96 {
97         ItemPointerData tid;            /* tuple ID */
98         VacPage         vacpage;                /* where to move */
99         bool            cleanVpd;               /* clean vacpage before using */
100 } VTupleMoveData;
101
102 typedef VTupleMoveData *VTupleMove;
103
104 typedef struct VRelStats        /* per-relation working state and statistics for one vacuum_rel() call */
105 {
106         Oid                     relid;          /* OID of the relation being vacuumed */
107         long            num_pages;      /* page count; written to pg_class via vac_update_relstats() */
108         long            num_tuples;     /* tuple count; written to pg_class via vac_update_relstats() */
109         Size            min_tlen;       /* presumably shortest tuple length seen -- not set in vacuum_rel(); confirm */
110         Size            max_tlen;       /* presumably longest tuple length seen -- not set in vacuum_rel(); confirm */
111         bool            hasindex;       /* true if get_indices() found at least one index */
112         int                     num_vtlinks;    /* presumably number of entries in vtlinks -- confirm */
113         VTupleLink      vtlinks;        /* presumably array of VTupleLinkData -- confirm */
114 } VRelStats;
115
116
117 static MemoryContext vac_context = NULL;        /* cross-transaction storage: created in vacuum(), deleted in vacuum_shutdown() */
118
119 static int      MESSAGE_LEVEL;          /* message level: NOTICE when the statement is verbose, else DEBUG */
120
121 static TransactionId XmaxRecent;        /* filled in by GetXmaxRecent() at the start of each vacuum_rel() */
122
123
124 /* non-export function prototypes */
125 static void vacuum_init(void);
126 static void vacuum_shutdown(void);
127 static VRelList getrels(Name VacRelP, const char *stmttype);
128 static void vacuum_rel(Oid relid);
129 static void scan_heap(VRelStats *vacrelstats, Relation onerel,
130                                           VacPageList vacuum_pages, VacPageList fraged_pages);
131 static void repair_frag(VRelStats *vacrelstats, Relation onerel,
132                                                 VacPageList vacuum_pages, VacPageList fraged_pages,
133                                                 int nindices, Relation *Irel);
134 static void vacuum_heap(VRelStats *vacrelstats, Relation onerel,
135                                                 VacPageList vacpagelist);
136 static void vacuum_page(Relation onerel, Buffer buffer, VacPage vacpage);
137 static void vacuum_index(VacPageList vacpagelist, Relation indrel,
138                                                  long num_tuples, int keep_tuples);
139 static void scan_index(Relation indrel, long num_tuples);
140 static VacPage tid_reaped(ItemPointer itemptr, VacPageList vacpagelist);
141 static void reap_page(VacPageList vacpagelist, VacPage vacpage);
142 static void vpage_insert(VacPageList vacpagelist, VacPage vpnew);
143 static void get_indices(Relation relation, int *nindices, Relation **Irel);
144 static void close_indices(int nindices, Relation *Irel);
145 static IndexInfo **get_index_desc(Relation onerel, int nindices,
146                            Relation *Irel);
147 static void *vac_find_eq(void *bot, int nelem, int size, void *elm,
148                         int (*compar) (const void *, const void *));
149 static int      vac_cmp_blk(const void *left, const void *right);
150 static int      vac_cmp_offno(const void *left, const void *right);
151 static int      vac_cmp_vtlinks(const void *left, const void *right);
152 static bool enough_space(VacPage vacpage, Size len);
153 static char *show_rusage(struct rusage * ru0);
154
155
156 /*
157  * Primary entry point for VACUUM and ANALYZE commands.
158  */
159 void
160 vacuum(VacuumStmt *vacstmt)
161 {
162         const char *stmttype = vacstmt->vacuum ? "VACUUM" : "ANALYZE";
163         NameData        VacRel;
164         Name            VacRelName;
165         VRelList        vrl,
166                                 cur;
167
168         /*
169          * We cannot run VACUUM inside a user transaction block; if we were
170          * inside a transaction, then our commit- and
171          * start-transaction-command calls would not have the intended effect!
172          * Furthermore, the forced commit that occurs before truncating the
173          * relation's file would have the effect of committing the rest of the
174          * user's transaction too, which would certainly not be the desired
175          * behavior.
176          */
177         if (IsTransactionBlock())
178                 elog(ERROR, "%s cannot run inside a BEGIN/END block", stmttype);
179
180         if (vacstmt->verbose)
181                 MESSAGE_LEVEL = NOTICE;
182         else
183                 MESSAGE_LEVEL = DEBUG;
184
185         /*
186          * Create special memory context for cross-transaction storage.
187          *
188          * Since it is a child of QueryContext, it will go away eventually even
189          * if we suffer an error; there's no need for special abort cleanup
190          * logic.
191          */
192         vac_context = AllocSetContextCreate(QueryContext,
193                                                                                 "Vacuum",
194                                                                                 ALLOCSET_DEFAULT_MINSIZE,
195                                                                                 ALLOCSET_DEFAULT_INITSIZE,
196                                                                                 ALLOCSET_DEFAULT_MAXSIZE);
197
198         /* Convert vacrel, which is just a string, to a Name */
199         if (vacstmt->vacrel)
200         {
201                 namestrcpy(&VacRel, vacstmt->vacrel);
202                 VacRelName = &VacRel;
203         }
204         else
205                 VacRelName = NULL;
206
207         /* Build list of relations to process (note this lives in vac_context) */
208         vrl = getrels(VacRelName, stmttype);
209
210         /*
211          * Start up the vacuum cleaner.
212          */
213         vacuum_init();
214
215         /*
216          * Process each selected relation.  We are careful to process
217          * each relation in a separate transaction in order to avoid holding
218          * too many locks at one time.
219          */
220         for (cur = vrl; cur != (VRelList) NULL; cur = cur->vrl_next)
221         {
222                 if (vacstmt->vacuum)
223                         vacuum_rel(cur->vrl_relid);
224                 /* analyze separately so locking is minimized */
225                 if (vacstmt->analyze)
226                         analyze_rel(cur->vrl_relid, vacstmt);
227         }
228
229         /* clean up */
230         vacuum_shutdown();
231 }
232
233 /*
234  *      vacuum_init(), vacuum_shutdown() -- start up and shut down the vacuum cleaner.
235  *
236  *              Formerly, there was code here to prevent more than one VACUUM from
237  *              executing concurrently in the same database.  However, there's no
238  *              good reason to prevent that, and manually removing lockfiles after
239  *              a vacuum crash was a pain for dbadmins.  So, forget about lockfiles,
240  *              and just rely on the exclusive lock we grab on each target table
241  *              to ensure that there aren't two VACUUMs running on the same table
242  *              at the same time.
243  *
244  *              The strangeness with committing and starting transactions in the
245  *              init and shutdown routines is due to the fact that the vacuum cleaner
246  *              is invoked via an SQL command, and so is already executing inside
247  *              a transaction.  We need to leave ourselves in a predictable state
248  *              on entry and exit to the vacuum cleaner.  We commit the transaction
249  *              started in PostgresMain() inside vacuum_init(), and start one in
250  *              vacuum_shutdown() to match the commit waiting for us back in
251  *              PostgresMain().
 *
 *              vacuum_init() takes no arguments and returns nothing; its only
 *              effect is that commit.
252  */
253 static void
254 vacuum_init(void)
255 {
256         /* matches the StartTransaction in PostgresMain(); after this we are not inside a transaction */
257         CommitTransactionCommand();
258 }
259
260 static void
261 vacuum_shutdown(void)
262 {
263         /*
         * Shut down the vacuum cleaner: remove the relcache init file, start
         * the transaction that our caller expects to commit, and release
         * vac_context.
         *
         * on entry, we are not in a transaction
         */
264
265         /*
266          * Flush the init file that relcache.c uses to save startup time. The
267          * next backend startup will rebuild the init file with up-to-date
268          * information from pg_class.  This lets the optimizer see the stats
269          * that we've collected for certain critical system indexes.  See
270          * relcache.c for more details.
271          *
272          * Ignore any failure to unlink the file, since it might not be there if
273          * no backend has been started since the last vacuum...
274          */
275         unlink(RELCACHE_INIT_FILENAME);         /* return value deliberately not checked */
276
277         /* matches the CommitTransaction in PostgresMain() */
278         StartTransactionCommand();
279
280         /*
281          * Clean up working storage --- note we must do this after
282          * StartTransactionCommand, else we might be trying to delete the
283          * active context!
284          */
285         MemoryContextDelete(vac_context);
286         vac_context = NULL;             /* don't leave a dangling pointer behind */
287 }
288
289 /*
290  * Build a list of VRelListData nodes for each relation to be processed
291  */
292 static VRelList
293 getrels(Name VacRelP, const char *stmttype)
294 {
295         Relation        rel;
296         TupleDesc       tupdesc;
297         HeapScanDesc scan;
298         HeapTuple       tuple;
299         VRelList        vrl,
300                                 cur;
301         Datum           d;
302         char       *rname;
303         char            rkind;
304         bool            n;
305         ScanKeyData key;
306
307         if (VacRelP)
308         {
309
310                 /*
311                  * we could use the cache here, but it is clearer to use scankeys
312                  * for both vacuum cases, bjm 2000/01/19
313                  */
314                 char       *nontemp_relname;
315
316                 /* We must re-map temp table names bjm 2000-04-06 */
317                 nontemp_relname = get_temp_rel_by_username(NameStr(*VacRelP));
318                 if (nontemp_relname == NULL)
319                         nontemp_relname = NameStr(*VacRelP);
320
321                 ScanKeyEntryInitialize(&key, 0x0, Anum_pg_class_relname,
322                                                            F_NAMEEQ,
323                                                            PointerGetDatum(nontemp_relname));
324         }
325         else
326         {
327                 /* find all relations listed in pg_class */
328                 ScanKeyEntryInitialize(&key, 0x0, Anum_pg_class_relkind,
329                                                            F_CHAREQ, CharGetDatum('r'));
330         }
331
332         vrl = cur = (VRelList) NULL;
333
334         rel = heap_openr(RelationRelationName, AccessShareLock);
335         tupdesc = RelationGetDescr(rel);
336
337         scan = heap_beginscan(rel, false, SnapshotNow, 1, &key);
338
339         while (HeapTupleIsValid(tuple = heap_getnext(scan, 0)))
340         {
341                 d = heap_getattr(tuple, Anum_pg_class_relname, tupdesc, &n);
342                 rname = (char *) DatumGetName(d);
343
344                 d = heap_getattr(tuple, Anum_pg_class_relkind, tupdesc, &n);
345                 rkind = DatumGetChar(d);
346
347                 if (rkind != RELKIND_RELATION)
348                 {
349                         elog(NOTICE, "%s: can not process indexes, views or special system tables",
350                                  stmttype);
351                         continue;
352                 }
353
354                 /* Make a relation list entry for this guy */
355                 if (vrl == (VRelList) NULL)
356                         vrl = cur = (VRelList)
357                                 MemoryContextAlloc(vac_context, sizeof(VRelListData));
358                 else
359                 {
360                         cur->vrl_next = (VRelList)
361                                 MemoryContextAlloc(vac_context, sizeof(VRelListData));
362                         cur = cur->vrl_next;
363                 }
364
365                 cur->vrl_relid = tuple->t_data->t_oid;
366                 cur->vrl_next = (VRelList) NULL;
367         }
368
369         heap_endscan(scan);
370         heap_close(rel, AccessShareLock);
371
372         if (vrl == NULL)
373                 elog(NOTICE, "%s: table not found", stmttype);
374
375         return vrl;
376 }
377
378 /*
379  *      vacuum_rel() -- vacuum one heap relation
380  *
381  *              This routine vacuums a single heap, cleans out its indices, and
382  *              updates its num_pages and num_tuples statistics.
383  *
384  *              Doing one heap at a time incurs extra overhead, since we need to
385  *              check that the heap exists again just before we vacuum it.      The
386  *              reason that we do this is so that vacuuming can be spread across
387  *              many small transactions.  Otherwise, two-phase locking would require
388  *              us to lock the entire database during one pass of the vacuum cleaner.
389  *
390  *              At entry and exit, we are not inside a transaction.
 *
 *              relid is the pg_class OID of the heap to process.  The routine
 *              returns early, after committing its transaction, if the pg_class
 *              row has disappeared or the current user does not own the table
 *              (the latter with a NOTICE).  If the relation has a TOAST relation,
 *              the routine calls itself for it after committing, while still
 *              holding the session-level lock on the parent.
391  */
392 static void
393 vacuum_rel(Oid relid)
394 {
395         Relation        onerel;
396         LockRelId       onerelid;
397         VacPageListData vacuum_pages;           /* List of pages to vacuum and/or
398                                                                                  * clean indices */
399         VacPageListData fraged_pages;           /* List of pages with space enough
400                                                                                  * for re-using */
401         Relation   *Irel;
402         int32           nindices,
403                                 i;
404         VRelStats  *vacrelstats;
405         bool            reindex = false;        /* only read by the NOT_USED sections below */
406         Oid                     toast_relid;
407
408         /* Begin a transaction for vacuuming this relation */
409         StartTransactionCommand();
410
411         /*
412          * Check for user-requested abort.      Note we want this to be inside a
413          * transaction, so xact.c doesn't issue useless NOTICE.
414          */
415         CHECK_FOR_INTERRUPTS();
416
417         /*
418          * Race condition -- if the pg_class tuple has gone away since the
419          * last time we saw it, we don't need to vacuum it.
420          */
421         if (!SearchSysCacheExists(RELOID,
422                                                           ObjectIdGetDatum(relid),
423                                                           0, 0, 0))
424         {
425                 CommitTransactionCommand();
426                 return;
427         }
428
429         /*
430          * Open the class, get an exclusive lock on it, and check permissions.
431          *
432          * Note we choose to treat permissions failure as a NOTICE and keep
433          * trying to vacuum the rest of the DB --- is this appropriate?
434          */
435         onerel = heap_open(relid, AccessExclusiveLock);
436
437         if (!pg_ownercheck(GetUserId(), RelationGetRelationName(onerel),
438                                            RELNAME))
439         {
440                 elog(NOTICE, "Skipping \"%s\" --- only table owner can VACUUM it",
441                          RelationGetRelationName(onerel));
442                 heap_close(onerel, AccessExclusiveLock);
443                 CommitTransactionCommand();
444                 return;
445         }
446
447         /*
448          * Get a session-level exclusive lock too.      This will protect our
449          * exclusive access to the relation across multiple transactions, so
450          * that we can vacuum the relation's TOAST table (if any) secure in
451          * the knowledge that no one is diddling the parent relation.
452          *
453          * NOTE: this cannot block, even if someone else is waiting for access,
454          * because the lock manager knows that both lock requests are from the
455          * same process.
456          */
457         onerelid = onerel->rd_lockInfo.lockRelId;       /* local copy; still needed after onerel is closed */
458         LockRelationForSession(&onerelid, AccessExclusiveLock);
459
460         /*
461          * Remember the relation's TOAST relation for later
462          * (it is read here because onerel is closed before it is used)
463          */
463         toast_relid = onerel->rd_rel->reltoastrelid;
464
465         /*
466          * Set up statistics-gathering machinery.
 *
 * Only relid, num_pages, num_tuples and hasindex are initialized
 * here.  NOTE(review): the remaining VRelStats fields are presumably
 * filled in by scan_heap() -- confirm.
467          */
468         vacrelstats = (VRelStats *) palloc(sizeof(VRelStats));
469         vacrelstats->relid = relid;
470         vacrelstats->num_pages = 0;
471         vacrelstats->num_tuples = 0;
472         vacrelstats->hasindex = false;
473
474         GetXmaxRecent(&XmaxRecent);
475
476         /* scan it */
477         reindex = false;        /* redundant with the initializer above */
478         vacuum_pages.num_pages = fraged_pages.num_pages = 0;    /* the only list fields set here */
479         scan_heap(vacrelstats, onerel, &vacuum_pages, &fraged_pages);
480         if (IsIgnoringSystemIndexes() &&
481                 IsSystemRelationName(RelationGetRelationName(onerel)))
482                 reindex = true;
483
484         /* Now open indices */
485         nindices = 0;
486         Irel = (Relation *) NULL;
487         get_indices(onerel, &nindices, &Irel);
488         if (!Irel)
489                 reindex = false;
490         else if (!RelationGetForm(onerel)->relhasindex)
491                 reindex = true;
492         if (nindices > 0)
493                 vacrelstats->hasindex = true;
494         else
495                 vacrelstats->hasindex = false;
496
497 #ifdef NOT_USED
498         /*
499          * reindex in VACUUM is dangerous under WAL. ifdef out until it
500          * becomes safe.
501          */
502         if (reindex)
503         {
504                 for (i = 0; i < nindices; i++)
505                         index_close(Irel[i]);
506                 Irel = (Relation *) NULL;
507                 activate_indexes_of_a_table(relid, false);
508         }
509 #endif   /* NOT_USED */
510
511         /* Clean/scan index relation(s) */
512         if (Irel != (Relation *) NULL)
513         {
514                 if (vacuum_pages.num_pages > 0)
515                 {
516                         for (i = 0; i < nindices; i++)
517                                 vacuum_index(&vacuum_pages, Irel[i],
518                                                          vacrelstats->num_tuples, 0);
519                 }
520                 else
521                 {
522                         /* nothing to remove: just scan indices to update statistics */
523                         for (i = 0; i < nindices; i++)
524                                 scan_index(Irel[i], vacrelstats->num_tuples);
525                 }
526         }
527
528         if (fraged_pages.num_pages > 0)
529         {
530                 /* Try to shrink heap; Irel is handed over to repair_frag and not closed in this branch */
531                 repair_frag(vacrelstats, onerel, &vacuum_pages, &fraged_pages,
532                                         nindices, Irel);
533         }
534         else
535         {
536                 if (Irel != (Relation *) NULL)
537                         close_indices(nindices, Irel);
538                 if (vacuum_pages.num_pages > 0)
539                 {
540                         /* Clean pages from vacuum_pages list */
541                         vacuum_heap(vacrelstats, onerel, &vacuum_pages);
542                 }
543                 else
544                 {
545
546                         /*
547                          * Flush dirty pages out to disk.  We must do this even if we
548                          * didn't do anything else, because we want to ensure that all
549                          * tuples have correct on-row commit status on disk (see
550                          * bufmgr.c's comments for FlushRelationBuffers()).
 *
 * Note that "i" is reused here to hold the return code.
551                          */
552                         i = FlushRelationBuffers(onerel, vacrelstats->num_pages);
553                         if (i < 0)
554                                 elog(ERROR, "VACUUM (vacuum_rel): FlushRelationBuffers returned %d",
555                                          i);
556                 }
557         }
558 #ifdef NOT_USED
559         if (reindex)
560                 activate_indexes_of_a_table(relid, true);
561 #endif   /* NOT_USED */
562
563         /* all done with this class, but hold lock until commit */
564         heap_close(onerel, NoLock);
565
566         /* update statistics in pg_class */
567         vac_update_relstats(vacrelstats->relid, vacrelstats->num_pages,
568                                                 vacrelstats->num_tuples, vacrelstats->hasindex);
569
570         /*
571          * Complete the transaction and free all temporary memory used.
572          */
573         CommitTransactionCommand();
574
575         /*
576          * If the relation has a secondary toast one, vacuum that too while we
577          * still hold the session lock on the master table. We don't need to
578          * propagate "analyze" to it, because the toaster always uses
579          * hardcoded index access and statistics are totally unimportant for
580          * toast relations
581          */
582         if (toast_relid != InvalidOid)
583                 vacuum_rel(toast_relid);        /* recursive call; it runs in its own transaction */
584
585         /*
586          * Now release the session-level lock on the master table.
587          */
588         UnlockRelationForSession(&onerelid, AccessExclusiveLock);
589 }
590
591 /*
592  *      scan_heap() -- scan an open heap relation
593  *
594  *              This routine sets commit times, constructs vacuum_pages list of
595  *              empty/uninitialized pages and pages with dead tuples and
596  *              ~LP_USED line pointers, constructs fraged_pages list of pages
597  *              appropriate for purposes of shrinking and maintains statistics
598  *              on the number of live tuples in a heap.
599  */
600 static void
601 scan_heap(VRelStats *vacrelstats, Relation onerel,
602                   VacPageList vacuum_pages, VacPageList fraged_pages)
603 {
604         BlockNumber nblocks,
605                                 blkno;
606         ItemId          itemid;
607         Buffer          buf;
608         HeapTupleData tuple;
609         Page            page,
610                                 tempPage = NULL;
611         OffsetNumber offnum,
612                                 maxoff;
613         bool            pgchanged,
614                                 tupgone,
615                                 dobufrel,
616                                 notup;
617         char       *relname;
618         VacPage         vacpage,
619                                 vp;
620         long            num_tuples;
621         uint32          tups_vacuumed,
622                                 nkeep,
623                                 nunused,
624                                 ncrash,
625                                 empty_pages,
626                                 new_pages,
627                                 changed_pages,
628                                 empty_end_pages;
629         Size            free_size,
630                                 usable_free_size;
631         Size            min_tlen = MaxTupleSize;
632         Size            max_tlen = 0;
633         int32           i;
634         bool            do_shrinking = true;
635         VTupleLink      vtlinks = (VTupleLink) palloc(100 * sizeof(VTupleLinkData));
636         int                     num_vtlinks = 0;
637         int                     free_vtlinks = 100;
638         struct rusage ru0;
639
640         getrusage(RUSAGE_SELF, &ru0);
641
642         relname = RelationGetRelationName(onerel);
643         elog(MESSAGE_LEVEL, "--Relation %s--", relname);
644
645         tups_vacuumed = num_tuples = nkeep = nunused = ncrash = empty_pages =
646                 new_pages = changed_pages = empty_end_pages = 0;
647         free_size = usable_free_size = 0;
648
649         nblocks = RelationGetNumberOfBlocks(onerel);
650
651         vacpage = (VacPage) palloc(sizeof(VacPageData) + MaxOffsetNumber * sizeof(OffsetNumber));
652         vacpage->offsets_used = 0;
653
654         for (blkno = 0; blkno < nblocks; blkno++)
655         {
656                 buf = ReadBuffer(onerel, blkno);
657                 page = BufferGetPage(buf);
658                 vacpage->blkno = blkno;
659                 vacpage->offsets_free = 0;
660
661                 if (PageIsNew(page))
662                 {
663                         elog(NOTICE, "Rel %s: Uninitialized page %u - fixing",
664                                  relname, blkno);
665                         PageInit(page, BufferGetPageSize(buf), 0);
666                         vacpage->free = ((PageHeader) page)->pd_upper - ((PageHeader) page)->pd_lower;
667                         free_size += (vacpage->free - sizeof(ItemIdData));
668                         new_pages++;
669                         empty_end_pages++;
670                         reap_page(vacuum_pages, vacpage);
671                         WriteBuffer(buf);
672                         continue;
673                 }
674
675                 if (PageIsEmpty(page))
676                 {
677                         vacpage->free = ((PageHeader) page)->pd_upper - ((PageHeader) page)->pd_lower;
678                         free_size += (vacpage->free - sizeof(ItemIdData));
679                         empty_pages++;
680                         empty_end_pages++;
681                         reap_page(vacuum_pages, vacpage);
682                         ReleaseBuffer(buf);
683                         continue;
684                 }
685
686                 pgchanged = false;
687                 notup = true;
688                 maxoff = PageGetMaxOffsetNumber(page);
689                 for (offnum = FirstOffsetNumber;
690                          offnum <= maxoff;
691                          offnum = OffsetNumberNext(offnum))
692                 {
693                         itemid = PageGetItemId(page, offnum);
694
695                         /*
696                          * Collect un-used items too - it's possible to have indices
697                          * pointing here after crash.
698                          */
699                         if (!ItemIdIsUsed(itemid))
700                         {
701                                 vacpage->offsets[vacpage->offsets_free++] = offnum;
702                                 nunused++;
703                                 continue;
704                         }
705
706                         tuple.t_datamcxt = NULL;
707                         tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
708                         tuple.t_len = ItemIdGetLength(itemid);
709                         ItemPointerSet(&(tuple.t_self), blkno, offnum);
710                         tupgone = false;
711
712                         if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
713                         {
714                                 if (tuple.t_data->t_infomask & HEAP_XMIN_INVALID)
715                                         tupgone = true;
716                                 else if (tuple.t_data->t_infomask & HEAP_MOVED_OFF)
717                                 {
718                                         if (TransactionIdDidCommit((TransactionId)
719                                                                                            tuple.t_data->t_cmin))
720                                         {
721                                                 tuple.t_data->t_infomask |= HEAP_XMIN_INVALID;
722                                                 pgchanged = true;
723                                                 tupgone = true;
724                                         }
725                                         else
726                                         {
727                                                 tuple.t_data->t_infomask |= HEAP_XMIN_COMMITTED;
728                                                 pgchanged = true;
729                                         }
730                                 }
731                                 else if (tuple.t_data->t_infomask & HEAP_MOVED_IN)
732                                 {
733                                         if (!TransactionIdDidCommit((TransactionId)
734                                                                                                 tuple.t_data->t_cmin))
735                                         {
736                                                 tuple.t_data->t_infomask |= HEAP_XMIN_INVALID;
737                                                 pgchanged = true;
738                                                 tupgone = true;
739                                         }
740                                         else
741                                         {
742                                                 tuple.t_data->t_infomask |= HEAP_XMIN_COMMITTED;
743                                                 pgchanged = true;
744                                         }
745                                 }
746                                 else
747                                 {
748                                         if (TransactionIdDidAbort(tuple.t_data->t_xmin))
749                                                 tupgone = true;
750                                         else if (TransactionIdDidCommit(tuple.t_data->t_xmin))
751                                         {
752                                                 tuple.t_data->t_infomask |= HEAP_XMIN_COMMITTED;
753                                                 pgchanged = true;
754                                         }
755                                         else if (!TransactionIdIsInProgress(tuple.t_data->t_xmin))
756                                         {
757
758                                                 /*
759                                                  * Not Aborted, Not Committed, Not in Progress -
760                                                  * so it's from crashed process. - vadim 11/26/96
761                                                  */
762                                                 ncrash++;
763                                                 tupgone = true;
764                                         }
765                                         else
766                                         {
767                                                 elog(NOTICE, "Rel %s: TID %u/%u: InsertTransactionInProgress %u - can't shrink relation",
768                                                    relname, blkno, offnum, tuple.t_data->t_xmin);
769                                                 do_shrinking = false;
770                                         }
771                                 }
772                         }
773
774                         /*
775                          * here we are concerned about tuples with xmin committed and
776                          * xmax unknown or committed
777                          */
778                         if (tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED &&
779                                 !(tuple.t_data->t_infomask & HEAP_XMAX_INVALID))
780                         {
781                                 if (tuple.t_data->t_infomask & HEAP_XMAX_COMMITTED)
782                                 {
783                                         if (tuple.t_data->t_infomask & HEAP_MARKED_FOR_UPDATE)
784                                         {
785                                                 tuple.t_data->t_infomask |= HEAP_XMAX_INVALID;
786                                                 tuple.t_data->t_infomask &=
787                                                         ~(HEAP_XMAX_COMMITTED | HEAP_MARKED_FOR_UPDATE);
788                                                 pgchanged = true;
789                                         }
790                                         else
791                                                 tupgone = true;
792                                 }
793                                 else if (TransactionIdDidAbort(tuple.t_data->t_xmax))
794                                 {
795                                         tuple.t_data->t_infomask |= HEAP_XMAX_INVALID;
796                                         pgchanged = true;
797                                 }
798                                 else if (TransactionIdDidCommit(tuple.t_data->t_xmax))
799                                 {
800                                         if (tuple.t_data->t_infomask & HEAP_MARKED_FOR_UPDATE)
801                                         {
802                                                 tuple.t_data->t_infomask |= HEAP_XMAX_INVALID;
803                                                 tuple.t_data->t_infomask &=
804                                                         ~(HEAP_XMAX_COMMITTED | HEAP_MARKED_FOR_UPDATE);
805                                                 pgchanged = true;
806                                         }
807                                         else
808                                                 tupgone = true;
809                                 }
810                                 else if (!TransactionIdIsInProgress(tuple.t_data->t_xmax))
811                                 {
812
813                                         /*
814                                          * Not Aborted, Not Committed, Not in Progress - so it
815                                          * from crashed process. - vadim 06/02/97
816                                          */
817                                         tuple.t_data->t_infomask |= HEAP_XMAX_INVALID;
818                                         tuple.t_data->t_infomask &=
819                                                 ~(HEAP_XMAX_COMMITTED | HEAP_MARKED_FOR_UPDATE);
820                                         pgchanged = true;
821                                 }
822                                 else
823                                 {
824                                         elog(NOTICE, "Rel %s: TID %u/%u: DeleteTransactionInProgress %u - can't shrink relation",
825                                                  relname, blkno, offnum, tuple.t_data->t_xmax);
826                                         do_shrinking = false;
827                                 }
828
829                                 /*
830                                  * If tuple is recently deleted then we must not remove it
831                                  * from relation.
832                                  */
833                                 if (tupgone && (tuple.t_data->t_infomask & HEAP_XMIN_INVALID) == 0 && tuple.t_data->t_xmax >= XmaxRecent)
834                                 {
835                                         tupgone = false;
836                                         nkeep++;
837                                         if (!(tuple.t_data->t_infomask & HEAP_XMAX_COMMITTED))
838                                         {
839                                                 tuple.t_data->t_infomask |= HEAP_XMAX_COMMITTED;
840                                                 pgchanged = true;
841                                         }
842
843                                         /*
844                                          * If we do shrinking and this tuple is updated one
845                                          * then remember it to construct updated tuple
846                                          * dependencies.
847                                          */
848                                         if (do_shrinking && !(ItemPointerEquals(&(tuple.t_self),
849                                                                                            &(tuple.t_data->t_ctid))))
850                                         {
851                                                 if (free_vtlinks == 0)
852                                                 {
853                                                         free_vtlinks = 1000;
854                                                         vtlinks = (VTupleLink) repalloc(vtlinks,
855                                                                                    (free_vtlinks + num_vtlinks) *
856                                                                                                  sizeof(VTupleLinkData));
857                                                 }
858                                                 vtlinks[num_vtlinks].new_tid = tuple.t_data->t_ctid;
859                                                 vtlinks[num_vtlinks].this_tid = tuple.t_self;
860                                                 free_vtlinks--;
861                                                 num_vtlinks++;
862                                         }
863                                 }
864                         }
865
866                         /*
867                          * Other checks...
868                          */
869                         if (!OidIsValid(tuple.t_data->t_oid))
870                         {
871                                 elog(NOTICE, "Rel %s: TID %u/%u: OID IS INVALID. TUPGONE %d.",
872                                          relname, blkno, offnum, tupgone);
873                         }
874
875                         if (tupgone)
876                         {
877                                 ItemId          lpp;
878
879                                 /*
880                                  * Here we are building a temporary copy of the page with
881                                  * dead tuples removed.  Below we will apply
882                                  * PageRepairFragmentation to the copy, so that we can
883                                  * determine how much space will be available after
884                                  * removal of dead tuples.      But note we are NOT changing
885                                  * the real page yet...
886                                  */
887                                 if (tempPage == (Page) NULL)
888                                 {
889                                         Size            pageSize;
890
891                                         pageSize = PageGetPageSize(page);
892                                         tempPage = (Page) palloc(pageSize);
893                                         memmove(tempPage, page, pageSize);
894                                 }
895
896                                 /* mark it unused on the temp page */
897                                 lpp = &(((PageHeader) tempPage)->pd_linp[offnum - 1]);
898                                 lpp->lp_flags &= ~LP_USED;
899
900                                 vacpage->offsets[vacpage->offsets_free++] = offnum;
901                                 tups_vacuumed++;
902                         }
903                         else
904                         {
905                                 num_tuples++;
906                                 notup = false;
907                                 if (tuple.t_len < min_tlen)
908                                         min_tlen = tuple.t_len;
909                                 if (tuple.t_len > max_tlen)
910                                         max_tlen = tuple.t_len;
911                         }
912                 }
913
914                 if (pgchanged)
915                 {
916                         WriteBuffer(buf);
917                         dobufrel = false;
918                         changed_pages++;
919                 }
920                 else
921                         dobufrel = true;
922
923                 if (tempPage != (Page) NULL)
924                 {                                               /* Some tuples are gone */
925                         PageRepairFragmentation(tempPage, NULL);
926                         vacpage->free = ((PageHeader) tempPage)->pd_upper - ((PageHeader) tempPage)->pd_lower;
927                         free_size += vacpage->free;
928                         reap_page(vacuum_pages, vacpage);
929                         pfree(tempPage);
930                         tempPage = (Page) NULL;
931                 }
932                 else if (vacpage->offsets_free > 0)
933                 {                                               /* there are only ~LP_USED line pointers */
934                         vacpage->free = ((PageHeader) page)->pd_upper - ((PageHeader) page)->pd_lower;
935                         free_size += vacpage->free;
936                         reap_page(vacuum_pages, vacpage);
937                 }
938                 if (dobufrel)
939                         ReleaseBuffer(buf);
940                 if (notup)
941                         empty_end_pages++;
942                 else
943                         empty_end_pages = 0;
944         }
945
946         pfree(vacpage);
947
948         /* save stats in the rel list for use later */
949         vacrelstats->num_tuples = num_tuples;
950         vacrelstats->num_pages = nblocks;
951         if (num_tuples == 0)
952                 min_tlen = max_tlen = 0;
953         vacrelstats->min_tlen = min_tlen;
954         vacrelstats->max_tlen = max_tlen;
955
956         vacuum_pages->empty_end_pages = empty_end_pages;
957         fraged_pages->empty_end_pages = empty_end_pages;
958
959         /*
960          * Try to make fraged_pages keeping in mind that we can't use free
961          * space of "empty" end-pages and last page if it reaped.
962          */
963         if (do_shrinking && vacuum_pages->num_pages - empty_end_pages > 0)
964         {
965                 int                     nusf;           /* blocks usefull for re-using */
966
967                 nusf = vacuum_pages->num_pages - empty_end_pages;
968                 if ((vacuum_pages->pagedesc[nusf - 1])->blkno == nblocks - empty_end_pages - 1)
969                         nusf--;
970
971                 for (i = 0; i < nusf; i++)
972                 {
973                         vp = vacuum_pages->pagedesc[i];
974                         if (enough_space(vp, min_tlen))
975                         {
976                                 vpage_insert(fraged_pages, vp);
977                                 usable_free_size += vp->free;
978                         }
979                 }
980         }
981
982         if (usable_free_size > 0 && num_vtlinks > 0)
983         {
984                 qsort((char *) vtlinks, num_vtlinks, sizeof(VTupleLinkData),
985                           vac_cmp_vtlinks);
986                 vacrelstats->vtlinks = vtlinks;
987                 vacrelstats->num_vtlinks = num_vtlinks;
988         }
989         else
990         {
991                 vacrelstats->vtlinks = NULL;
992                 vacrelstats->num_vtlinks = 0;
993                 pfree(vtlinks);
994         }
995
996         elog(MESSAGE_LEVEL, "Pages %u: Changed %u, reaped %u, Empty %u, New %u; \
997 Tup %lu: Vac %u, Keep/VTL %u/%u, Crash %u, UnUsed %u, MinLen %lu, MaxLen %lu; \
998 Re-using: Free/Avail. Space %lu/%lu; EndEmpty/Avail. Pages %u/%u. %s",
999                  nblocks, changed_pages, vacuum_pages->num_pages, empty_pages,
1000                  new_pages, num_tuples, tups_vacuumed,
1001                  nkeep, vacrelstats->num_vtlinks, ncrash,
1002                  nunused, (unsigned long) min_tlen, (unsigned long) max_tlen,
1003                  (unsigned long) free_size, (unsigned long) usable_free_size,
1004                  empty_end_pages, fraged_pages->num_pages,
1005                  show_rusage(&ru0));
1006
1007 }
1008
1009
1010 /*
1011  *      repair_frag() -- try to repair relation's fragmentation
1012  *
1013  *              This routine marks dead tuples as unused and tries to re-use dead
1014  *              space by moving tuples (and inserting indices if needed). It
1015  *              constructs Nvacpagelist, a list of freed pages (moved tuples), and
1016  *              cleans indices for them after committing the current transaction
1017  *              (in a hack manner - without losing locks or freeing memory!). It
1018  *              truncates the relation if some end-blocks have gone away.
1019  */
1020 static void
1021 repair_frag(VRelStats *vacrelstats, Relation onerel,
1022                         VacPageList vacuum_pages, VacPageList fraged_pages,
1023                         int nindices, Relation *Irel)
1024 {
1025         TransactionId myXID;
1026         CommandId       myCID;
1027         Buffer          buf,
1028                                 cur_buffer;
1029         int                     nblocks,
1030                                 blkno;
1031         Page            page,
1032                                 ToPage = NULL;
1033         OffsetNumber offnum,
1034                                 maxoff,
1035                                 newoff,
1036                                 max_offset;
1037         ItemId          itemid,
1038                                 newitemid;
1039         HeapTupleData tuple,
1040                                 newtup;
1041         TupleDesc       tupdesc;
1042         IndexInfo **indexInfo = NULL;
1043         Datum           idatum[INDEX_MAX_KEYS];
1044         char            inulls[INDEX_MAX_KEYS];
1045         InsertIndexResult iresult;
1046         VacPageListData Nvacpagelist;
1047         VacPage         cur_page = NULL,
1048                                 last_vacuum_page,
1049                                 vacpage,
1050                            *curpage;
1051         int                     cur_item = 0;
1052         int                     last_move_dest_block = -1,
1053                                 last_vacuum_block,
1054                                 i = 0;
1055         Size            tuple_len;
1056         int                     num_moved,
1057                                 num_fraged_pages,
1058                                 vacuumed_pages;
1059         int                     checked_moved,
1060                                 num_tuples,
1061                                 keep_tuples = 0;
1062         bool            isempty,
1063                                 dowrite,
1064                                 chain_tuple_moved;
1065         struct rusage ru0;
1066
1067         getrusage(RUSAGE_SELF, &ru0);
1068
1069         myXID = GetCurrentTransactionId();
1070         myCID = GetCurrentCommandId();
1071
1072         tupdesc = RelationGetDescr(onerel);
1073
1074         if (Irel != (Relation *) NULL)          /* preparation for index' inserts */
1075                 indexInfo = get_index_desc(onerel, nindices, Irel);
1076
1077         Nvacpagelist.num_pages = 0;
1078         num_fraged_pages = fraged_pages->num_pages;
1079         Assert(vacuum_pages->num_pages > vacuum_pages->empty_end_pages);
1080         vacuumed_pages = vacuum_pages->num_pages - vacuum_pages->empty_end_pages;
1081         last_vacuum_page = vacuum_pages->pagedesc[vacuumed_pages - 1];
1082         last_vacuum_block = last_vacuum_page->blkno;
1083         cur_buffer = InvalidBuffer;
1084         num_moved = 0;
1085
1086         vacpage = (VacPage) palloc(sizeof(VacPageData) + MaxOffsetNumber * sizeof(OffsetNumber));
1087         vacpage->offsets_used = vacpage->offsets_free = 0;
1088
1089         /*
1090          * Scan pages backwards from the last nonempty page, trying to move
1091          * tuples down to lower pages.  Quit when we reach a page that we have
1092          * moved any tuples onto.  Note that if a page is still in the
1093          * fraged_pages list (list of candidate move-target pages) when we
1094          * reach it, we will remove it from the list.  This ensures we never
1095          * move a tuple up to a higher page number.
1096          *
1097          * NB: this code depends on the vacuum_pages and fraged_pages lists being
1098          * in order, and on fraged_pages being a subset of vacuum_pages.
1099          */
1100         nblocks = vacrelstats->num_pages;
1101         for (blkno = nblocks - vacuum_pages->empty_end_pages - 1;
1102                  blkno > last_move_dest_block;
1103                  blkno--)
1104         {
1105                 buf = ReadBuffer(onerel, blkno);
1106                 page = BufferGetPage(buf);
1107
1108                 vacpage->offsets_free = 0;
1109
1110                 isempty = PageIsEmpty(page);
1111
1112                 dowrite = false;
1113                 if (blkno == last_vacuum_block) /* it's reaped page */
1114                 {
1115                         if (last_vacuum_page->offsets_free > 0)         /* there are dead tuples */
1116                         {                                       /* on this page - clean */
1117                                 Assert(!isempty);
1118                                 LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
1119                                 vacuum_page(onerel, buf, last_vacuum_page);
1120                                 LockBuffer(buf, BUFFER_LOCK_UNLOCK);
1121                                 dowrite = true;
1122                         }
1123                         else
1124                                 Assert(isempty);
1125                         --vacuumed_pages;
1126                         if (vacuumed_pages > 0)
1127                         {
1128                                 /* get prev reaped page from vacuum_pages */
1129                                 last_vacuum_page = vacuum_pages->pagedesc[vacuumed_pages - 1];
1130                                 last_vacuum_block = last_vacuum_page->blkno;
1131                         }
1132                         else
1133                         {
1134                                 last_vacuum_page = NULL;
1135                                 last_vacuum_block = -1;
1136                         }
1137                         if (num_fraged_pages > 0 &&
1138                                 fraged_pages->pagedesc[num_fraged_pages - 1]->blkno ==
1139                                 (BlockNumber) blkno)
1140                         {
1141                                 /* page is in fraged_pages too; remove it */
1142                                 --num_fraged_pages;
1143                         }
1144                         if (isempty)
1145                         {
1146                                 ReleaseBuffer(buf);
1147                                 continue;
1148                         }
1149                 }
1150                 else
1151                         Assert(!isempty);
1152
1153                 chain_tuple_moved = false;              /* no one chain-tuple was moved
1154                                                                                  * off this page, yet */
1155                 vacpage->blkno = blkno;
1156                 maxoff = PageGetMaxOffsetNumber(page);
1157                 for (offnum = FirstOffsetNumber;
1158                          offnum <= maxoff;
1159                          offnum = OffsetNumberNext(offnum))
1160                 {
1161                         itemid = PageGetItemId(page, offnum);
1162
1163                         if (!ItemIdIsUsed(itemid))
1164                                 continue;
1165
1166                         tuple.t_datamcxt = NULL;
1167                         tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
1168                         tuple_len = tuple.t_len = ItemIdGetLength(itemid);
1169                         ItemPointerSet(&(tuple.t_self), blkno, offnum);
1170
1171                         if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
1172                         {
1173                                 if ((TransactionId) tuple.t_data->t_cmin != myXID)
1174                                         elog(ERROR, "Invalid XID in t_cmin");
1175                                 if (tuple.t_data->t_infomask & HEAP_MOVED_IN)
1176                                         elog(ERROR, "HEAP_MOVED_IN was not expected");
1177
1178                                 /*
1179                                  * If this (chain) tuple is moved by me already then I
1180                                  * have to check is it in vacpage or not - i.e. is it
1181                                  * moved while cleaning this page or some previous one.
1182                                  */
1183                                 if (tuple.t_data->t_infomask & HEAP_MOVED_OFF)
1184                                 {
1185                                         if (keep_tuples == 0)
1186                                                 continue;
1187                                         if (chain_tuple_moved)          /* some chains was moved
1188                                                                                                  * while */
1189                                         {                       /* cleaning this page */
1190                                                 Assert(vacpage->offsets_free > 0);
1191                                                 for (i = 0; i < vacpage->offsets_free; i++)
1192                                                 {
1193                                                         if (vacpage->offsets[i] == offnum)
1194                                                                 break;
1195                                                 }
1196                                                 if (i >= vacpage->offsets_free) /* not found */
1197                                                 {
1198                                                         vacpage->offsets[vacpage->offsets_free++] = offnum;
1199                                                         keep_tuples--;
1200                                                 }
1201                                         }
1202                                         else
1203                                         {
1204                                                 vacpage->offsets[vacpage->offsets_free++] = offnum;
1205                                                 keep_tuples--;
1206                                         }
1207                                         continue;
1208                                 }
1209                                 elog(ERROR, "HEAP_MOVED_OFF was expected");
1210                         }
1211
1212                         /*
1213                          * If this tuple is in the chain of tuples created in updates
1214                          * by "recent" transactions then we have to move all chain of
1215                          * tuples to another places.
1216                          */
1217                         if ((tuple.t_data->t_infomask & HEAP_UPDATED &&
1218                                  tuple.t_data->t_xmin >= XmaxRecent) ||
1219                                 (!(tuple.t_data->t_infomask & HEAP_XMAX_INVALID) &&
1220                                  !(ItemPointerEquals(&(tuple.t_self), &(tuple.t_data->t_ctid)))))
1221                         {
1222                                 Buffer          Cbuf = buf;
1223                                 Page            Cpage;
1224                                 ItemId          Citemid;
1225                                 ItemPointerData Ctid;
1226                                 HeapTupleData tp = tuple;
1227                                 Size            tlen = tuple_len;
1228                                 VTupleMove      vtmove = (VTupleMove)
1229                                 palloc(100 * sizeof(VTupleMoveData));
1230                                 int                     num_vtmove = 0;
1231                                 int                     free_vtmove = 100;
1232                                 VacPage         to_vacpage = NULL;
1233                                 int                     to_item = 0;
1234                                 bool            freeCbuf = false;
1235                                 int                     ti;
1236
1237                                 if (vacrelstats->vtlinks == NULL)
1238                                         elog(ERROR, "No one parent tuple was found");
1239                                 if (cur_buffer != InvalidBuffer)
1240                                 {
1241                                         WriteBuffer(cur_buffer);
1242                                         cur_buffer = InvalidBuffer;
1243                                 }
1244
1245                                 /*
1246                                  * If this tuple is in the begin/middle of the chain then
1247                                  * we have to move to the end of chain.
1248                                  */
1249                                 while (!(tp.t_data->t_infomask & HEAP_XMAX_INVALID) &&
1250                                 !(ItemPointerEquals(&(tp.t_self), &(tp.t_data->t_ctid))))
1251                                 {
1252                                         Ctid = tp.t_data->t_ctid;
1253                                         if (freeCbuf)
1254                                                 ReleaseBuffer(Cbuf);
1255                                         freeCbuf = true;
1256                                         Cbuf = ReadBuffer(onerel,
1257                                                                           ItemPointerGetBlockNumber(&Ctid));
1258                                         Cpage = BufferGetPage(Cbuf);
1259                                         Citemid = PageGetItemId(Cpage,
1260                                                                           ItemPointerGetOffsetNumber(&Ctid));
1261                                         if (!ItemIdIsUsed(Citemid))
1262                                         {
1263
1264                                                 /*
1265                                                  * This means that in the middle of chain there
1266                                                  * was tuple updated by older (than XmaxRecent)
1267                                                  * xaction and this tuple is already deleted by
1268                                                  * me. Actually, upper part of chain should be
1269                                                  * removed and seems that this should be handled
1270                                                  * in scan_heap(), but it's not implemented at the
1271                                                  * moment and so we just stop shrinking here.
1272                                                  */
1273                                                 ReleaseBuffer(Cbuf);
1274                                                 pfree(vtmove);
1275                                                 vtmove = NULL;
1276                                                 elog(NOTICE, "Child itemid in update-chain marked as unused - can't continue repair_frag");
1277                                                 break;
1278                                         }
1279                                         tp.t_datamcxt = NULL;
1280                                         tp.t_data = (HeapTupleHeader) PageGetItem(Cpage, Citemid);
1281                                         tp.t_self = Ctid;
1282                                         tlen = tp.t_len = ItemIdGetLength(Citemid);
1283                                 }
1284                                 if (vtmove == NULL)
1285                                         break;
1286                                 /* first, can chain be moved ? */
1287                                 for (;;)
1288                                 {
1289                                         if (to_vacpage == NULL ||
1290                                                 !enough_space(to_vacpage, tlen))
1291                                         {
1292
1293                                                 /*
1294                                                  * if to_vacpage no longer has enough free space
1295                                                  * to be useful, remove it from fraged_pages list
1296                                                  */
1297                                                 if (to_vacpage != NULL &&
1298                                                 !enough_space(to_vacpage, vacrelstats->min_tlen))
1299                                                 {
1300                                                         Assert(num_fraged_pages > to_item);
1301                                                         memmove(fraged_pages->pagedesc + to_item,
1302                                                                         fraged_pages->pagedesc + to_item + 1,
1303                                                                         sizeof(VacPage) * (num_fraged_pages - to_item - 1));
1304                                                         num_fraged_pages--;
1305                                                 }
1306                                                 for (i = 0; i < num_fraged_pages; i++)
1307                                                 {
1308                                                         if (enough_space(fraged_pages->pagedesc[i], tlen))
1309                                                                 break;
1310                                                 }
1311
1312                                                 /* can't move item anywhere */
1313                                                 if (i == num_fraged_pages)
1314                                                 {
1315                                                         for (i = 0; i < num_vtmove; i++)
1316                                                         {
1317                                                                 Assert(vtmove[i].vacpage->offsets_used > 0);
1318                                                                 (vtmove[i].vacpage->offsets_used)--;
1319                                                         }
1320                                                         num_vtmove = 0;
1321                                                         break;
1322                                                 }
1323                                                 to_item = i;
1324                                                 to_vacpage = fraged_pages->pagedesc[to_item];
1325                                         }
1326                                         to_vacpage->free -= MAXALIGN(tlen);
1327                                         if (to_vacpage->offsets_used >= to_vacpage->offsets_free)
1328                                                 to_vacpage->free -= MAXALIGN(sizeof(ItemIdData));
1329                                         (to_vacpage->offsets_used)++;
1330                                         if (free_vtmove == 0)
1331                                         {
1332                                                 free_vtmove = 1000;
1333                                                 vtmove = (VTupleMove) repalloc(vtmove,
1334                                                                                          (free_vtmove + num_vtmove) *
1335                                                                                                  sizeof(VTupleMoveData));
1336                                         }
1337                                         vtmove[num_vtmove].tid = tp.t_self;
1338                                         vtmove[num_vtmove].vacpage = to_vacpage;
1339                                         if (to_vacpage->offsets_used == 1)
1340                                                 vtmove[num_vtmove].cleanVpd = true;
1341                                         else
1342                                                 vtmove[num_vtmove].cleanVpd = false;
1343                                         free_vtmove--;
1344                                         num_vtmove++;
1345
1346                                         /* All done ? */
1347                                         if (!(tp.t_data->t_infomask & HEAP_UPDATED) ||
1348                                                 tp.t_data->t_xmin < XmaxRecent)
1349                                                 break;
1350
1351                                         /* Well, try to find tuple with old row version */
1352                                         for (;;)
1353                                         {
1354                                                 Buffer          Pbuf;
1355                                                 Page            Ppage;
1356                                                 ItemId          Pitemid;
1357                                                 HeapTupleData Ptp;
1358                                                 VTupleLinkData vtld,
1359                                                                    *vtlp;
1360
1361                                                 vtld.new_tid = tp.t_self;
1362                                                 vtlp = (VTupleLink)
1363                                                         vac_find_eq((void *) (vacrelstats->vtlinks),
1364                                                                                 vacrelstats->num_vtlinks,
1365                                                                                 sizeof(VTupleLinkData),
1366                                                                                 (void *) &vtld,
1367                                                                                 vac_cmp_vtlinks);
1368                                                 if (vtlp == NULL)
1369                                                         elog(ERROR, "Parent tuple was not found");
1370                                                 tp.t_self = vtlp->this_tid;
1371                                                 Pbuf = ReadBuffer(onerel,
1372                                                                 ItemPointerGetBlockNumber(&(tp.t_self)));
1373                                                 Ppage = BufferGetPage(Pbuf);
1374                                                 Pitemid = PageGetItemId(Ppage,
1375                                                            ItemPointerGetOffsetNumber(&(tp.t_self)));
1376                                                 if (!ItemIdIsUsed(Pitemid))
1377                                                         elog(ERROR, "Parent itemid marked as unused");
1378                                                 Ptp.t_datamcxt = NULL;
1379                                                 Ptp.t_data = (HeapTupleHeader) PageGetItem(Ppage, Pitemid);
1380                                                 Assert(ItemPointerEquals(&(vtld.new_tid),
1381                                                                                                  &(Ptp.t_data->t_ctid)));
1382
1383                                                 /*
1384                                                  * See the comment above about the case where
1385                                                  * !ItemIdIsUsed(Citemid) (child item has been
1386                                                  * removed). Because we currently don't remove
1387                                                  * the useless part of an update-chain, it's
1388                                                  * possible to find a too-old parent row here.
1389                                                  * As in the case that caused this problem, we
1390                                                  * stop shrinking here. I could try to find the
1391                                                  * real parent row, but would rather not, because
1392                                                  * a real solution will be implemented later
1393                                                  * anyway, and we are too close to the 6.5
1394                                                  * release. - vadim 06/11/99
1395                                                  */
1396                                                 if (Ptp.t_data->t_xmax != tp.t_data->t_xmin)
1397                                                 {
1398                                                         if (freeCbuf)
1399                                                                 ReleaseBuffer(Cbuf);
1400                                                         freeCbuf = false;
1401                                                         ReleaseBuffer(Pbuf);
1402                                                         for (i = 0; i < num_vtmove; i++)
1403                                                         {
1404                                                                 Assert(vtmove[i].vacpage->offsets_used > 0);
1405                                                                 (vtmove[i].vacpage->offsets_used)--;
1406                                                         }
1407                                                         num_vtmove = 0;
1408                                                         elog(NOTICE, "Too old parent tuple found - can't continue repair_frag");
1409                                                         break;
1410                                                 }
1411 #ifdef NOT_USED                                 /* I'm not sure that this will work
1412                                                                  * properly... */
1413
1414                                                 /*
1415                                                  * If this tuple is updated version of row and it
1416                                                  * was created by the same transaction then no one
1417                                                  * is interested in this tuple - mark it as
1418                                                  * removed.
1419                                                  */
1420                                                 if (Ptp.t_data->t_infomask & HEAP_UPDATED &&
1421                                                         Ptp.t_data->t_xmin == Ptp.t_data->t_xmax)
1422                                                 {
1423                                                         TransactionIdStore(myXID,
1424                                                                 (TransactionId *) &(Ptp.t_data->t_cmin));
1425                                                         Ptp.t_data->t_infomask &=
1426                                                                 ~(HEAP_XMIN_COMMITTED | HEAP_XMIN_INVALID | HEAP_MOVED_IN);
1427                                                         Ptp.t_data->t_infomask |= HEAP_MOVED_OFF;
1428                                                         WriteBuffer(Pbuf);
1429                                                         continue;
1430                                                 }
1431 #endif
1432                                                 tp.t_datamcxt = Ptp.t_datamcxt;
1433                                                 tp.t_data = Ptp.t_data;
1434                                                 tlen = tp.t_len = ItemIdGetLength(Pitemid);
1435                                                 if (freeCbuf)
1436                                                         ReleaseBuffer(Cbuf);
1437                                                 Cbuf = Pbuf;
1438                                                 freeCbuf = true;
1439                                                 break;
1440                                         }
1441                                         if (num_vtmove == 0)
1442                                                 break;
1443                                 }
1444                                 if (freeCbuf)
1445                                         ReleaseBuffer(Cbuf);
1446                                 if (num_vtmove == 0)    /* chain can't be moved */
1447                                 {
1448                                         pfree(vtmove);
1449                                         break;
1450                                 }
1451                                 ItemPointerSetInvalid(&Ctid);
1452                                 for (ti = 0; ti < num_vtmove; ti++)
1453                                 {
1454                                         VacPage         destvacpage = vtmove[ti].vacpage;
1455
1456                                         /* Get page to move from */
1457                                         tuple.t_self = vtmove[ti].tid;
1458                                         Cbuf = ReadBuffer(onerel,
1459                                                          ItemPointerGetBlockNumber(&(tuple.t_self)));
1460
1461                                         /* Get page to move to */
1462                                         cur_buffer = ReadBuffer(onerel, destvacpage->blkno);
1463
1464                                         LockBuffer(cur_buffer, BUFFER_LOCK_EXCLUSIVE);
1465                                         if (cur_buffer != Cbuf)
1466                                                 LockBuffer(Cbuf, BUFFER_LOCK_EXCLUSIVE);
1467
1468                                         ToPage = BufferGetPage(cur_buffer);
1469                                         Cpage = BufferGetPage(Cbuf);
1470
1471                                         Citemid = PageGetItemId(Cpage,
1472                                                         ItemPointerGetOffsetNumber(&(tuple.t_self)));
1473                                         tuple.t_datamcxt = NULL;
1474                                         tuple.t_data = (HeapTupleHeader) PageGetItem(Cpage, Citemid);
1475                                         tuple_len = tuple.t_len = ItemIdGetLength(Citemid);
1476
1477                                         /*
1478                                          * make a copy of the source tuple, and then mark the
1479                                          * source tuple MOVED_OFF.
1480                                          */
1481                                         heap_copytuple_with_tuple(&tuple, &newtup);
1482
1483                                         RelationInvalidateHeapTuple(onerel, &tuple);
1484
1485                                         /* NO ELOG(ERROR) TILL CHANGES ARE LOGGED */
1486                                         START_CRIT_SECTION();
1487
1488                                         TransactionIdStore(myXID, (TransactionId *) &(tuple.t_data->t_cmin));
1489                                         tuple.t_data->t_infomask &=
1490                                                 ~(HEAP_XMIN_COMMITTED | HEAP_XMIN_INVALID | HEAP_MOVED_IN);
1491                                         tuple.t_data->t_infomask |= HEAP_MOVED_OFF;
1492
1493                                         /*
1494                                          * If this page was not used before - clean it.
1495                                          *
1496                                          * NOTE: a nasty bug used to lurk here.  It is possible
1497                                          * for the source and destination pages to be the same
1498                                          * (since this tuple-chain member can be on a page
1499                                          * lower than the one we're currently processing in
1500                                          * the outer loop).  If that's true, then after
1501                                          * vacuum_page() the source tuple will have been
1502                                          * moved, and tuple.t_data will be pointing at
1503                                          * garbage.  Therefore we must do everything that uses
1504                                          * tuple.t_data BEFORE this step!!
1505                                          *
1506                                          * This path is different from the other callers of
1507                                          * vacuum_page, because we have already incremented
1508                                          * the vacpage's offsets_used field to account for the
1509                                          * tuple(s) we expect to move onto the page. Therefore
1510                                          * vacuum_page's check for offsets_used == 0 is wrong.
1511                                          * But since that's a good debugging check for all
1512                                          * other callers, we work around it here rather than
1513                                          * remove it.
1514                                          */
1515                                         if (!PageIsEmpty(ToPage) && vtmove[ti].cleanVpd)
1516                                         {
1517                                                 int                     sv_offsets_used = destvacpage->offsets_used;
1518
1519                                                 destvacpage->offsets_used = 0;
1520                                                 vacuum_page(onerel, cur_buffer, destvacpage);
1521                                                 destvacpage->offsets_used = sv_offsets_used;
1522                                         }
1523
1524                                         /*
1525                                          * Update the state of the copied tuple, and store it
1526                                          * on the destination page.
1527                                          */
1528                                         TransactionIdStore(myXID, (TransactionId *) &(newtup.t_data->t_cmin));
1529                                         newtup.t_data->t_infomask &=
1530                                                 ~(HEAP_XMIN_COMMITTED | HEAP_XMIN_INVALID | HEAP_MOVED_OFF);
1531                                         newtup.t_data->t_infomask |= HEAP_MOVED_IN;
1532                                         newoff = PageAddItem(ToPage, (Item) newtup.t_data, tuple_len,
1533                                                                                  InvalidOffsetNumber, LP_USED);
1534                                         if (newoff == InvalidOffsetNumber)
1535                                         {
1536                                                 elog(STOP, "moving chain: failed to add item with len = %lu to page %u",
1537                                                   (unsigned long) tuple_len, destvacpage->blkno);
1538                                         }
1539                                         newitemid = PageGetItemId(ToPage, newoff);
1540                                         pfree(newtup.t_data);
1541                                         newtup.t_datamcxt = NULL;
1542                                         newtup.t_data = (HeapTupleHeader) PageGetItem(ToPage, newitemid);
1543                                         ItemPointerSet(&(newtup.t_self), destvacpage->blkno, newoff);
1544
1545                                         {
1546                                                 XLogRecPtr      recptr =
1547                                                 log_heap_move(onerel, Cbuf, tuple.t_self,
1548                                                                           cur_buffer, &newtup);
1549
1550                                                 if (Cbuf != cur_buffer)
1551                                                 {
1552                                                         PageSetLSN(Cpage, recptr);
1553                                                         PageSetSUI(Cpage, ThisStartUpID);
1554                                                 }
1555                                                 PageSetLSN(ToPage, recptr);
1556                                                 PageSetSUI(ToPage, ThisStartUpID);
1557                                         }
1558                                         END_CRIT_SECTION();
1559
1560                                         if (((int) destvacpage->blkno) > last_move_dest_block)
1561                                                 last_move_dest_block = destvacpage->blkno;
1562
1563                                         /*
1564                                          * Set new tuple's t_ctid pointing to itself for last
1565                                          * tuple in chain, and to next tuple in chain
1566                                          * otherwise.
1567                                          */
1568                                         if (!ItemPointerIsValid(&Ctid))
1569                                                 newtup.t_data->t_ctid = newtup.t_self;
1570                                         else
1571                                                 newtup.t_data->t_ctid = Ctid;
1572                                         Ctid = newtup.t_self;
1573
1574                                         num_moved++;
1575
1576                                         /*
1577                                          * Remember that we moved tuple from the current page
1578                                          * (corresponding index tuple will be cleaned).
1579                                          */
1580                                         if (Cbuf == buf)
1581                                                 vacpage->offsets[vacpage->offsets_free++] =
1582                                                         ItemPointerGetOffsetNumber(&(tuple.t_self));
1583                                         else
1584                                                 keep_tuples++;
1585
1586                                         LockBuffer(cur_buffer, BUFFER_LOCK_UNLOCK);
1587                                         if (cur_buffer != Cbuf)
1588                                                 LockBuffer(Cbuf, BUFFER_LOCK_UNLOCK);
1589
1590                                         if (Irel != (Relation *) NULL)
1591                                         {
1592
1593                                                 /*
1594                                                  * XXX using CurrentMemoryContext here means
1595                                                  * intra-vacuum memory leak for functional
1596                                                  * indexes. Should fix someday.
1597                                                  *
1598                                                  * XXX This code fails to handle partial indexes!
1599                                                  * Probably should change it to use
1600                                                  * ExecOpenIndices.
1601                                                  */
1602                                                 for (i = 0; i < nindices; i++)
1603                                                 {
1604                                                         FormIndexDatum(indexInfo[i],
1605                                                                                    &newtup,
1606                                                                                    tupdesc,
1607                                                                                    CurrentMemoryContext,
1608                                                                                    idatum,
1609                                                                                    inulls);
1610                                                         iresult = index_insert(Irel[i],
1611                                                                                                    idatum,
1612                                                                                                    inulls,
1613                                                                                                    &newtup.t_self,
1614                                                                                                    onerel);
1615                                                         if (iresult)
1616                                                                 pfree(iresult);
1617                                                 }
1618                                         }
1619                                         WriteBuffer(cur_buffer);
1620                                         WriteBuffer(Cbuf);
1621                                 }
1622                                 cur_buffer = InvalidBuffer;
1623                                 pfree(vtmove);
1624                                 chain_tuple_moved = true;
1625                                 continue;
1626                         }
1627
1628                         /* try to find new page for this tuple */
1629                         if (cur_buffer == InvalidBuffer ||
1630                                 !enough_space(cur_page, tuple_len))
1631                         {
1632                                 if (cur_buffer != InvalidBuffer)
1633                                 {
1634                                         WriteBuffer(cur_buffer);
1635                                         cur_buffer = InvalidBuffer;
1636
1637                                         /*
1638                                          * If previous target page is now too full to add *any*
1639                                          * tuple to it, remove it from fraged_pages.
1640                                          */
1641                                         if (!enough_space(cur_page, vacrelstats->min_tlen))
1642                                         {
1643                                                 Assert(num_fraged_pages > cur_item);
1644                                                 memmove(fraged_pages->pagedesc + cur_item,
1645                                                                 fraged_pages->pagedesc + cur_item + 1,
1646                                                                 sizeof(VacPage) * (num_fraged_pages - cur_item - 1));
1647                                                 num_fraged_pages--;
1648                                         }
1649                                 }
1650                                 for (i = 0; i < num_fraged_pages; i++)
1651                                 {
1652                                         if (enough_space(fraged_pages->pagedesc[i], tuple_len))
1653                                                 break;
1654                                 }
1655                                 if (i == num_fraged_pages)
1656                                         break;          /* can't move item anywhere */
1657                                 cur_item = i;
1658                                 cur_page = fraged_pages->pagedesc[cur_item];
1659                                 cur_buffer = ReadBuffer(onerel, cur_page->blkno);
1660                                 LockBuffer(cur_buffer, BUFFER_LOCK_EXCLUSIVE);
1661                                 ToPage = BufferGetPage(cur_buffer);
1662                                 /* if this page was not used before - clean it */
1663                                 if (!PageIsEmpty(ToPage) && cur_page->offsets_used == 0)
1664                                         vacuum_page(onerel, cur_buffer, cur_page);
1665                         }
1666                         else
1667                                 LockBuffer(cur_buffer, BUFFER_LOCK_EXCLUSIVE);
1668
1669                         LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
1670
1671                         /* copy tuple */
1672                         heap_copytuple_with_tuple(&tuple, &newtup);
1673
1674                         RelationInvalidateHeapTuple(onerel, &tuple);
1675
1676                         /* NO ELOG(ERROR) TILL CHANGES ARE LOGGED */
1677                         START_CRIT_SECTION();
1678
1679                         /*
1680                          * Mark new tuple as moved_in by vacuum and store vacuum XID
1681                          * in t_cmin !!!
1682                          */
1683                         TransactionIdStore(myXID, (TransactionId *) &(newtup.t_data->t_cmin));
1684                         newtup.t_data->t_infomask &=
1685                                 ~(HEAP_XMIN_COMMITTED | HEAP_XMIN_INVALID | HEAP_MOVED_OFF);
1686                         newtup.t_data->t_infomask |= HEAP_MOVED_IN;
1687
1688                         /* add tuple to the page */
1689                         newoff = PageAddItem(ToPage, (Item) newtup.t_data, tuple_len,
1690                                                                  InvalidOffsetNumber, LP_USED);
1691                         if (newoff == InvalidOffsetNumber)
1692                         {
1693                                 elog(STOP, "\
1694 failed to add item with len = %lu to page %u (free space %lu, nusd %u, noff %u)",
1695                                          (unsigned long) tuple_len, cur_page->blkno, (unsigned long) cur_page->free,
1696                                          cur_page->offsets_used, cur_page->offsets_free);
1697                         }
1698                         newitemid = PageGetItemId(ToPage, newoff);
1699                         pfree(newtup.t_data);
1700                         newtup.t_datamcxt = NULL;
1701                         newtup.t_data = (HeapTupleHeader) PageGetItem(ToPage, newitemid);
1702                         ItemPointerSet(&(newtup.t_data->t_ctid), cur_page->blkno, newoff);
1703                         newtup.t_self = newtup.t_data->t_ctid;
1704
1705                         /*
1706                          * Mark old tuple as moved_off by vacuum and store vacuum XID
1707                          * in t_cmin !!!
1708                          */
1709                         TransactionIdStore(myXID, (TransactionId *) &(tuple.t_data->t_cmin));
1710                         tuple.t_data->t_infomask &=
1711                                 ~(HEAP_XMIN_COMMITTED | HEAP_XMIN_INVALID | HEAP_MOVED_IN);
1712                         tuple.t_data->t_infomask |= HEAP_MOVED_OFF;
1713
1714                         {
1715                                 XLogRecPtr      recptr =
1716                                 log_heap_move(onerel, buf, tuple.t_self,
1717                                                           cur_buffer, &newtup);
1718
1719                                 PageSetLSN(page, recptr);
1720                                 PageSetSUI(page, ThisStartUpID);
1721                                 PageSetLSN(ToPage, recptr);
1722                                 PageSetSUI(ToPage, ThisStartUpID);
1723                         }
1724                         END_CRIT_SECTION();
1725
1726                         cur_page->offsets_used++;
1727                         num_moved++;
1728                         cur_page->free = ((PageHeader) ToPage)->pd_upper - ((PageHeader) ToPage)->pd_lower;
1729                         if (((int) cur_page->blkno) > last_move_dest_block)
1730                                 last_move_dest_block = cur_page->blkno;
1731
1732                         vacpage->offsets[vacpage->offsets_free++] = offnum;
1733
1734                         LockBuffer(cur_buffer, BUFFER_LOCK_UNLOCK);
1735                         LockBuffer(buf, BUFFER_LOCK_UNLOCK);
1736
1737                         /* insert index' tuples if needed */
1738                         if (Irel != (Relation *) NULL)
1739                         {
1740
1741                                 /*
1742                                  * XXX using CurrentMemoryContext here means intra-vacuum
1743                                  * memory leak for functional indexes. Should fix someday.
1744                                  *
1745                                  * XXX This code fails to handle partial indexes! Probably
1746                                  * should change it to use ExecOpenIndices.
1747                                  */
1748                                 for (i = 0; i < nindices; i++)
1749                                 {
1750                                         FormIndexDatum(indexInfo[i],
1751                                                                    &newtup,
1752                                                                    tupdesc,
1753                                                                    CurrentMemoryContext,
1754                                                                    idatum,
1755                                                                    inulls);
1756                                         iresult = index_insert(Irel[i],
1757                                                                                    idatum,
1758                                                                                    inulls,
1759                                                                                    &newtup.t_self,
1760                                                                                    onerel);
1761                                         if (iresult)
1762                                                 pfree(iresult);
1763                                 }
1764                         }
1765
1766                 }                                               /* walk along page */
1767
1768                 if (offnum < maxoff && keep_tuples > 0)
1769                 {
1770                         OffsetNumber off;
1771
1772                         for (off = OffsetNumberNext(offnum);
1773                                  off <= maxoff;
1774                                  off = OffsetNumberNext(off))
1775                         {
1776                                 itemid = PageGetItemId(page, off);
1777                                 if (!ItemIdIsUsed(itemid))
1778                                         continue;
1779                                 tuple.t_datamcxt = NULL;
1780                                 tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
1781                                 if (tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED)
1782                                         continue;
1783                                 if ((TransactionId) tuple.t_data->t_cmin != myXID)
1784                                         elog(ERROR, "Invalid XID in t_cmin (4)");
1785                                 if (tuple.t_data->t_infomask & HEAP_MOVED_IN)
1786                                         elog(ERROR, "HEAP_MOVED_IN was not expected (2)");
1787                                 if (tuple.t_data->t_infomask & HEAP_MOVED_OFF)
1788                                 {
1789                                         /* some chains was moved while */
1790                                         if (chain_tuple_moved)
1791                                         {                       /* cleaning this page */
1792                                                 Assert(vacpage->offsets_free > 0);
1793                                                 for (i = 0; i < vacpage->offsets_free; i++)
1794                                                 {
1795                                                         if (vacpage->offsets[i] == off)
1796                                                                 break;
1797                                                 }
1798                                                 if (i >= vacpage->offsets_free) /* not found */
1799                                                 {
1800                                                         vacpage->offsets[vacpage->offsets_free++] = off;
1801                                                         Assert(keep_tuples > 0);
1802                                                         keep_tuples--;
1803                                                 }
1804                                         }
1805                                         else
1806                                         {
1807                                                 vacpage->offsets[vacpage->offsets_free++] = off;
1808                                                 Assert(keep_tuples > 0);
1809                                                 keep_tuples--;
1810                                         }
1811                                 }
1812                         }
1813                 }
1814
1815                 if (vacpage->offsets_free > 0)  /* some tuples were moved */
1816                 {
1817                         if (chain_tuple_moved)          /* else - they are ordered */
1818                         {
1819                                 qsort((char *) (vacpage->offsets), vacpage->offsets_free,
1820                                           sizeof(OffsetNumber), vac_cmp_offno);
1821                         }
1822                         reap_page(&Nvacpagelist, vacpage);
1823                         WriteBuffer(buf);
1824                 }
1825                 else if (dowrite)
1826                         WriteBuffer(buf);
1827                 else
1828                         ReleaseBuffer(buf);
1829
1830                 if (offnum <= maxoff)
1831                         break;                          /* some item(s) left */
1832
1833         }                                                       /* walk along relation */
1834
1835         blkno++;                                        /* new number of blocks */
1836
1837         if (cur_buffer != InvalidBuffer)
1838         {
1839                 Assert(num_moved > 0);
1840                 WriteBuffer(cur_buffer);
1841         }
1842
1843         if (num_moved > 0)
1844         {
1845
1846                 /*
1847                  * We have to commit our tuple movings before we truncate the
1848                  * relation.  Ideally we should do Commit/StartTransactionCommand
1849                  * here, relying on the session-level table lock to protect our
1850                  * exclusive access to the relation.  However, that would require
1851                  * a lot of extra code to close and re-open the relation, indices,
1852                  * etc.  For now, a quick hack: record status of current
1853                  * transaction as committed, and continue.
1854                  */
1855                 RecordTransactionCommit();
1856         }
1857
1858         /*
1859          * Clean uncleaned reaped pages from the vacuum_pages list and set
1860          * xmin committed for inserted tuples
1861          */
1862         checked_moved = 0;
1863         for (i = 0, curpage = vacuum_pages->pagedesc; i < vacuumed_pages; i++, curpage++)
1864         {
1865                 Assert((*curpage)->blkno < (BlockNumber) blkno);
1866                 buf = ReadBuffer(onerel, (*curpage)->blkno);
1867                 LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
1868                 page = BufferGetPage(buf);
1869                 if ((*curpage)->offsets_used == 0)              /* this page was not used */
1870                 {
1871                         if (!PageIsEmpty(page))
1872                                 vacuum_page(onerel, buf, *curpage);
1873                 }
1874                 else
1875 /* this page was used */
1876                 {
1877                         num_tuples = 0;
1878                         max_offset = PageGetMaxOffsetNumber(page);
1879                         for (newoff = FirstOffsetNumber;
1880                                  newoff <= max_offset;
1881                                  newoff = OffsetNumberNext(newoff))
1882                         {
1883                                 itemid = PageGetItemId(page, newoff);
1884                                 if (!ItemIdIsUsed(itemid))
1885                                         continue;
1886                                 tuple.t_datamcxt = NULL;
1887                                 tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
1888                                 if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
1889                                 {
1890                                         if ((TransactionId) tuple.t_data->t_cmin != myXID)
1891                                                 elog(ERROR, "Invalid XID in t_cmin (2)");
1892                                         if (tuple.t_data->t_infomask & HEAP_MOVED_IN)
1893                                         {
1894                                                 tuple.t_data->t_infomask |= HEAP_XMIN_COMMITTED;
1895                                                 num_tuples++;
1896                                         }
1897                                         else if (tuple.t_data->t_infomask & HEAP_MOVED_OFF)
1898                                                 tuple.t_data->t_infomask |= HEAP_XMIN_INVALID;
1899                                         else
1900                                                 elog(ERROR, "HEAP_MOVED_OFF/HEAP_MOVED_IN was expected");
1901                                 }
1902                         }
1903                         Assert((*curpage)->offsets_used == num_tuples);
1904                         checked_moved += num_tuples;
1905                 }
1906                 LockBuffer(buf, BUFFER_LOCK_UNLOCK);
1907                 WriteBuffer(buf);
1908         }
1909         Assert(num_moved == checked_moved);
1910
1911         elog(MESSAGE_LEVEL, "Rel %s: Pages: %u --> %u; Tuple(s) moved: %u. %s",
1912                  RelationGetRelationName(onerel),
1913                  nblocks, blkno, num_moved,
1914                  show_rusage(&ru0));
1915
1916         /*
1917          * Reflect the motion of system tuples to catalog cache here.
1918          */
1919         CommandCounterIncrement();
1920
1921         if (Nvacpagelist.num_pages > 0)
1922         {
1923                 /* vacuum indices again if needed */
1924                 if (Irel != (Relation *) NULL)
1925                 {
1926                         VacPage    *vpleft,
1927                                            *vpright,
1928                                                 vpsave;
1929
1930                         /* re-sort Nvacpagelist.pagedesc */
1931                         for (vpleft = Nvacpagelist.pagedesc,
1932                         vpright = Nvacpagelist.pagedesc + Nvacpagelist.num_pages - 1;
1933                                  vpleft < vpright; vpleft++, vpright--)
1934                         {
1935                                 vpsave = *vpleft;
1936                                 *vpleft = *vpright;
1937                                 *vpright = vpsave;
1938                         }
1939                         Assert(keep_tuples >= 0);
1940                         for (i = 0; i < nindices; i++)
1941                                 vacuum_index(&Nvacpagelist, Irel[i],
1942                                                          vacrelstats->num_tuples, keep_tuples);
1943                 }
1944
1945                 /* clean moved tuples from last page in Nvacpagelist list */
1946                 if (vacpage->blkno == (BlockNumber) (blkno - 1) &&
1947                         vacpage->offsets_free > 0)
1948                 {
1949                         OffsetNumber unbuf[BLCKSZ/sizeof(OffsetNumber)];
1950                         OffsetNumber *unused = unbuf;
1951                         int                     uncnt;
1952
1953                         buf = ReadBuffer(onerel, vacpage->blkno);
1954                         LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
1955                         page = BufferGetPage(buf);
1956                         num_tuples = 0;
1957                         maxoff = PageGetMaxOffsetNumber(page);
1958                         for (offnum = FirstOffsetNumber;
1959                                  offnum <= maxoff;
1960                                  offnum = OffsetNumberNext(offnum))
1961                         {
1962                                 itemid = PageGetItemId(page, offnum);
1963                                 if (!ItemIdIsUsed(itemid))
1964                                         continue;
1965                                 tuple.t_datamcxt = NULL;
1966                                 tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
1967
1968                                 if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
1969                                 {
1970                                         if ((TransactionId) tuple.t_data->t_cmin != myXID)
1971                                                 elog(ERROR, "Invalid XID in t_cmin (3)");
1972                                         if (tuple.t_data->t_infomask & HEAP_MOVED_OFF)
1973                                         {
1974                                                 itemid->lp_flags &= ~LP_USED;
1975                                                 num_tuples++;
1976                                         }
1977                                         else
1978                                                 elog(ERROR, "HEAP_MOVED_OFF was expected (2)");
1979                                 }
1980
1981                         }
1982                         Assert(vacpage->offsets_free == num_tuples);
1983                         START_CRIT_SECTION();
1984                         uncnt = PageRepairFragmentation(page, unused);
1985                         {
1986                                 XLogRecPtr      recptr;
1987
1988                                 recptr = log_heap_clean(onerel, buf, (char *) unused,
1989                                                   (char *) (&(unused[uncnt])) - (char *) unused);
1990                                 PageSetLSN(page, recptr);
1991                                 PageSetSUI(page, ThisStartUpID);
1992                         }
1993                         END_CRIT_SECTION();
1994                         LockBuffer(buf, BUFFER_LOCK_UNLOCK);
1995                         WriteBuffer(buf);
1996                 }
1997
1998                 /* now - free new list of reaped pages */
1999                 curpage = Nvacpagelist.pagedesc;
2000                 for (i = 0; i < Nvacpagelist.num_pages; i++, curpage++)
2001                         pfree(*curpage);
2002                 pfree(Nvacpagelist.pagedesc);
2003         }
2004
2005         /*
2006          * Flush dirty pages out to disk.  We do this unconditionally, even if
2007          * we don't need to truncate, because we want to ensure that all
2008          * tuples have correct on-row commit status on disk (see bufmgr.c's
2009          * comments for FlushRelationBuffers()).
2010          */
2011         i = FlushRelationBuffers(onerel, blkno);
2012         if (i < 0)
2013                 elog(ERROR, "VACUUM (repair_frag): FlushRelationBuffers returned %d",
2014                          i);
2015
2016         /* truncate relation, if needed */
2017         if (blkno < nblocks)
2018         {
2019                 blkno = smgrtruncate(DEFAULT_SMGR, onerel, blkno);
2020                 Assert(blkno >= 0);
2021                 vacrelstats->num_pages = blkno; /* set new number of blocks */
2022         }
2023
2024         if (Irel != (Relation *) NULL)          /* pfree index' allocations */
2025         {
2026                 close_indices(nindices, Irel);
2027                 pfree(indexInfo);
2028         }
2029
2030         pfree(vacpage);
2031         if (vacrelstats->vtlinks != NULL)
2032                 pfree(vacrelstats->vtlinks);
2033 }
2034
2035 /*
2036  *      vacuum_heap() -- free dead tuples
2037  *
2038  *              This routine marks dead tuples as unused and truncates relation
2039  *              if there are "empty" end-blocks.
2040  */
2041 static void
2042 vacuum_heap(VRelStats *vacrelstats, Relation onerel, VacPageList vacuum_pages)
2043 {
2044         Buffer          buf;
2045         VacPage    *vacpage;
2046         long            nblocks;
2047         int                     i;
2048
2049         nblocks = vacuum_pages->num_pages;
2050         nblocks -= vacuum_pages->empty_end_pages;       /* nothing to do with them */
2051
2052         for (i = 0, vacpage = vacuum_pages->pagedesc; i < nblocks; i++, vacpage++)
2053         {
2054                 if ((*vacpage)->offsets_free > 0)
2055                 {
2056                         buf = ReadBuffer(onerel, (*vacpage)->blkno);
2057                         LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
2058                         vacuum_page(onerel, buf, *vacpage);
2059                         LockBuffer(buf, BUFFER_LOCK_UNLOCK);
2060                         WriteBuffer(buf);
2061                 }
2062         }
2063
2064         /*
2065          * Flush dirty pages out to disk.  We do this unconditionally, even if
2066          * we don't need to truncate, because we want to ensure that all
2067          * tuples have correct on-row commit status on disk (see bufmgr.c's
2068          * comments for FlushRelationBuffers()).
2069          */
2070         Assert(vacrelstats->num_pages >= vacuum_pages->empty_end_pages);
2071         nblocks = vacrelstats->num_pages - vacuum_pages->empty_end_pages;
2072
2073         i = FlushRelationBuffers(onerel, nblocks);
2074         if (i < 0)
2075                 elog(ERROR, "VACUUM (vacuum_heap): FlushRelationBuffers returned %d",
2076                          i);
2077
2078         /* truncate relation if there are some empty end-pages */
2079         if (vacuum_pages->empty_end_pages > 0)
2080         {
2081                 elog(MESSAGE_LEVEL, "Rel %s: Pages: %lu --> %lu.",
2082                          RelationGetRelationName(onerel),
2083                          vacrelstats->num_pages, nblocks);
2084                 nblocks = smgrtruncate(DEFAULT_SMGR, onerel, nblocks);
2085                 Assert(nblocks >= 0);
2086                 vacrelstats->num_pages = nblocks;               /* set new number of
2087                                                                                                  * blocks */
2088         }
2089 }
2090
2091 /*
2092  *      vacuum_page() -- free dead tuples on a page
2093  *                                       and repair its fragmentation.
2094  */
2095 static void
2096 vacuum_page(Relation onerel, Buffer buffer, VacPage vacpage)
2097 {
2098         OffsetNumber unbuf[BLCKSZ/sizeof(OffsetNumber)];
2099         OffsetNumber *unused = unbuf;
2100         int                     uncnt;
2101         Page            page = BufferGetPage(buffer);
2102         ItemId          itemid;
2103         int                     i;
2104
2105         /* There shouldn't be any tuples moved onto the page yet! */
2106         Assert(vacpage->offsets_used == 0);
2107
2108         START_CRIT_SECTION();
2109         for (i = 0; i < vacpage->offsets_free; i++)
2110         {
2111                 itemid = &(((PageHeader) page)->pd_linp[vacpage->offsets[i] - 1]);
2112                 itemid->lp_flags &= ~LP_USED;
2113         }
2114         uncnt = PageRepairFragmentation(page, unused);
2115         {
2116                 XLogRecPtr      recptr;
2117
2118                 recptr = log_heap_clean(onerel, buffer, (char *) unused,
2119                                                   (char *) (&(unused[uncnt])) - (char *) unused);
2120                 PageSetLSN(page, recptr);
2121                 PageSetSUI(page, ThisStartUpID);
2122         }
2123         END_CRIT_SECTION();
2124 }
2125
2126 /*
2127  *      _scan_index() -- scan one index relation to update statistic.
2128  *
2129  */
2130 static void
2131 scan_index(Relation indrel, long num_tuples)
2132 {
2133         RetrieveIndexResult res;
2134         IndexScanDesc iscan;
2135         long            nitups;
2136         int                     nipages;
2137         struct rusage ru0;
2138
2139         getrusage(RUSAGE_SELF, &ru0);
2140
2141         /* walk through the entire index */
2142         iscan = index_beginscan(indrel, false, 0, (ScanKey) NULL);
2143         nitups = 0;
2144
2145         while ((res = index_getnext(iscan, ForwardScanDirection))
2146                    != (RetrieveIndexResult) NULL)
2147         {
2148                 nitups++;
2149                 pfree(res);
2150         }
2151
2152         index_endscan(iscan);
2153
2154         /* now update statistics in pg_class */
2155         nipages = RelationGetNumberOfBlocks(indrel);
2156         vac_update_relstats(RelationGetRelid(indrel), nipages, nitups, false);
2157
2158         elog(MESSAGE_LEVEL, "Index %s: Pages %u; Tuples %lu. %s",
2159                  RelationGetRelationName(indrel), nipages, nitups,
2160                  show_rusage(&ru0));
2161
2162         if (nitups != num_tuples)
2163                 elog(NOTICE, "Index %s: NUMBER OF INDEX' TUPLES (%lu) IS NOT THE SAME AS HEAP' (%lu).\
2164 \n\tRecreate the index.",
2165                          RelationGetRelationName(indrel), nitups, num_tuples);
2166
2167 }
2168
2169 /*
2170  *      vacuum_index() -- vacuum one index relation.
2171  *
2172  *              Vpl is the VacPageList of the heap we're currently vacuuming.
2173  *              It's locked. Indrel is an index relation on the vacuumed heap.
2174  *              We don't set locks on the index relation here, since the indexed
2175  *              access methods support locking at different granularities.
2176  *              We let them handle it.
2177  *
2178  *              Finally, we arrange to update the index relation's statistics in
2179  *              pg_class.
2180  */
2181 static void
2182 vacuum_index(VacPageList vacpagelist, Relation indrel,
2183                          long num_tuples, int keep_tuples)
2184 {
2185         RetrieveIndexResult res;
2186         IndexScanDesc iscan;
2187         ItemPointer heapptr;
2188         int                     tups_vacuumed;
2189         long            num_index_tuples;
2190         int                     num_pages;
2191         VacPage         vp;
2192         struct rusage ru0;
2193
2194         getrusage(RUSAGE_SELF, &ru0);
2195
2196         /* walk through the entire index */
2197         iscan = index_beginscan(indrel, false, 0, (ScanKey) NULL);
2198         tups_vacuumed = 0;
2199         num_index_tuples = 0;
2200
2201         while ((res = index_getnext(iscan, ForwardScanDirection))
2202                    != (RetrieveIndexResult) NULL)
2203         {
2204                 heapptr = &res->heap_iptr;
2205
2206                 if ((vp = tid_reaped(heapptr, vacpagelist)) != (VacPage) NULL)
2207                 {
2208 #ifdef NOT_USED
2209                         elog(DEBUG, "<%x,%x> -> <%x,%x>",
2210                                  ItemPointerGetBlockNumber(&(res->index_iptr)),
2211                                  ItemPointerGetOffsetNumber(&(res->index_iptr)),
2212                                  ItemPointerGetBlockNumber(&(res->heap_iptr)),
2213                                  ItemPointerGetOffsetNumber(&(res->heap_iptr)));
2214 #endif
2215                         if (vp->offsets_free == 0)
2216                         {
2217                                 elog(NOTICE, "Index %s: pointer to EmptyPage (blk %u off %u) - fixing",
2218                                          RelationGetRelationName(indrel),
2219                                          vp->blkno, ItemPointerGetOffsetNumber(heapptr));
2220                         }
2221                         ++tups_vacuumed;
2222                         index_delete(indrel, &res->index_iptr);
2223                 }
2224                 else
2225                         num_index_tuples++;
2226
2227                 pfree(res);
2228         }
2229
2230         index_endscan(iscan);
2231
2232         /* now update statistics in pg_class */
2233         num_pages = RelationGetNumberOfBlocks(indrel);
2234         vac_update_relstats(RelationGetRelid(indrel),
2235                                                 num_pages, num_index_tuples, false);
2236
2237         elog(MESSAGE_LEVEL, "Index %s: Pages %u; Tuples %lu: Deleted %u. %s",
2238                  RelationGetRelationName(indrel), num_pages,
2239                  num_index_tuples - keep_tuples, tups_vacuumed,
2240                  show_rusage(&ru0));
2241
2242         if (num_index_tuples != num_tuples + keep_tuples)
2243                 elog(NOTICE, "Index %s: NUMBER OF INDEX' TUPLES (%lu) IS NOT THE SAME AS HEAP' (%lu).\
2244 \n\tRecreate the index.",
2245                   RelationGetRelationName(indrel), num_index_tuples, num_tuples);
2246
2247 }
2248
2249 /*
2250  *      tid_reaped() -- is a particular tid reaped?
2251  *
2252  *              vacpagelist->VacPage_array is sorted in right order.
2253  */
2254 static VacPage
2255 tid_reaped(ItemPointer itemptr, VacPageList vacpagelist)
2256 {
2257         OffsetNumber ioffno;
2258         OffsetNumber *voff;
2259         VacPage         vp,
2260                            *vpp;
2261         VacPageData vacpage;
2262
2263         vacpage.blkno = ItemPointerGetBlockNumber(itemptr);
2264         ioffno = ItemPointerGetOffsetNumber(itemptr);
2265
2266         vp = &vacpage;
2267         vpp = (VacPage *) vac_find_eq((void *) (vacpagelist->pagedesc),
2268                                    vacpagelist->num_pages, sizeof(VacPage), (void *) &vp,
2269                                                                   vac_cmp_blk);
2270
2271         if (vpp == (VacPage *) NULL)
2272                 return (VacPage) NULL;
2273         vp = *vpp;
2274
2275         /* ok - we are on true page */
2276
2277         if (vp->offsets_free == 0)
2278         {                                                       /* this is EmptyPage !!! */
2279                 return vp;
2280         }
2281
2282         voff = (OffsetNumber *) vac_find_eq((void *) (vp->offsets),
2283                                 vp->offsets_free, sizeof(OffsetNumber), (void *) &ioffno,
2284                                                                                 vac_cmp_offno);
2285
2286         if (voff == (OffsetNumber *) NULL)
2287                 return (VacPage) NULL;
2288
2289         return vp;
2290
2291 }
2292
2293 /*
2294  *      vac_update_relstats() -- update statistics for one relation
2295  *
2296  *              Update the whole-relation statistics that are kept in its pg_class
2297  *              row.  There are additional stats that will be updated if we are
2298  *              doing VACUUM ANALYZE, but we always update these stats.
2299  *
2300  *              This routine works for both index and heap relation entries in
2301  *              pg_class.  We violate no-overwrite semantics here by storing new
2302  *              values for the statistics columns directly into the pg_class
2303  *              tuple that's already on the page.  The reason for this is that if
2304  *              we updated these tuples in the usual way, vacuuming pg_class itself
2305  *              wouldn't work very well --- by the time we got done with a vacuum
2306  *              cycle, most of the tuples in pg_class would've been obsoleted.
2307  *              Of course, this only works for fixed-size never-null columns, but
2308  *              these are.
2309  */
2310 void
2311 vac_update_relstats(Oid relid, long num_pages, double num_tuples,
2312                                         bool hasindex)
2313 {
2314         Relation        rd;
2315         HeapTupleData rtup;
2316         HeapTuple       ctup;
2317         Form_pg_class pgcform;
2318         Buffer          buffer;
2319
2320         /*
2321          * update number of tuples and number of pages in pg_class
2322          */
2323         rd = heap_openr(RelationRelationName, RowExclusiveLock);
2324
2325         ctup = SearchSysCache(RELOID,
2326                                                   ObjectIdGetDatum(relid),
2327                                                   0, 0, 0);
2328         if (!HeapTupleIsValid(ctup))
2329                 elog(ERROR, "pg_class entry for relid %u vanished during vacuuming",
2330                          relid);
2331
2332         /* get the buffer cache tuple */
2333         rtup.t_self = ctup->t_self;
2334         ReleaseSysCache(ctup);
2335         heap_fetch(rd, SnapshotNow, &rtup, &buffer);
2336
2337         /* overwrite the existing statistics in the tuple */
2338         pgcform = (Form_pg_class) GETSTRUCT(&rtup);
2339         pgcform->reltuples = num_tuples;
2340         pgcform->relpages = num_pages;
2341         pgcform->relhasindex = hasindex;
2342
2343         /* invalidate the tuple in the cache and write the buffer */
2344         RelationInvalidateHeapTuple(rd, &rtup);
2345         WriteBuffer(buffer);
2346
2347         heap_close(rd, RowExclusiveLock);
2348 }
2349
2350 /*
2351  *      reap_page() -- save a page on the array of reaped pages.
2352  *
2353  *              As a side effect of the way that the vacuuming loop for a given
2354  *              relation works, higher pages come after lower pages in the array
2355  *              (and highest tid on a page is last).
2356  */
2357 static void
2358 reap_page(VacPageList vacpagelist, VacPage vacpage)
2359 {
2360         VacPage         newvacpage;
2361
2362         /* allocate a VacPageData entry */
2363         newvacpage = (VacPage) palloc(sizeof(VacPageData) + vacpage->offsets_free * sizeof(OffsetNumber));
2364
2365         /* fill it in */
2366         if (vacpage->offsets_free > 0)
2367                 memmove(newvacpage->offsets, vacpage->offsets, vacpage->offsets_free * sizeof(OffsetNumber));
2368         newvacpage->blkno = vacpage->blkno;
2369         newvacpage->free = vacpage->free;
2370         newvacpage->offsets_used = vacpage->offsets_used;
2371         newvacpage->offsets_free = vacpage->offsets_free;
2372
2373         /* insert this page into vacpagelist list */
2374         vpage_insert(vacpagelist, newvacpage);
2375
2376 }
2377
2378 static void
2379 vpage_insert(VacPageList vacpagelist, VacPage vpnew)
2380 {
2381 #define PG_NPAGEDESC 1024
2382
2383         /* allocate a VacPage entry if needed */
2384         if (vacpagelist->num_pages == 0)
2385         {
2386                 vacpagelist->pagedesc = (VacPage *) palloc(PG_NPAGEDESC * sizeof(VacPage));
2387                 vacpagelist->num_allocated_pages = PG_NPAGEDESC;
2388         }
2389         else if (vacpagelist->num_pages >= vacpagelist->num_allocated_pages)
2390         {
2391                 vacpagelist->num_allocated_pages *= 2;
2392                 vacpagelist->pagedesc = (VacPage *) repalloc(vacpagelist->pagedesc, vacpagelist->num_allocated_pages * sizeof(VacPage));
2393         }
2394         vacpagelist->pagedesc[vacpagelist->num_pages] = vpnew;
2395         (vacpagelist->num_pages)++;
2396
2397 }
2398
/*
 *      vac_find_eq() -- find an element equal to *elm in a sorted array.
 *
 *              bot points to an array of nelem elements of size bytes each,
 *              sorted in ascending order according to compar.  elm points to
 *              the key.  Returns a pointer to a matching element, or NULL when
 *              there is none (including when nelem <= 0, in which case the
 *              array is never touched).
 *
 *              The first and last elements are tested up front so that keys
 *              outside the array's range are rejected with at most two
 *              comparisons; the elements in between are searched with
 *              bsearch().
 */
static void *
vac_find_eq(void *bot, int nelem, int size, void *elm,
                        int (*compar) (const void *, const void *))
{
        char       *first = (char *) bot;
        char       *last;
        int                     res;

        /* an empty array has no element to compare against */
        if (nelem <= 0)
                return NULL;

        res = compar(first, elm);
        if (res > 0)
                return NULL;                    /* key sorts before the whole array */
        if (res == 0)
                return first;
        if (nelem == 1)
                return NULL;

        last = first + (size_t) (nelem - 1) * (size_t) size;
        res = compar(elm, last);
        if (res > 0)
                return NULL;                    /* key sorts after the whole array */
        if (res == 0)
                return last;
        if (nelem == 2)
                return NULL;                    /* both elements already checked */

        /* only the interior elements remain */
        return bsearch(elm, first + size, (size_t) (nelem - 2), (size_t) size,
                                   compar);
}
2453
2454 static int
2455 vac_cmp_blk(const void *left, const void *right)
2456 {
2457         BlockNumber lblk,
2458                                 rblk;
2459
2460         lblk = (*((VacPage *) left))->blkno;
2461         rblk = (*((VacPage *) right))->blkno;
2462
2463         if (lblk < rblk)
2464                 return -1;
2465         if (lblk == rblk)
2466                 return 0;
2467         return 1;
2468
2469 }
2470
2471 static int
2472 vac_cmp_offno(const void *left, const void *right)
2473 {
2474
2475         if (*(OffsetNumber *) left < *(OffsetNumber *) right)
2476                 return -1;
2477         if (*(OffsetNumber *) left == *(OffsetNumber *) right)
2478                 return 0;
2479         return 1;
2480
2481 }
2482
2483 static int
2484 vac_cmp_vtlinks(const void *left, const void *right)
2485 {
2486
2487         if (((VTupleLink) left)->new_tid.ip_blkid.bi_hi <
2488                 ((VTupleLink) right)->new_tid.ip_blkid.bi_hi)
2489                 return -1;
2490         if (((VTupleLink) left)->new_tid.ip_blkid.bi_hi >
2491                 ((VTupleLink) right)->new_tid.ip_blkid.bi_hi)
2492                 return 1;
2493         /* bi_hi-es are equal */
2494         if (((VTupleLink) left)->new_tid.ip_blkid.bi_lo <
2495                 ((VTupleLink) right)->new_tid.ip_blkid.bi_lo)
2496                 return -1;
2497         if (((VTupleLink) left)->new_tid.ip_blkid.bi_lo >
2498                 ((VTupleLink) right)->new_tid.ip_blkid.bi_lo)
2499                 return 1;
2500         /* bi_lo-es are equal */
2501         if (((VTupleLink) left)->new_tid.ip_posid <
2502                 ((VTupleLink) right)->new_tid.ip_posid)
2503                 return -1;
2504         if (((VTupleLink) left)->new_tid.ip_posid >
2505                 ((VTupleLink) right)->new_tid.ip_posid)
2506                 return 1;
2507         return 0;
2508
2509 }
2510
2511
2512 static void
2513 get_indices(Relation relation, int *nindices, Relation **Irel)
2514 {
2515         List       *indexoidlist,
2516                            *indexoidscan;
2517         int                     i;
2518
2519         indexoidlist = RelationGetIndexList(relation);
2520
2521         *nindices = length(indexoidlist);
2522
2523         if (*nindices > 0)
2524                 *Irel = (Relation *) palloc(*nindices * sizeof(Relation));
2525         else
2526                 *Irel = NULL;
2527
2528         i = 0;
2529         foreach(indexoidscan, indexoidlist)
2530         {
2531                 Oid                     indexoid = lfirsti(indexoidscan);
2532
2533                 (*Irel)[i] = index_open(indexoid);
2534                 i++;
2535         }
2536
2537         freeList(indexoidlist);
2538 }
2539
2540
2541 static void
2542 close_indices(int nindices, Relation *Irel)
2543 {
2544
2545         if (Irel == (Relation *) NULL)
2546                 return;
2547
2548         while (nindices--)
2549                 index_close(Irel[nindices]);
2550         pfree(Irel);
2551
2552 }
2553
2554
2555 /*
2556  * Obtain IndexInfo data for each index on the rel
2557  */
2558 static IndexInfo **
2559 get_index_desc(Relation onerel, int nindices, Relation *Irel)
2560 {
2561         IndexInfo **indexInfo;
2562         int                     i;
2563         HeapTuple       cachetuple;
2564
2565         indexInfo = (IndexInfo **) palloc(nindices * sizeof(IndexInfo *));
2566
2567         for (i = 0; i < nindices; i++)
2568         {
2569                 cachetuple = SearchSysCache(INDEXRELID,
2570                                                          ObjectIdGetDatum(RelationGetRelid(Irel[i])),
2571                                                                         0, 0, 0);
2572                 if (!HeapTupleIsValid(cachetuple))
2573                         elog(ERROR, "get_index_desc: index %u not found",
2574                                  RelationGetRelid(Irel[i]));
2575                 indexInfo[i] = BuildIndexInfo(cachetuple);
2576                 ReleaseSysCache(cachetuple);
2577         }
2578
2579         return indexInfo;
2580 }
2581
2582
2583 static bool
2584 enough_space(VacPage vacpage, Size len)
2585 {
2586
2587         len = MAXALIGN(len);
2588
2589         if (len > vacpage->free)
2590                 return false;
2591
2592         if (vacpage->offsets_used < vacpage->offsets_free)      /* there are free
2593                                                                                                                  * itemid(s) */
2594                 return true;                    /* and len <= free_space */
2595
2596         /* ok. noff_usd >= noff_free and so we'll have to allocate new itemid */
2597         if (len + MAXALIGN(sizeof(ItemIdData)) <= vacpage->free)
2598                 return true;
2599
2600         return false;
2601
2602 }
2603
2604
/*
 * show_rusage -- format the CPU time consumed since an earlier snapshot.
 *
 * ru0 is a usage snapshot taken earlier by the caller.  The current usage
 * is fetched with getrusage(RUSAGE_SELF) and the system and user time
 * differences are printed as seconds plus hundredths of a second.
 *
 * The returned string lives in a static buffer inside this function, so
 * each call overwrites the previous result and the caller must not free
 * it.
 */
static char *
show_rusage(struct rusage * ru0)
{
        static char result[64];
        struct rusage now;
        long        sys_sec,
                    sys_usec,
                    user_sec,
                    user_usec;

        getrusage(RUSAGE_SELF, &now);

        /* system time: borrow one second if the microsecond part went negative */
        sys_sec = (long) (now.ru_stime.tv_sec - ru0->ru_stime.tv_sec);
        sys_usec = (long) (now.ru_stime.tv_usec - ru0->ru_stime.tv_usec);
        if (sys_usec < 0)
        {
                sys_sec--;
                sys_usec += 1000000;
        }

        /* user time: same treatment */
        user_sec = (long) (now.ru_utime.tv_sec - ru0->ru_utime.tv_sec);
        user_usec = (long) (now.ru_utime.tv_usec - ru0->ru_utime.tv_usec);
        if (user_usec < 0)
        {
                user_sec--;
                user_usec += 1000000;
        }

        /* dividing microseconds by 10000 yields hundredths of a second */
        snprintf(result, sizeof(result),
                         "CPU %d.%02ds/%d.%02du sec.",
                         (int) sys_sec,
                         (int) sys_usec / 10000,
                         (int) user_sec,
                         (int) user_usec / 10000);

        return result;
}