]> granicus.if.org Git - postgresql/blob - src/backend/commands/vacuum.c
Cleanup of code for creating index entries. Functional indexes with
[postgresql] / src / backend / commands / vacuum.c
1 /*-------------------------------------------------------------------------
2  *
3  * vacuum.c
4  *        the postgres vacuum cleaner
5  *
6  * Portions Copyright (c) 1996-2000, PostgreSQL, Inc
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *        $Header: /cvsroot/pgsql/src/backend/commands/vacuum.c,v 1.163 2000/07/14 22:17:42 tgl Exp $
12  *
13
14  *-------------------------------------------------------------------------
15  */
16 #include <sys/types.h>
17 #include <sys/file.h>
18 #include <sys/stat.h>
19 #include <fcntl.h>
20 #include <unistd.h>
21
22 #include "postgres.h"
23
24 #include "access/genam.h"
25 #include "access/heapam.h"
26 #include "catalog/catalog.h"
27 #include "catalog/catname.h"
28 #include "catalog/index.h"
29 #include "commands/vacuum.h"
30 #include "miscadmin.h"
31 #include "nodes/execnodes.h"
32 #include "storage/sinval.h"
33 #include "storage/smgr.h"
34 #include "tcop/tcopprot.h"
35 #include "utils/acl.h"
36 #include "utils/builtins.h"
37 #include "utils/fmgroids.h"
38 #include "utils/inval.h"
39 #include "utils/relcache.h"
40 #include "utils/syscache.h"
41 #include "utils/temprel.h"
42
43 #ifndef HAVE_GETRUSAGE
44 #include "rusagestub.h"
45 #else
46 #include <sys/time.h>
47 #include <sys/resource.h>
48 #endif
49
50
51 static MemoryContext vac_context = NULL;
52
53 static int      MESSAGE_LEVEL;          /* message level */
54
55 static TransactionId XmaxRecent;
56
57 /* non-export function prototypes */
58 static void vacuum_init(void);
59 static void vacuum_shutdown(void);
60 static void vac_vacuum(NameData *VacRelP, bool analyze, List *anal_cols2);
61 static VRelList getrels(NameData *VacRelP);
62 static void vacuum_rel(Oid relid, bool analyze, bool is_toastrel);
63 static void scan_heap(VRelStats *vacrelstats, Relation onerel, VacPageList vacuum_pages, VacPageList fraged_pages);
64 static void repair_frag(VRelStats *vacrelstats, Relation onerel, VacPageList vacuum_pages, VacPageList fraged_pages, int nindices, Relation *Irel);
65 static void vacuum_heap(VRelStats *vacrelstats, Relation onerel, VacPageList vacpagelist);
66 static void vacuum_page(Page page, VacPage vacpage);
67 static void vacuum_index(VacPageList vacpagelist, Relation indrel, int num_tuples, int keep_tuples);
68 static void scan_index(Relation indrel, int num_tuples);
69 static void update_relstats(Oid relid, int num_pages, int num_tuples, bool hasindex, VRelStats *vacrelstats);
70 static VacPage tid_reaped(ItemPointer itemptr, VacPageList vacpagelist);
71 static void reap_page(VacPageList vacpagelist, VacPage vacpage);
72 static void vpage_insert(VacPageList vacpagelist, VacPage vpnew);
73 static void get_indices(Relation relation, int *nindices, Relation **Irel);
74 static void close_indices(int nindices, Relation *Irel);
75 static IndexInfo **get_index_desc(Relation onerel, int nindices,
76                                                                   Relation *Irel);
77 static void *vac_find_eq(void *bot, int nelem, int size, void *elm,
78                          int (*compar) (const void *, const void *));
79 static int      vac_cmp_blk(const void *left, const void *right);
80 static int      vac_cmp_offno(const void *left, const void *right);
81 static int      vac_cmp_vtlinks(const void *left, const void *right);
82 static bool enough_space(VacPage vacpage, Size len);
83 static char *show_rusage(struct rusage * ru0);
84
85
/*
 *	vacuum() -- top-level entry point for the VACUUM command.
 *
 *	vacrel is the name of a single relation to vacuum, or NULL to vacuum
 *	every plain relation in the database.  verbose selects NOTICE-level
 *	progress messages (otherwise DEBUG).  analyze requests ANALYZE
 *	processing after each relation is vacuumed; anal_cols optionally
 *	restricts ANALYZE to the named columns and is only legal together
 *	with analyze.
 *
 *	NB: vacuum_init() commits the current transaction, so every
 *	pass-by-reference argument must be copied to storage that survives
 *	transaction end (the stack NameData / vac_context) before that call.
 */
void
vacuum(char *vacrel, bool verbose, bool analyze, List *anal_cols)
{
	NameData	VacRel;
	Name		VacRelName;
	MemoryContext old;
	List	   *le;
	List	   *anal_cols2 = NIL;

	/* a column list is only meaningful for the ANALYZE phase */
	if (anal_cols != NIL && !analyze)
		elog(ERROR, "Can't vacuum columns, only tables.  You can 'vacuum analyze' columns.");

	/*
	 * We cannot run VACUUM inside a user transaction block; if we were
	 * inside a transaction, then our commit- and
	 * start-transaction-command calls would not have the intended effect!
	 * Furthermore, the forced commit that occurs before truncating the
	 * relation's file would have the effect of committing the rest of the
	 * user's transaction too, which would certainly not be the desired
	 * behavior.
	 */
	if (IsTransactionBlock())
		elog(ERROR, "VACUUM cannot run inside a BEGIN/END block");

	/* choose how chatty the rest of the run will be */
	if (verbose)
		MESSAGE_LEVEL = NOTICE;
	else
		MESSAGE_LEVEL = DEBUG;

	/*
	 * Create special memory context for cross-transaction storage.
	 *
	 * Since it is a child of QueryContext, it will go away eventually
	 * even if we suffer an error; there's no need for special abort
	 * cleanup logic.
	 */
	vac_context = AllocSetContextCreate(QueryContext,
										"Vacuum",
										ALLOCSET_DEFAULT_MINSIZE,
										ALLOCSET_DEFAULT_INITSIZE,
										ALLOCSET_DEFAULT_MAXSIZE);

	/* vacrel gets de-allocated on xact commit, so copy it to safe storage */
	if (vacrel)
	{
		namestrcpy(&VacRel, vacrel);
		VacRelName = &VacRel;
	}
	else
		VacRelName = NULL;

	/* must also copy the column list, if any, to safe storage */
	old = MemoryContextSwitchTo(vac_context);
	foreach(le, anal_cols)
	{
		char	   *col = (char *) lfirst(le);

		anal_cols2 = lappend(anal_cols2, pstrdup(col));
	}
	MemoryContextSwitchTo(old);

	/*
	 * Start up the vacuum cleaner.
	 *
	 * NOTE: since this commits the current transaction, the memory holding
	 * any passed-in parameters gets freed here.  We must have already
	 * copied pass-by-reference parameters to safe storage.  Don't make me
	 * fix this again!
	 */
	vacuum_init();

	/* vacuum the database */
	vac_vacuum(VacRelName, analyze, anal_cols2);

	/* clean up */
	vacuum_shutdown();
}
163
/*
 *	vacuum_init(), vacuum_shutdown() -- bracket the vacuum cleaner's run.
 *
 *		Earlier releases used a lockfile here to keep more than one VACUUM
 *		from running in the same database at once.  That protection is
 *		unnecessary -- the exclusive lock taken on each target table already
 *		prevents two VACUUMs from touching the same table simultaneously --
 *		and stale lockfiles left behind by a vacuum crash were a nuisance
 *		for administrators, so the lockfile code is gone.
 *
 *		The odd-looking transaction dance exists because VACUUM is entered
 *		as an ordinary SQL command, i.e. already inside a transaction
 *		started by PostgresMain().  vacuum_init() commits that transaction
 *		so each table can then be processed in its own short transaction,
 *		and vacuum_shutdown() starts a fresh one to pair with the commit
 *		PostgresMain() will issue when the command returns.
 */
static void
vacuum_init(void)
{
	/* close out the transaction begun by PostgresMain() */
	CommitTransactionCommand();
}
190
/*
 *	vacuum_shutdown() -- restore transaction state and release working
 *		storage after all relations have been processed.  See the comment
 *		above vacuum_init() for why the transaction handling looks odd.
 */
static void
vacuum_shutdown()
{
	/* on entry, we are not in a transaction */

	/*
	 * Flush the init file that relcache.c uses to save startup time. The
	 * next backend startup will rebuild the init file with up-to-date
	 * information from pg_class.  This lets the optimizer see the stats
	 * that we've collected for certain critical system indexes.  See
	 * relcache.c for more details.
	 *
	 * Ignore any failure to unlink the file, since it might not be there if
	 * no backend has been started since the last vacuum...
	 */
	unlink(RELCACHE_INIT_FILENAME);

	/* matches the CommitTransaction in PostgresMain() */
	StartTransactionCommand();

	/*
	 * Clean up working storage --- note we must do this after
	 * StartTransactionCommand, else we might be trying to delete
	 * the active context!
	 */
	MemoryContextDelete(vac_context);
	vac_context = NULL;
}
219
220 /*
221  *      vac_vacuum() -- vacuum the database.
222  *
223  *              This routine builds a list of relations to vacuum, and then calls
224  *              code that vacuums them one at a time.  We are careful to vacuum each
225  *              relation in a separate transaction in order to avoid holding too many
226  *              locks at one time.
227  */
228 static void
229 vac_vacuum(NameData *VacRelP, bool analyze, List *anal_cols2)
230 {
231         VRelList        vrl,
232                                 cur;
233
234         /* get list of relations */
235         vrl = getrels(VacRelP);
236
237         /* vacuum each heap relation */
238         for (cur = vrl; cur != (VRelList) NULL; cur = cur->vrl_next)
239         {
240                 vacuum_rel(cur->vrl_relid, analyze, false);
241                 /* analyze separately so locking is minimized */
242                 if (analyze)
243                         analyze_rel(cur->vrl_relid, anal_cols2, MESSAGE_LEVEL);
244         }
245 }
246
247 static VRelList
248 getrels(NameData *VacRelP)
249 {
250         Relation        rel;
251         TupleDesc       tupdesc;
252         HeapScanDesc scan;
253         HeapTuple       tuple;
254         VRelList        vrl,
255                                 cur;
256         Datum           d;
257         char       *rname;
258         char            rkind;
259         bool            n;
260         bool            found = false;
261         ScanKeyData key;
262
263         StartTransactionCommand();
264
265         if (NameStr(*VacRelP))
266         {
267
268                 /*
269                  * we could use the cache here, but it is clearer to use scankeys
270                  * for both vacuum cases, bjm 2000/01/19
271                  */
272                 char       *nontemp_relname;
273
274                 /* We must re-map temp table names bjm 2000-04-06 */
275                 if ((nontemp_relname =
276                          get_temp_rel_by_username(NameStr(*VacRelP))) == NULL)
277                         nontemp_relname = NameStr(*VacRelP);
278
279                 ScanKeyEntryInitialize(&key, 0x0, Anum_pg_class_relname,
280                                                            F_NAMEEQ,
281                                                            PointerGetDatum(nontemp_relname));
282         }
283         else
284         {
285                 ScanKeyEntryInitialize(&key, 0x0, Anum_pg_class_relkind,
286                                                            F_CHAREQ, CharGetDatum('r'));
287         }
288
289         vrl = cur = (VRelList) NULL;
290
291         rel = heap_openr(RelationRelationName, AccessShareLock);
292         tupdesc = RelationGetDescr(rel);
293
294         scan = heap_beginscan(rel, false, SnapshotNow, 1, &key);
295
296         while (HeapTupleIsValid(tuple = heap_getnext(scan, 0)))
297         {
298                 found = true;
299
300                 d = heap_getattr(tuple, Anum_pg_class_relname, tupdesc, &n);
301                 rname = (char *) d;
302
303                 d = heap_getattr(tuple, Anum_pg_class_relkind, tupdesc, &n);
304
305                 rkind = DatumGetChar(d);
306
307                 if (rkind != RELKIND_RELATION)
308                 {
309                         elog(NOTICE, "Vacuum: can not process index and certain system tables");
310                         continue;
311                 }
312
313                 /* get a relation list entry for this guy */
314                 if (vrl == (VRelList) NULL)
315                         vrl = cur = (VRelList)
316                                 MemoryContextAlloc(vac_context, sizeof(VRelListData));
317                 else
318                 {
319                         cur->vrl_next = (VRelList)
320                                 MemoryContextAlloc(vac_context, sizeof(VRelListData));
321                         cur = cur->vrl_next;
322                 }
323
324                 cur->vrl_relid = tuple->t_data->t_oid;
325                 cur->vrl_next = (VRelList) NULL;
326         }
327
328         heap_endscan(scan);
329         heap_close(rel, AccessShareLock);
330
331         if (!found)
332                 elog(NOTICE, "Vacuum: table not found");
333
334         CommitTransactionCommand();
335
336         return vrl;
337 }
338
/*
 *	vacuum_rel() -- vacuum one heap relation
 *
 *		This routine vacuums a single heap, cleans out its indices, and
 *		updates its num_pages and num_tuples statistics in pg_class.
 *
 *		relid is the pg_class OID of the target.  analyze is not consulted
 *		in this routine (ANALYZE runs separately in vac_vacuum) but is kept
 *		in the signature.  is_toastrel is true when recursing into a
 *		relation's TOAST table, in which case we run inside the caller's
 *		transaction instead of starting/committing our own.
 *
 *		Doing one heap at a time incurs extra overhead, since we need to
 *		check that the heap exists again just before we vacuum it.	The
 *		reason that we do this is so that vacuuming can be spread across
 *		many small transactions.  Otherwise, two-phase locking would require
 *		us to lock the entire database during one pass of the vacuum cleaner.
 */
static void
vacuum_rel(Oid relid, bool analyze, bool is_toastrel)
{
	HeapTuple	tuple;
	Relation	onerel;
	VacPageListData vacuum_pages; /* List of pages to vacuum and/or clean
								 * indices */
	VacPageListData fraged_pages; /* List of pages with space enough for
								 * re-using */
	VacPage    *vacpage;
	Relation   *Irel;
	int32		nindices,
				i;
	VRelStats  *vacrelstats;
	bool		reindex = false;
	Oid			toast_relid;

	/* each non-toast relation is processed in its own transaction */
	if (!is_toastrel)
		StartTransactionCommand();

	/*
	 * Check for user-requested abort.	Note we want this to be inside a
	 * transaction, so xact.c doesn't issue useless NOTICE.
	 */
	if (QueryCancel)
		CancelQuery();

	/*
	 * Race condition -- if the pg_class tuple has gone away since the
	 * last time we saw it, we don't need to vacuum it.
	 */
	tuple = SearchSysCacheTuple(RELOID,
								ObjectIdGetDatum(relid),
								0, 0, 0);
	if (!HeapTupleIsValid(tuple))
	{
		if (!is_toastrel)
			CommitTransactionCommand();
		return;
	}

	/*
	 * Open the class, get an exclusive lock on it, and check permissions.
	 *
	 * Note we choose to treat permissions failure as a NOTICE and keep
	 * trying to vacuum the rest of the DB --- is this appropriate?
	 */
	onerel = heap_open(relid, AccessExclusiveLock);

	/*
	 * Remember the relation's TOAST relation for later (it is vacuumed
	 * recursively at the bottom of this routine, while we still hold
	 * the lock on the master table).
	 */
	toast_relid = onerel->rd_rel->reltoastrelid;

#ifndef NO_SECURITY
	if (!pg_ownercheck(GetPgUserName(), RelationGetRelationName(onerel),
					   RELNAME))
	{
		elog(NOTICE, "Skipping \"%s\" --- only table owner can VACUUM it",
			 RelationGetRelationName(onerel));
		heap_close(onerel, AccessExclusiveLock);
		if (!is_toastrel)
			CommitTransactionCommand();
		return;
	}
#endif

	/*
	 * Set up statistics-gathering machinery.
	 */
	vacrelstats = (VRelStats *) palloc(sizeof(VRelStats));
	vacrelstats->relid = relid;
	vacrelstats->num_pages = vacrelstats->num_tuples = 0;
	vacrelstats->hasindex = false;

	/* snapshot horizon used by scan_heap to judge tuple visibility */
	GetXmaxRecent(&XmaxRecent);

	/* scan it: fills vacuum_pages and fraged_pages, updates vacrelstats */
	reindex = false;
	vacuum_pages.num_pages = fraged_pages.num_pages = 0;
	scan_heap(vacrelstats, onerel, &vacuum_pages, &fraged_pages);
	if (IsIgnoringSystemIndexes() && IsSystemRelationName(RelationGetRelationName(onerel)))
		reindex = true;

	/* Now open indices */
	nindices = 0;
	Irel = (Relation *) NULL;
	get_indices(onerel, &nindices, &Irel);
	if (!Irel)
		reindex = false;
	else if (!RelationGetForm(onerel)->relhasindex)
		reindex = true;
	if (nindices > 0)
		vacrelstats->hasindex = true;
	else
		vacrelstats->hasindex = false;
	if (reindex)
	{
		/* rebuild indexes from scratch instead of cleaning them */
		for (i = 0; i < nindices; i++)
			index_close(Irel[i]);
		Irel = (Relation *) NULL;
		activate_indexes_of_a_table(relid, false);
	}

	/* Clean/scan index relation(s) */
	if (Irel != (Relation *) NULL)
	{
		if (vacuum_pages.num_pages > 0)
		{
			for (i = 0; i < nindices; i++)
				vacuum_index(&vacuum_pages, Irel[i], vacrelstats->num_tuples, 0);
		}
		else
		/* just scan indices to update statistic */
		{
			for (i = 0; i < nindices; i++)
				scan_index(Irel[i], vacrelstats->num_tuples);
		}
	}

	if (fraged_pages.num_pages > 0) /* Try to shrink heap */
		repair_frag(vacrelstats, onerel, &vacuum_pages, &fraged_pages, nindices, Irel);
	else
	{
		/* note: repair_frag is responsible for closing indices itself */
		if (Irel != (Relation *) NULL)
			close_indices(nindices, Irel);
		if (vacuum_pages.num_pages > 0)		/* Clean pages from
											 * vacuum_pages list */
			vacuum_heap(vacrelstats, onerel, &vacuum_pages);
	}
	if (reindex)
		activate_indexes_of_a_table(relid, true);

	/* ok - free vacuum_pages list of reaped pages */
	if (vacuum_pages.num_pages > 0)
	{
		vacpage = vacuum_pages.pagedesc;
		for (i = 0; i < vacuum_pages.num_pages; i++, vacpage++)
			pfree(*vacpage);
		pfree(vacuum_pages.pagedesc);
		/* fraged_pages shares the VacPage structs just freed above */
		if (fraged_pages.num_pages > 0)
			pfree(fraged_pages.pagedesc);
	}

	/* all done with this class, but hold lock until commit */
	heap_close(onerel, NoLock);

	/* update statistics in pg_class */
	update_relstats(vacrelstats->relid, vacrelstats->num_pages,
			vacrelstats->num_tuples, vacrelstats->hasindex, vacrelstats);

	/* If the relation has a secondary toast one, vacuum that too
	 * while we still hold the lock on the master table. We don't
	 * need to propagate "analyze" to it, because the toaster
	 * always uses hardcoded index access and statistics are
	 * totally unimportant for toast relations
	 */
	if (toast_relid != InvalidOid)
		vacuum_rel(toast_relid, false, true);

	/* next command frees attribute stats */
	if (!is_toastrel)
		CommitTransactionCommand();
}
516
517 /*
518  *      scan_heap() -- scan an open heap relation
519  *
520  *              This routine sets commit times, constructs vacuum_pages list of
521  *              empty/uninitialized pages and pages with dead tuples and
522  *              ~LP_USED line pointers, constructs fraged_pages list of pages
523  *              appropriate for purposes of shrinking and maintains statistics
524  *              on the number of live tuples in a heap.
525  */
526 static void
527 scan_heap(VRelStats *vacrelstats, Relation onerel,
528                         VacPageList vacuum_pages, VacPageList fraged_pages)
529 {
530         BlockNumber nblocks,
531                                 blkno;
532         ItemId          itemid;
533         Buffer          buf;
534         HeapTupleData tuple;
535         Page            page,
536                                 tempPage = NULL;
537         OffsetNumber offnum,
538                                 maxoff;
539         bool            pgchanged,
540                                 tupgone,
541                                 dobufrel,
542                                 notup;
543         char       *relname;
544         VacPage         vacpage,
545                                 vp;
546         uint32          tups_vacuumed,
547                                 num_tuples,
548                                 nkeep,
549                                 nunused,
550                                 ncrash,
551                                 empty_pages,
552                                 new_pages,
553                                 changed_pages,
554                                 empty_end_pages;
555         Size            free_size,
556                                 usable_free_size;
557         Size            min_tlen = MaxTupleSize;
558         Size            max_tlen = 0;
559         int32           i;
560         bool            do_shrinking = true;
561         VTupleLink      vtlinks = (VTupleLink) palloc(100 * sizeof(VTupleLinkData));
562         int                     num_vtlinks = 0;
563         int                     free_vtlinks = 100;
564         struct rusage ru0;
565
566         getrusage(RUSAGE_SELF, &ru0);
567
568         relname = RelationGetRelationName(onerel);
569         elog(MESSAGE_LEVEL, "--Relation %s--", relname);
570
571         tups_vacuumed = num_tuples = nkeep = nunused = ncrash = empty_pages =
572                 new_pages = changed_pages = empty_end_pages = 0;
573         free_size = usable_free_size = 0;
574
575         nblocks = RelationGetNumberOfBlocks(onerel);
576
577         vacpage = (VacPage) palloc(sizeof(VacPageData) + MaxOffsetNumber * sizeof(OffsetNumber));
578         vacpage->offsets_used = 0;
579
580         for (blkno = 0; blkno < nblocks; blkno++)
581         {
582                 buf = ReadBuffer(onerel, blkno);
583                 page = BufferGetPage(buf);
584                 vacpage->blkno = blkno;
585                 vacpage->offsets_free = 0;
586
587                 if (PageIsNew(page))
588                 {
589                         elog(NOTICE, "Rel %s: Uninitialized page %u - fixing",
590                                  relname, blkno);
591                         PageInit(page, BufferGetPageSize(buf), 0);
592                         vacpage->free = ((PageHeader) page)->pd_upper - ((PageHeader) page)->pd_lower;
593                         free_size += (vacpage->free - sizeof(ItemIdData));
594                         new_pages++;
595                         empty_end_pages++;
596                         reap_page(vacuum_pages, vacpage);
597                         WriteBuffer(buf);
598                         continue;
599                 }
600
601                 if (PageIsEmpty(page))
602                 {
603                         vacpage->free = ((PageHeader) page)->pd_upper - ((PageHeader) page)->pd_lower;
604                         free_size += (vacpage->free - sizeof(ItemIdData));
605                         empty_pages++;
606                         empty_end_pages++;
607                         reap_page(vacuum_pages, vacpage);
608                         ReleaseBuffer(buf);
609                         continue;
610                 }
611
612                 pgchanged = false;
613                 notup = true;
614                 maxoff = PageGetMaxOffsetNumber(page);
615                 for (offnum = FirstOffsetNumber;
616                          offnum <= maxoff;
617                          offnum = OffsetNumberNext(offnum))
618                 {
619                         itemid = PageGetItemId(page, offnum);
620
621                         /*
622                          * Collect un-used items too - it's possible to have indices
623                          * pointing here after crash.
624                          */
625                         if (!ItemIdIsUsed(itemid))
626                         {
627                                 vacpage->offsets[vacpage->offsets_free++] = offnum;
628                                 nunused++;
629                                 continue;
630                         }
631
632                         tuple.t_datamcxt = NULL;
633                         tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
634                         tuple.t_len = ItemIdGetLength(itemid);
635                         ItemPointerSet(&(tuple.t_self), blkno, offnum);
636                         tupgone = false;
637
638                         if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
639                         {
640                                 if (tuple.t_data->t_infomask & HEAP_XMIN_INVALID)
641                                         tupgone = true;
642                                 else if (tuple.t_data->t_infomask & HEAP_MOVED_OFF)
643                                 {
644                                         if (TransactionIdDidCommit((TransactionId)
645                                                                                            tuple.t_data->t_cmin))
646                                         {
647                                                 tuple.t_data->t_infomask |= HEAP_XMIN_INVALID;
648                                                 tupgone = true;
649                                         }
650                                         else
651                                         {
652                                                 tuple.t_data->t_infomask |= HEAP_XMIN_COMMITTED;
653                                                 pgchanged = true;
654                                         }
655                                 }
656                                 else if (tuple.t_data->t_infomask & HEAP_MOVED_IN)
657                                 {
658                                         if (!TransactionIdDidCommit((TransactionId)
659                                                                                                 tuple.t_data->t_cmin))
660                                         {
661                                                 tuple.t_data->t_infomask |= HEAP_XMIN_INVALID;
662                                                 tupgone = true;
663                                         }
664                                         else
665                                         {
666                                                 tuple.t_data->t_infomask |= HEAP_XMIN_COMMITTED;
667                                                 pgchanged = true;
668                                         }
669                                 }
670                                 else
671                                 {
672                                         if (TransactionIdDidAbort(tuple.t_data->t_xmin))
673                                                 tupgone = true;
674                                         else if (TransactionIdDidCommit(tuple.t_data->t_xmin))
675                                         {
676                                                 tuple.t_data->t_infomask |= HEAP_XMIN_COMMITTED;
677                                                 pgchanged = true;
678                                         }
679                                         else if (!TransactionIdIsInProgress(tuple.t_data->t_xmin))
680                                         {
681
682                                                 /*
683                                                  * Not Aborted, Not Committed, Not in Progress -
684                                                  * so it's from crashed process. - vadim 11/26/96
685                                                  */
686                                                 ncrash++;
687                                                 tupgone = true;
688                                         }
689                                         else
690                                         {
691                                                 elog(NOTICE, "Rel %s: TID %u/%u: InsertTransactionInProgress %u - can't shrink relation",
692                                                    relname, blkno, offnum, tuple.t_data->t_xmin);
693                                                 do_shrinking = false;
694                                         }
695                                 }
696                         }
697
698                         /*
699                          * here we are concerned about tuples with xmin committed and
700                          * xmax unknown or committed
701                          */
702                         if (tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED &&
703                                 !(tuple.t_data->t_infomask & HEAP_XMAX_INVALID))
704                         {
705                                 if (tuple.t_data->t_infomask & HEAP_XMAX_COMMITTED)
706                                 {
707                                         if (tuple.t_data->t_infomask & HEAP_MARKED_FOR_UPDATE)
708                                         {
709                                                 pgchanged = true;
710                                                 tuple.t_data->t_infomask |= HEAP_XMAX_INVALID;
711                                         }
712                                         else
713                                                 tupgone = true;
714                                 }
715                                 else if (TransactionIdDidAbort(tuple.t_data->t_xmax))
716                                 {
717                                         tuple.t_data->t_infomask |= HEAP_XMAX_INVALID;
718                                         pgchanged = true;
719                                 }
720                                 else if (TransactionIdDidCommit(tuple.t_data->t_xmax))
721                                 {
722                                         if (tuple.t_data->t_infomask & HEAP_MARKED_FOR_UPDATE)
723                                         {
724                                                 tuple.t_data->t_infomask |= HEAP_XMAX_INVALID;
725                                                 pgchanged = true;
726                                         }
727                                         else
728                                                 tupgone = true;
729                                 }
730                                 else if (!TransactionIdIsInProgress(tuple.t_data->t_xmax))
731                                 {
732
733                                          /*
734                                           * Not Aborted, Not Committed, Not in Progress - so it
735                                           * must be from a crashed process. - vadim 06/02/97
736                                          */
737                                         tuple.t_data->t_infomask |= HEAP_XMAX_INVALID;
738                                         pgchanged = true;
739                                 }
740                                 else
741                                 {
742                                         elog(NOTICE, "Rel %s: TID %u/%u: DeleteTransactionInProgress %u - can't shrink relation",
743                                                  relname, blkno, offnum, tuple.t_data->t_xmax);
744                                         do_shrinking = false;
745                                 }
746
747                                 /*
748                                  * If tuple is recently deleted then we must not remove it
749                                  * from relation.
750                                  */
751                                 if (tupgone && (tuple.t_data->t_infomask & HEAP_XMIN_INVALID) == 0 && tuple.t_data->t_xmax >= XmaxRecent)
752                                 {
753                                         tupgone = false;
754                                         nkeep++;
755                                         if (!(tuple.t_data->t_infomask & HEAP_XMAX_COMMITTED))
756                                         {
757                                                 tuple.t_data->t_infomask |= HEAP_XMAX_COMMITTED;
758                                                 pgchanged = true;
759                                         }
760
761                                         /*
762                                          * If we do shrinking and this tuple is updated one
763                                          * then remember it to construct updated tuple
764                                          * dependencies.
765                                          */
766                                         if (do_shrinking && !(ItemPointerEquals(&(tuple.t_self),
767                                                                                            &(tuple.t_data->t_ctid))))
768                                         {
769                                                 if (free_vtlinks == 0)
770                                                 {
771                                                         free_vtlinks = 1000;
772                                                         vtlinks = (VTupleLink) repalloc(vtlinks,
773                                                                                    (free_vtlinks + num_vtlinks) *
774                                                                                                  sizeof(VTupleLinkData));
775                                                 }
776                                                 vtlinks[num_vtlinks].new_tid = tuple.t_data->t_ctid;
777                                                 vtlinks[num_vtlinks].this_tid = tuple.t_self;
778                                                 free_vtlinks--;
779                                                 num_vtlinks++;
780                                         }
781                                 }
782                         }
783
784                         /*
785                          * Other checks...
786                          */
787                         if (!OidIsValid(tuple.t_data->t_oid))
788                         {
789                                 elog(NOTICE, "Rel %s: TID %u/%u: OID IS INVALID. TUPGONE %d.",
790                                          relname, blkno, offnum, tupgone);
791                         }
792
793                         if (tupgone)
794                         {
795                                 ItemId          lpp;
796
797                                 if (tempPage == (Page) NULL)
798                                 {
799                                         Size            pageSize;
800
801                                         pageSize = PageGetPageSize(page);
802                                         tempPage = (Page) palloc(pageSize);
803                                         memmove(tempPage, page, pageSize);
804                                 }
805
806                                 lpp = &(((PageHeader) tempPage)->pd_linp[offnum - 1]);
807
808                                 /* mark it unused */
809                                 lpp->lp_flags &= ~LP_USED;
810
811                                 vacpage->offsets[vacpage->offsets_free++] = offnum;
812                                 tups_vacuumed++;
813
814                         }
815                         else
816                         {
817                                 num_tuples++;
818                                 notup = false;
819                                 if (tuple.t_len < min_tlen)
820                                         min_tlen = tuple.t_len;
821                                 if (tuple.t_len > max_tlen)
822                                         max_tlen = tuple.t_len;
823                         }
824                 }
825
826                 if (pgchanged)
827                 {
828                         WriteBuffer(buf);
829                         dobufrel = false;
830                         changed_pages++;
831                 }
832                 else
833                         dobufrel = true;
834
835                 if (tempPage != (Page) NULL)
836                 {                                               /* Some tuples are gone */
837                         PageRepairFragmentation(tempPage);
838                         vacpage->free = ((PageHeader) tempPage)->pd_upper - ((PageHeader) tempPage)->pd_lower;
839                         free_size += vacpage->free;
840                         reap_page(vacuum_pages, vacpage);
841                         pfree(tempPage);
842                         tempPage = (Page) NULL;
843                 }
844                 else if (vacpage->offsets_free > 0)
845                 {                                               /* there are only ~LP_USED line pointers */
846                         vacpage->free = ((PageHeader) page)->pd_upper - ((PageHeader) page)->pd_lower;
847                         free_size += vacpage->free;
848                         reap_page(vacuum_pages, vacpage);
849                 }
850                 if (dobufrel)
851                         ReleaseBuffer(buf);
852                 if (notup)
853                         empty_end_pages++;
854                 else
855                         empty_end_pages = 0;
856         }
857
858         pfree(vacpage);
859
860         /* save stats in the rel list for use later */
861         vacrelstats->num_tuples = num_tuples;
862         vacrelstats->num_pages = nblocks;
863 /*        vacrelstats->natts = attr_cnt;*/
864         if (num_tuples == 0)
865                 min_tlen = max_tlen = 0;
866         vacrelstats->min_tlen = min_tlen;
867         vacrelstats->max_tlen = max_tlen;
868
869         vacuum_pages->empty_end_pages = empty_end_pages;
870         fraged_pages->empty_end_pages = empty_end_pages;
871
872         /*
873          * Try to make fraged_pages keeping in mind that we can't use free
874          * space of "empty" end-pages and last page if it reaped.
875          */
876         if (do_shrinking && vacuum_pages->num_pages - empty_end_pages > 0)
877         {
878                 int                     nusf;           /* blocks usefull for re-using */
879
880                 nusf = vacuum_pages->num_pages - empty_end_pages;
881                 if ((vacuum_pages->pagedesc[nusf - 1])->blkno == nblocks - empty_end_pages - 1)
882                         nusf--;
883
884                 for (i = 0; i < nusf; i++)
885                 {
886                         vp = vacuum_pages->pagedesc[i];
887                         if (enough_space(vp, min_tlen))
888                         {
889                                 vpage_insert(fraged_pages, vp);
890                                 usable_free_size += vp->free;
891                         }
892                 }
893         }
894
895         if (usable_free_size > 0 && num_vtlinks > 0)
896         {
897                 qsort((char *) vtlinks, num_vtlinks, sizeof(VTupleLinkData),
898                           vac_cmp_vtlinks);
899                 vacrelstats->vtlinks = vtlinks;
900                 vacrelstats->num_vtlinks = num_vtlinks;
901         }
902         else
903         {
904                 vacrelstats->vtlinks = NULL;
905                 vacrelstats->num_vtlinks = 0;
906                 pfree(vtlinks);
907         }
908
909         elog(MESSAGE_LEVEL, "Pages %u: Changed %u, reaped %u, Empty %u, New %u; \
910 Tup %u: Vac %u, Keep/VTL %u/%u, Crash %u, UnUsed %u, MinLen %u, MaxLen %u; \
911 Re-using: Free/Avail. Space %u/%u; EndEmpty/Avail. Pages %u/%u. %s",
912                  nblocks, changed_pages, vacuum_pages->num_pages, empty_pages,
913                  new_pages, num_tuples, tups_vacuumed,
914                  nkeep, vacrelstats->num_vtlinks, ncrash,
915                  nunused, min_tlen, max_tlen, free_size, usable_free_size,
916                  empty_end_pages, fraged_pages->num_pages,
917                  show_rusage(&ru0));
918
919 }
920
921
922 /*
923  *      repair_frag() -- try to repair relation's fragmentation
924  *
925  *              This routine marks dead tuples as unused and tries to re-use dead
926  *              space by moving tuples (and inserting indices if needed).  It
927  *              constructs an Nvacpagelist list of freed pages (moved tuples) and
928  *              cleans indices for them after committing the current transaction
929  *              (in a hackish manner - without losing locks and without freeing
930  *              memory!).  It truncates the relation if some end-blocks have gone
931  *              away.
932  */
932 static void
933 repair_frag(VRelStats *vacrelstats, Relation onerel,
934                            VacPageList vacuum_pages, VacPageList fraged_pages,
935                            int nindices, Relation *Irel)
936 {
937         TransactionId myXID;
938         CommandId       myCID;
939         Buffer          buf,
940                                 cur_buffer;
941         int                     nblocks,
942                                 blkno;
943         Page            page,
944                                 ToPage = NULL;
945         OffsetNumber offnum = 0,
946                                 maxoff = 0,
947                                 newoff,
948                                 max_offset;
949         ItemId          itemid,
950                                 newitemid;
951         HeapTupleData tuple,
952                                 newtup;
953         TupleDesc       tupdesc;
954         IndexInfo **indexInfo = NULL;
955         Datum           idatum[INDEX_MAX_KEYS];
956         char            inulls[INDEX_MAX_KEYS];
957         InsertIndexResult iresult;
958         VacPageListData Nvacpagelist;
959         VacPage         cur_page = NULL,
960                                 last_vacuum_page,
961                                 vacpage,
962                            *curpage;
963         int                     cur_item = 0;
964         int                     last_move_dest_block = -1,
965                                 last_vacuum_block,
966                                 i = 0;
967         Size            tuple_len;
968         int                     num_moved,
969                                 num_fraged_pages,
970                                 vacuumed_pages;
971         int                     checked_moved,
972                                 num_tuples,
973                                 keep_tuples = 0;
974         bool            isempty,
975                                 dowrite,
976                                 chain_tuple_moved;
977         struct rusage ru0;
978
979         getrusage(RUSAGE_SELF, &ru0);
980
981         myXID = GetCurrentTransactionId();
982         myCID = GetCurrentCommandId();
983
984         tupdesc = RelationGetDescr(onerel);
985
986         if (Irel != (Relation *) NULL)          /* preparation for index' inserts */
987                 indexInfo = get_index_desc(onerel, nindices, Irel);
988
989         Nvacpagelist.num_pages = 0;
990         num_fraged_pages = fraged_pages->num_pages;
991         Assert(vacuum_pages->num_pages > vacuum_pages->empty_end_pages);
992         vacuumed_pages = vacuum_pages->num_pages - vacuum_pages->empty_end_pages;
993         last_vacuum_page = vacuum_pages->pagedesc[vacuumed_pages - 1];
994         last_vacuum_block = last_vacuum_page->blkno;
995         cur_buffer = InvalidBuffer;
996         num_moved = 0;
997
998         vacpage = (VacPage) palloc(sizeof(VacPageData) + MaxOffsetNumber * sizeof(OffsetNumber));
999         vacpage->offsets_used = vacpage->offsets_free = 0;
1000
1001         /*
1002          * Scan pages backwards from the last nonempty page, trying to move
1003          * tuples down to lower pages.  Quit when we reach a page that we have
1004          * moved any tuples onto.  Note that if a page is still in the
1005          * fraged_pages list (list of candidate move-target pages) when we
1006          * reach it, we will remove it from the list.  This ensures we never
1007          * move a tuple up to a higher page number.
1008          *
1009          * NB: this code depends on the vacuum_pages and fraged_pages lists being
1010          * in order, and on fraged_pages being a subset of vacuum_pages.
1011          */
1012         nblocks = vacrelstats->num_pages;
1013         for (blkno = nblocks - vacuum_pages->empty_end_pages - 1;
1014                  blkno > last_move_dest_block;
1015                  blkno--)
1016         {
1017                 buf = ReadBuffer(onerel, blkno);
1018                 page = BufferGetPage(buf);
1019
1020                 vacpage->offsets_free = 0;
1021
1022                 isempty = PageIsEmpty(page);
1023
1024                 dowrite = false;
1025                 if (blkno == last_vacuum_block) /* it's reaped page */
1026                 {
1027                         if (last_vacuum_page->offsets_free > 0) /* there are dead tuples */
1028                         {                                       /* on this page - clean */
1029                                 Assert(!isempty);
1030                                 vacuum_page(page, last_vacuum_page);
1031                                 dowrite = true;
1032                         }
1033                         else
1034                                 Assert(isempty);
1035                         --vacuumed_pages;
1036                         if (vacuumed_pages > 0)
1037                         {
1038                                 /* get prev reaped page from vacuum_pages */
1039                                 last_vacuum_page = vacuum_pages->pagedesc[vacuumed_pages - 1];
1040                                 last_vacuum_block = last_vacuum_page->blkno;
1041                         }
1042                         else
1043                         {
1044                                 last_vacuum_page = NULL;
1045                                 last_vacuum_block = -1;
1046                         }
1047                         if (num_fraged_pages > 0 &&
1048                         fraged_pages->pagedesc[num_fraged_pages - 1]->blkno ==
1049                                 (BlockNumber) blkno)
1050                         {
1051                                 /* page is in fraged_pages too; remove it */
1052                                 --num_fraged_pages;
1053                         }
1054                         if (isempty)
1055                         {
1056                                 ReleaseBuffer(buf);
1057                                 continue;
1058                         }
1059                 }
1060                 else
1061                         Assert(!isempty);
1062
1063                 chain_tuple_moved = false;              /* no one chain-tuple was moved
1064                                                                                  * off this page, yet */
1065                 vacpage->blkno = blkno;
1066                 maxoff = PageGetMaxOffsetNumber(page);
1067                 for (offnum = FirstOffsetNumber;
1068                          offnum <= maxoff;
1069                          offnum = OffsetNumberNext(offnum))
1070                 {
1071                         itemid = PageGetItemId(page, offnum);
1072
1073                         if (!ItemIdIsUsed(itemid))
1074                                 continue;
1075
1076                         tuple.t_datamcxt = NULL;
1077                         tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
1078                         tuple_len = tuple.t_len = ItemIdGetLength(itemid);
1079                         ItemPointerSet(&(tuple.t_self), blkno, offnum);
1080
1081                         if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
1082                         {
1083                                 if ((TransactionId) tuple.t_data->t_cmin != myXID)
1084                                         elog(ERROR, "Invalid XID in t_cmin");
1085                                 if (tuple.t_data->t_infomask & HEAP_MOVED_IN)
1086                                         elog(ERROR, "HEAP_MOVED_IN was not expected");
1087
1088                                 /*
1089                                  * If this (chain) tuple is moved by me already then I
1090                                  * have to check is it in vacpage or not - i.e. is it moved
1091                                  * while cleaning this page or some previous one.
1092                                  */
1093                                 if (tuple.t_data->t_infomask & HEAP_MOVED_OFF)
1094                                 {
1095                                         if (keep_tuples == 0)
1096                                                 continue;
1097                                         if (chain_tuple_moved)          /* some chains was moved
1098                                                                                                  * while */
1099                                         {                       /* cleaning this page */
1100                                                 Assert(vacpage->offsets_free > 0);
1101                                                 for (i = 0; i < vacpage->offsets_free; i++)
1102                                                 {
1103                                                         if (vacpage->offsets[i] == offnum)
1104                                                                 break;
1105                                                 }
1106                                                 if (i >= vacpage->offsets_free) /* not found */
1107                                                 {
1108                                                         vacpage->offsets[vacpage->offsets_free++] = offnum;
1109                                                         keep_tuples--;
1110                                                 }
1111                                         }
1112                                         else
1113                                         {
1114                                                 vacpage->offsets[vacpage->offsets_free++] = offnum;
1115                                                 keep_tuples--;
1116                                         }
1117                                         continue;
1118                                 }
1119                                 elog(ERROR, "HEAP_MOVED_OFF was expected");
1120                         }
1121
1122                         /*
1123                          * If this tuple is in the chain of tuples created in updates
1124                          * by "recent" transactions then we have to move all chain of
1125                          * tuples to another places.
1126                          */
1127                         if ((tuple.t_data->t_infomask & HEAP_UPDATED &&
1128                                  tuple.t_data->t_xmin >= XmaxRecent) ||
1129                                 (!(tuple.t_data->t_infomask & HEAP_XMAX_INVALID) &&
1130                                  !(ItemPointerEquals(&(tuple.t_self), &(tuple.t_data->t_ctid)))))
1131                         {
1132                                 Buffer          Cbuf = buf;
1133                                 Page            Cpage;
1134                                 ItemId          Citemid;
1135                                 ItemPointerData Ctid;
1136                                 HeapTupleData tp = tuple;
1137                                 Size            tlen = tuple_len;
1138                                 VTupleMove      vtmove = (VTupleMove)
1139                                 palloc(100 * sizeof(VTupleMoveData));
1140                                 int                     num_vtmove = 0;
1141                                 int                     free_vtmove = 100;
1142                                 VacPage         to_vacpage = NULL;
1143                                 int                     to_item = 0;
1144                                 bool            freeCbuf = false;
1145                                 int                     ti;
1146
1147                                 if (vacrelstats->vtlinks == NULL)
1148                                         elog(ERROR, "No one parent tuple was found");
1149                                 if (cur_buffer != InvalidBuffer)
1150                                 {
1151                                         WriteBuffer(cur_buffer);
1152                                         cur_buffer = InvalidBuffer;
1153                                 }
1154
1155                                 /*
1156                                  * If this tuple is in the begin/middle of the chain then
1157                                  * we have to move to the end of chain.
1158                                  */
1159                                 while (!(tp.t_data->t_infomask & HEAP_XMAX_INVALID) &&
1160                                 !(ItemPointerEquals(&(tp.t_self), &(tp.t_data->t_ctid))))
1161                                 {
1162                                         Ctid = tp.t_data->t_ctid;
1163                                         if (freeCbuf)
1164                                                 ReleaseBuffer(Cbuf);
1165                                         freeCbuf = true;
1166                                         Cbuf = ReadBuffer(onerel,
1167                                                                           ItemPointerGetBlockNumber(&Ctid));
1168                                         Cpage = BufferGetPage(Cbuf);
1169                                         Citemid = PageGetItemId(Cpage,
1170                                                                           ItemPointerGetOffsetNumber(&Ctid));
1171                                         if (!ItemIdIsUsed(Citemid))
1172                                         {
1173
1174                                                 /*
1175                                                  * This means that in the middle of chain there
1176                                                  * was tuple updated by older (than XmaxRecent)
1177                                                  * xaction and this tuple is already deleted by
1178                                                  * me. Actually, upper part of chain should be
1179                                                  * removed and seems that this should be handled
1180                                                  * in scan_heap(), but it's not implemented at
1181                                                  * the moment and so we just stop shrinking here.
1182                                                  */
1183                                                 ReleaseBuffer(Cbuf);
1184                                                 pfree(vtmove);
1185                                                 vtmove = NULL;
1186                                                 elog(NOTICE, "Child itemid in update-chain marked as unused - can't continue repair_frag");
1187                                                 break;
1188                                         }
1189                                         tp.t_datamcxt = NULL;
1190                                         tp.t_data = (HeapTupleHeader) PageGetItem(Cpage, Citemid);
1191                                         tp.t_self = Ctid;
1192                                         tlen = tp.t_len = ItemIdGetLength(Citemid);
1193                                 }
1194                                 if (vtmove == NULL)
1195                                         break;
1196                                 /* first, can chain be moved ? */
1197                                 for (;;)
1198                                 {
1199                                         if (to_vacpage == NULL ||
1200                                                 !enough_space(to_vacpage, tlen))
1201                                         {
1202
1203                                                 /*
1204                                                  * if to_vacpage no longer has enough free space to be
1205                                                  * useful, remove it from fraged_pages list
1206                                                  */
1207                                                 if (to_vacpage != NULL &&
1208                                                  !enough_space(to_vacpage, vacrelstats->min_tlen))
1209                                                 {
1210                                                         Assert(num_fraged_pages > to_item);
1211                                                         memmove(fraged_pages->pagedesc + to_item,
1212                                                                 fraged_pages->pagedesc + to_item + 1,
1213                                                                         sizeof(VacPage) * (num_fraged_pages - to_item - 1));
1214                                                         num_fraged_pages--;
1215                                                 }
1216                                                 for (i = 0; i < num_fraged_pages; i++)
1217                                                 {
1218                                                         if (enough_space(fraged_pages->pagedesc[i], tlen))
1219                                                                 break;
1220                                                 }
1221
1222                                                 /* can't move item anywhere */
1223                                                 if (i == num_fraged_pages)
1224                                                 {
1225                                                         for (i = 0; i < num_vtmove; i++)
1226                                                         {
1227                                                                 Assert(vtmove[i].vacpage->offsets_used > 0);
1228                                                                 (vtmove[i].vacpage->offsets_used)--;
1229                                                         }
1230                                                         num_vtmove = 0;
1231                                                         break;
1232                                                 }
1233                                                 to_item = i;
1234                                                 to_vacpage = fraged_pages->pagedesc[to_item];
1235                                         }
1236                                         to_vacpage->free -= MAXALIGN(tlen);
1237                                         if (to_vacpage->offsets_used >= to_vacpage->offsets_free)
1238                                                 to_vacpage->free -= MAXALIGN(sizeof(ItemIdData));
1239                                         (to_vacpage->offsets_used)++;
1240                                         if (free_vtmove == 0)
1241                                         {
1242                                                 free_vtmove = 1000;
1243                                                 vtmove = (VTupleMove) repalloc(vtmove,
1244                                                                                          (free_vtmove + num_vtmove) *
1245                                                                                                  sizeof(VTupleMoveData));
1246                                         }
1247                                         vtmove[num_vtmove].tid = tp.t_self;
1248                                         vtmove[num_vtmove].vacpage = to_vacpage;
1249                                         if (to_vacpage->offsets_used == 1)
1250                                                 vtmove[num_vtmove].cleanVpd = true;
1251                                         else
1252                                                 vtmove[num_vtmove].cleanVpd = false;
1253                                         free_vtmove--;
1254                                         num_vtmove++;
1255
1256                                         /* All done ? */
1257                                         if (!(tp.t_data->t_infomask & HEAP_UPDATED) ||
1258                                                 tp.t_data->t_xmin < XmaxRecent)
1259                                                 break;
1260
1261                                         /* Well, try to find tuple with old row version */
1262                                         for (;;)
1263                                         {
1264                                                 Buffer          Pbuf;
1265                                                 Page            Ppage;
1266                                                 ItemId          Pitemid;
1267                                                 HeapTupleData Ptp;
1268                                                 VTupleLinkData vtld,
1269                                                                    *vtlp;
1270
1271                                                 vtld.new_tid = tp.t_self;
1272                                                 vtlp = (VTupleLink)
1273                                                         vac_find_eq((void *) (vacrelstats->vtlinks),
1274                                                                            vacrelstats->num_vtlinks,
1275                                                                            sizeof(VTupleLinkData),
1276                                                                            (void *) &vtld,
1277                                                                            vac_cmp_vtlinks);
1278                                                 if (vtlp == NULL)
1279                                                         elog(ERROR, "Parent tuple was not found");
1280                                                 tp.t_self = vtlp->this_tid;
1281                                                 Pbuf = ReadBuffer(onerel,
1282                                                                 ItemPointerGetBlockNumber(&(tp.t_self)));
1283                                                 Ppage = BufferGetPage(Pbuf);
1284                                                 Pitemid = PageGetItemId(Ppage,
1285                                                            ItemPointerGetOffsetNumber(&(tp.t_self)));
1286                                                 if (!ItemIdIsUsed(Pitemid))
1287                                                         elog(ERROR, "Parent itemid marked as unused");
1288                                                 Ptp.t_datamcxt = NULL;
1289                                                 Ptp.t_data = (HeapTupleHeader) PageGetItem(Ppage, Pitemid);
1290                                                 Assert(ItemPointerEquals(&(vtld.new_tid),
1291                                                                                                  &(Ptp.t_data->t_ctid)));
1292
1293                                                 /*
1294                                                  * Read above about cases when
1295                                                  * !ItemIdIsUsed(Citemid) (the child item is
1296                                                  * removed)... Because at the moment we don't
1297                                                  * remove the useless part of an update-chain,
1298                                                  * it's possible to get a too-old parent row here.
1299                                                  * As in the case which caused this problem,
1300                                                  * we stop shrinking here. I could try to find
1301                                                  * the real parent row, but prefer not to, since
1302                                                  * the real solution will be implemented later
1303                                                  * anyway, and we are too close to the 6.5 release. -
1304                                                  * vadim 06/11/99
1305                                                  */
1306                                                 if (Ptp.t_data->t_xmax != tp.t_data->t_xmin)
1307                                                 {
1308                                                         if (freeCbuf)
1309                                                                 ReleaseBuffer(Cbuf);
1310                                                         freeCbuf = false;
1311                                                         ReleaseBuffer(Pbuf);
1312                                                         for (i = 0; i < num_vtmove; i++)
1313                                                         {
1314                                                                 Assert(vtmove[i].vacpage->offsets_used > 0);
1315                                                                 (vtmove[i].vacpage->offsets_used)--;
1316                                                         }
1317                                                         num_vtmove = 0;
1318                                                         elog(NOTICE, "Too old parent tuple found - can't continue repair_frag");
1319                                                         break;
1320                                                 }
1321 #ifdef NOT_USED                                 /* I'm not sure that this will work
1322                                                                  * properly... */
1323
1324                                                 /*
1325                                                  * If this tuple is updated version of row and it
1326                                                  * was created by the same transaction then no one
1327                                                  * is interested in this tuple - mark it as
1328                                                  * removed.
1329                                                  */
1330                                                 if (Ptp.t_data->t_infomask & HEAP_UPDATED &&
1331                                                         Ptp.t_data->t_xmin == Ptp.t_data->t_xmax)
1332                                                 {
1333                                                         TransactionIdStore(myXID,
1334                                                                 (TransactionId *) &(Ptp.t_data->t_cmin));
1335                                                         Ptp.t_data->t_infomask &=
1336                                                                 ~(HEAP_XMIN_COMMITTED | HEAP_XMIN_INVALID | HEAP_MOVED_IN);
1337                                                         Ptp.t_data->t_infomask |= HEAP_MOVED_OFF;
1338                                                         WriteBuffer(Pbuf);
1339                                                         continue;
1340                                                 }
1341 #endif
1342                                                 tp.t_datamcxt = Ptp.t_datamcxt;
1343                                                 tp.t_data = Ptp.t_data;
1344                                                 tlen = tp.t_len = ItemIdGetLength(Pitemid);
1345                                                 if (freeCbuf)
1346                                                         ReleaseBuffer(Cbuf);
1347                                                 Cbuf = Pbuf;
1348                                                 freeCbuf = true;
1349                                                 break;
1350                                         }
1351                                         if (num_vtmove == 0)
1352                                                 break;
1353                                 }
1354                                 if (freeCbuf)
1355                                         ReleaseBuffer(Cbuf);
1356                                 if (num_vtmove == 0)    /* chain can't be moved */
1357                                 {
1358                                         pfree(vtmove);
1359                                         break;
1360                                 }
1361                                 ItemPointerSetInvalid(&Ctid);
1362                                 for (ti = 0; ti < num_vtmove; ti++)
1363                                 {
1364                                         VacPage destvacpage = vtmove[ti].vacpage;
1365
1366                                         /* Get tuple from chain */
1367                                         tuple.t_self = vtmove[ti].tid;
1368                                         Cbuf = ReadBuffer(onerel,
1369                                                          ItemPointerGetBlockNumber(&(tuple.t_self)));
1370                                         Cpage = BufferGetPage(Cbuf);
1371                                         Citemid = PageGetItemId(Cpage,
1372                                                         ItemPointerGetOffsetNumber(&(tuple.t_self)));
1373                                         tuple.t_datamcxt = NULL;
1374                                         tuple.t_data = (HeapTupleHeader) PageGetItem(Cpage, Citemid);
1375                                         tuple_len = tuple.t_len = ItemIdGetLength(Citemid);
1376                                         /* Get page to move in */
1377                                         cur_buffer = ReadBuffer(onerel, destvacpage->blkno);
1378
1379                                         /*
1380                                          * We should LockBuffer(cur_buffer) but don't, at the
1381                                          * moment. If you'll do LockBuffer then UNLOCK it
1382                                          * before index_insert: unique btree-s call heap_fetch
1383                                          * to get t_infomask of inserted heap tuple !!!
1384                                          */
1385                                         ToPage = BufferGetPage(cur_buffer);
1386
1387                                         /*
1388                                          * If this page was not used before - clean it.
1389                                          *
1390                                          * This path is different from the other callers of
1391                                          * vacuum_page, because we have already incremented the
1392                                          * vacpage's offsets_used field to account for the
1393                                          * tuple(s) we expect to move onto the page. Therefore
1394                                          * vacuum_page's check for offsets_used == 0 is
1395                                          * wrong. But since that's a good debugging check for
1396                                          * all other callers, we work around it here rather
1397                                          * than remove it.
1398                                          */
1399                                         if (!PageIsEmpty(ToPage) && vtmove[ti].cleanVpd)
1400                                         {
1401                                                 int                     sv_offsets_used = destvacpage->offsets_used;
1402
1403                                                 destvacpage->offsets_used = 0;
1404                                                 vacuum_page(ToPage, destvacpage);
1405                                                 destvacpage->offsets_used = sv_offsets_used;
1406                                         }
1407                                         heap_copytuple_with_tuple(&tuple, &newtup);
1408                                         RelationInvalidateHeapTuple(onerel, &tuple);
1409                                         TransactionIdStore(myXID, (TransactionId *) &(newtup.t_data->t_cmin));
1410                                         newtup.t_data->t_infomask &=
1411                                                 ~(HEAP_XMIN_COMMITTED | HEAP_XMIN_INVALID | HEAP_MOVED_OFF);
1412                                         newtup.t_data->t_infomask |= HEAP_MOVED_IN;
1413                                         newoff = PageAddItem(ToPage, (Item) newtup.t_data, tuple_len,
1414                                                                                  InvalidOffsetNumber, LP_USED);
1415                                         if (newoff == InvalidOffsetNumber)
1416                                         {
1417                                                 elog(ERROR, "moving chain: failed to add item with len = %u to page %u",
1418                                                          tuple_len, destvacpage->blkno);
1419                                         }
1420                                         newitemid = PageGetItemId(ToPage, newoff);
1421                                         pfree(newtup.t_data);
1422                                         newtup.t_datamcxt = NULL;
1423                                         newtup.t_data = (HeapTupleHeader) PageGetItem(ToPage, newitemid);
1424                                         ItemPointerSet(&(newtup.t_self), destvacpage->blkno, newoff);
1425                                         if (((int) destvacpage->blkno) > last_move_dest_block)
1426                                                 last_move_dest_block = destvacpage->blkno;
1427
1428                                         /*
1429                                          * Set t_ctid pointing to itself for last tuple in
1430                                          * chain and to next tuple in chain otherwise.
1431                                          */
1432                                         if (!ItemPointerIsValid(&Ctid))
1433                                                 newtup.t_data->t_ctid = newtup.t_self;
1434                                         else
1435                                                 newtup.t_data->t_ctid = Ctid;
1436                                         Ctid = newtup.t_self;
1437
1438                                         TransactionIdStore(myXID, (TransactionId *) &(tuple.t_data->t_cmin));
1439                                         tuple.t_data->t_infomask &=
1440                                                 ~(HEAP_XMIN_COMMITTED | HEAP_XMIN_INVALID | HEAP_MOVED_IN);
1441                                         tuple.t_data->t_infomask |= HEAP_MOVED_OFF;
1442
1443                                         num_moved++;
1444
1445                                         /*
1446                                          * Remember that we moved tuple from the current page
1447                                          * (corresponding index tuple will be cleaned).
1448                                          */
1449                                         if (Cbuf == buf)
1450                                                 vacpage->offsets[vacpage->offsets_free++] =
1451                                                         ItemPointerGetOffsetNumber(&(tuple.t_self));
1452                                         else
1453                                                 keep_tuples++;
1454
1455                                         if (Irel != (Relation *) NULL)
1456                                         {
1457                                                 /*
1458                                                  * XXX using CurrentMemoryContext here means
1459                                                  * intra-vacuum memory leak for functional indexes.
1460                                                  * Should fix someday.
1461                                                  *
1462                                                  * XXX This code fails to handle partial indexes!
1463                                                  * Probably should change it to use ExecOpenIndices.
1464                                                  */
1465                                                 for (i = 0; i < nindices; i++)
1466                                                 {
1467                                                         FormIndexDatum(indexInfo[i],
1468                                                                                    &newtup,
1469                                                                                    tupdesc,
1470                                                                                    CurrentMemoryContext,
1471                                                                                    idatum,
1472                                                                                    inulls);
1473                                                         iresult = index_insert(Irel[i],
1474                                                                                                    idatum,
1475                                                                                                    inulls,
1476                                                                                                    &newtup.t_self,
1477                                                                                                    onerel);
1478                                                         if (iresult)
1479                                                                 pfree(iresult);
1480                                                 }
1481                                         }
1482                                         WriteBuffer(cur_buffer);
1483                                         if (Cbuf == buf)
1484                                                 ReleaseBuffer(Cbuf);
1485                                         else
1486                                                 WriteBuffer(Cbuf);
1487                                 }
1488                                 cur_buffer = InvalidBuffer;
1489                                 pfree(vtmove);
1490                                 chain_tuple_moved = true;
1491                                 continue;
1492                         }
1493
1494                         /* try to find new page for this tuple */
1495                         if (cur_buffer == InvalidBuffer ||
1496                                 !enough_space(cur_page, tuple_len))
1497                         {
1498                                 if (cur_buffer != InvalidBuffer)
1499                                 {
1500                                         WriteBuffer(cur_buffer);
1501                                         cur_buffer = InvalidBuffer;
1502
1503                                         /*
1504                                          * If previous target page is now too full to add *any*
1505                                          * tuple to it, remove it from fraged_pages.
1506                                          */
1507                                         if (!enough_space(cur_page, vacrelstats->min_tlen))
1508                                         {
1509                                                 Assert(num_fraged_pages > cur_item);
1510                                                 memmove(fraged_pages->pagedesc + cur_item,
1511                                                                 fraged_pages->pagedesc + cur_item + 1,
1512                                                                 sizeof(VacPage) * (num_fraged_pages - cur_item - 1));
1513                                                 num_fraged_pages--;
1514                                         }
1515                                 }
1516                                 for (i = 0; i < num_fraged_pages; i++)
1517                                 {
1518                                         if (enough_space(fraged_pages->pagedesc[i], tuple_len))
1519                                                 break;
1520                                 }
1521                                 if (i == num_fraged_pages)
1522                                         break;          /* can't move item anywhere */
1523                                 cur_item = i;
1524                                 cur_page = fraged_pages->pagedesc[cur_item];
1525                                 cur_buffer = ReadBuffer(onerel, cur_page->blkno);
1526                                 ToPage = BufferGetPage(cur_buffer);
1527                                 /* if this page was not used before - clean it */
1528                                 if (!PageIsEmpty(ToPage) && cur_page->offsets_used == 0)
1529                                         vacuum_page(ToPage, cur_page);
1530                         }
1531
1532                         /* copy tuple */
1533                         heap_copytuple_with_tuple(&tuple, &newtup);
1534
1535                         RelationInvalidateHeapTuple(onerel, &tuple);
1536
1537                         /*
1538                          * Mark new tuple as moved_in by vacuum and store vacuum XID
1539                          * in t_cmin !!!
1540                          */
1541                         TransactionIdStore(myXID, (TransactionId *) &(newtup.t_data->t_cmin));
1542                         newtup.t_data->t_infomask &=
1543                                 ~(HEAP_XMIN_COMMITTED | HEAP_XMIN_INVALID | HEAP_MOVED_OFF);
1544                         newtup.t_data->t_infomask |= HEAP_MOVED_IN;
1545
1546                         /* add tuple to the page */
1547                         newoff = PageAddItem(ToPage, (Item) newtup.t_data, tuple_len,
1548                                                                  InvalidOffsetNumber, LP_USED);
1549                         if (newoff == InvalidOffsetNumber)
1550                         {
1551                                 elog(ERROR, "\
1552 failed to add item with len = %u to page %u (free space %u, nusd %u, noff %u)",
1553                                          tuple_len, cur_page->blkno, cur_page->free,
1554                                  cur_page->offsets_used, cur_page->offsets_free);
1555                         }
1556                         newitemid = PageGetItemId(ToPage, newoff);
1557                         pfree(newtup.t_data);
1558                         newtup.t_datamcxt = NULL;
1559                         newtup.t_data = (HeapTupleHeader) PageGetItem(ToPage, newitemid);
1560                         ItemPointerSet(&(newtup.t_data->t_ctid), cur_page->blkno, newoff);
1561                         newtup.t_self = newtup.t_data->t_ctid;
1562
1563                         /*
1564                          * Mark old tuple as moved_off by vacuum and store vacuum XID
1565                          * in t_cmin !!!
1566                          */
1567                         TransactionIdStore(myXID, (TransactionId *) &(tuple.t_data->t_cmin));
1568                         tuple.t_data->t_infomask &=
1569                                 ~(HEAP_XMIN_COMMITTED | HEAP_XMIN_INVALID | HEAP_MOVED_IN);
1570                         tuple.t_data->t_infomask |= HEAP_MOVED_OFF;
1571
1572                         cur_page->offsets_used++;
1573                         num_moved++;
1574                         cur_page->free = ((PageHeader) ToPage)->pd_upper - ((PageHeader) ToPage)->pd_lower;
1575                         if (((int) cur_page->blkno) > last_move_dest_block)
1576                                 last_move_dest_block = cur_page->blkno;
1577
1578                         vacpage->offsets[vacpage->offsets_free++] = offnum;
1579
1580                         /* insert index' tuples if needed */
1581                         if (Irel != (Relation *) NULL)
1582                         {
1583                                 /*
1584                                  * XXX using CurrentMemoryContext here means
1585                                  * intra-vacuum memory leak for functional indexes.
1586                                  * Should fix someday.
1587                                  *
1588                                  * XXX This code fails to handle partial indexes!
1589                                  * Probably should change it to use ExecOpenIndices.
1590                                  */
1591                                 for (i = 0; i < nindices; i++)
1592                                 {
1593                                         FormIndexDatum(indexInfo[i],
1594                                                                    &newtup,
1595                                                                    tupdesc,
1596                                                                    CurrentMemoryContext,
1597                                                                    idatum,
1598                                                                    inulls);
1599                                         iresult = index_insert(Irel[i],
1600                                                                                    idatum,
1601                                                                                    inulls,
1602                                                                                    &newtup.t_self,
1603                                                                                    onerel);
1604                                         if (iresult)
1605                                                 pfree(iresult);
1606                                 }
1607                         }
1608
1609                 }                                               /* walk along page */
1610
1611                 if (offnum < maxoff && keep_tuples > 0)
1612                 {
1613                         OffsetNumber off;
1614
1615                         for (off = OffsetNumberNext(offnum);
1616                                  off <= maxoff;
1617                                  off = OffsetNumberNext(off))
1618                         {
1619                                 itemid = PageGetItemId(page, off);
1620                                 if (!ItemIdIsUsed(itemid))
1621                                         continue;
1622                                 tuple.t_datamcxt = NULL;
1623                                 tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
1624                                 if (tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED)
1625                                         continue;
1626                                 if ((TransactionId) tuple.t_data->t_cmin != myXID)
1627                                         elog(ERROR, "Invalid XID in t_cmin (4)");
1628                                 if (tuple.t_data->t_infomask & HEAP_MOVED_IN)
1629                                         elog(ERROR, "HEAP_MOVED_IN was not expected (2)");
1630                                 if (tuple.t_data->t_infomask & HEAP_MOVED_OFF)
1631                                 {
1632                                         /* some chains were moved while */
1633                                         if (chain_tuple_moved)
1634                                         {                       /* cleaning this page */
1635                                                 Assert(vacpage->offsets_free > 0);
1636                                                 for (i = 0; i < vacpage->offsets_free; i++)
1637                                                 {
1638                                                         if (vacpage->offsets[i] == off)
1639                                                                 break;
1640                                                 }
1641                                                 if (i >= vacpage->offsets_free) /* not found */
1642                                                 {
1643                                                         vacpage->offsets[vacpage->offsets_free++] = off;
1644                                                         Assert(keep_tuples > 0);
1645                                                         keep_tuples--;
1646                                                 }
1647                                         }
1648                                         else
1649                                         {
1650                                                 vacpage->offsets[vacpage->offsets_free++] = off;
1651                                                 Assert(keep_tuples > 0);
1652                                                 keep_tuples--;
1653                                         }
1654                                 }
1655                         }
1656                 }
1657
1658                 if (vacpage->offsets_free > 0)  /* some tuples were moved */
1659                 {
1660                         if (chain_tuple_moved)          /* else - they are ordered */
1661                         {
1662                                 qsort((char *) (vacpage->offsets), vacpage->offsets_free,
1663                                           sizeof(OffsetNumber), vac_cmp_offno);
1664                         }
1665                         reap_page(&Nvacpagelist, vacpage);
1666                         WriteBuffer(buf);
1667                 }
1668                 else if (dowrite)
1669                         WriteBuffer(buf);
1670                 else
1671                         ReleaseBuffer(buf);
1672
1673                 if (offnum <= maxoff)
1674                         break;                          /* some item(s) left */
1675
1676         }                                                       /* walk along relation */
1677
1678         blkno++;                                        /* new number of blocks */
1679
1680         if (cur_buffer != InvalidBuffer)
1681         {
1682                 Assert(num_moved > 0);
1683                 WriteBuffer(cur_buffer);
1684         }
1685
1686         if (num_moved > 0)
1687         {
1688
1689
1690                 /*
1691                  * We have to commit our tuple movings before we truncate the
1692                  * relation, but we shouldn't lose our locks. And so - quick hack:
1693                  * flush buffers and record status of current transaction as
1694                  * committed, and continue. - vadim 11/13/96
1695                  */
1695                 FlushBufferPool();
1696                 TransactionIdCommit(myXID);
1697                 FlushBufferPool();
1698         }
1699
1700         /*
1701          * Clean uncleaned reaped pages from the vacuum_pages list and set
1702          * xmin committed for inserted tuples
1703          */
1704         checked_moved = 0;
1705         for (i = 0, curpage = vacuum_pages->pagedesc; i < vacuumed_pages; i++, curpage++)
1706         {
1707                 Assert((*curpage)->blkno < (BlockNumber) blkno);
1708                 buf = ReadBuffer(onerel, (*curpage)->blkno);
1709                 page = BufferGetPage(buf);
1710                 if ((*curpage)->offsets_used == 0)              /* this page was not used */
1711                 {
1712                         if (!PageIsEmpty(page))
1713                                 vacuum_page(page, *curpage);
1714                 }
1715                 else
1716 /* this page was used */
1717                 {
1718                         num_tuples = 0;
1719                         max_offset = PageGetMaxOffsetNumber(page);
1720                         for (newoff = FirstOffsetNumber;
1721                                  newoff <= max_offset;
1722                                  newoff = OffsetNumberNext(newoff))
1723                         {
1724                                 itemid = PageGetItemId(page, newoff);
1725                                 if (!ItemIdIsUsed(itemid))
1726                                         continue;
1727                                 tuple.t_datamcxt = NULL;
1728                                 tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
1729                                 if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
1730                                 {
1731                                         if ((TransactionId) tuple.t_data->t_cmin != myXID)
1732                                                 elog(ERROR, "Invalid XID in t_cmin (2)");
1733                                         if (tuple.t_data->t_infomask & HEAP_MOVED_IN)
1734                                         {
1735                                                 tuple.t_data->t_infomask |= HEAP_XMIN_COMMITTED;
1736                                                 num_tuples++;
1737                                         }
1738                                         else if (tuple.t_data->t_infomask & HEAP_MOVED_OFF)
1739                                                 tuple.t_data->t_infomask |= HEAP_XMIN_INVALID;
1740                                         else
1741                                                 elog(ERROR, "HEAP_MOVED_OFF/HEAP_MOVED_IN was expected");
1742                                 }
1743                         }
1744                         Assert((*curpage)->offsets_used == num_tuples);
1745                         checked_moved += num_tuples;
1746                 }
1747                 WriteBuffer(buf);
1748         }
1749         Assert(num_moved == checked_moved);
1750
1751         elog(MESSAGE_LEVEL, "Rel %s: Pages: %u --> %u; Tuple(s) moved: %u. %s",
1752                  RelationGetRelationName(onerel),
1753                  nblocks, blkno, num_moved,
1754                  show_rusage(&ru0));
1755
1756         if (Nvacpagelist.num_pages > 0)
1757         {
1758                 /* vacuum indices again if needed */
1759                 if (Irel != (Relation *) NULL)
1760                 {
1761                         VacPage    *vpleft,
1762                                            *vpright,
1763                                                 vpsave;
1764
1765                         /* re-sort Nvacpagelist.pagedesc */
1766                         for (vpleft = Nvacpagelist.pagedesc,
1767                                  vpright = Nvacpagelist.pagedesc + Nvacpagelist.num_pages - 1;
1768                                  vpleft < vpright; vpleft++, vpright--)
1769                         {
1770                                 vpsave = *vpleft;
1771                                 *vpleft = *vpright;
1772                                 *vpright = vpsave;
1773                         }
1774                         Assert(keep_tuples >= 0);
1775                         for (i = 0; i < nindices; i++)
1776                                 vacuum_index(&Nvacpagelist, Irel[i],
1777                                                          vacrelstats->num_tuples, keep_tuples);
1778                 }
1779
1780                 /* clean moved tuples from last page in Nvacpagelist list */
1781                 if (vacpage->blkno == (BlockNumber) (blkno - 1) &&
1782                         vacpage->offsets_free > 0)
1783                 {
1784                         buf = ReadBuffer(onerel, vacpage->blkno);
1785                         page = BufferGetPage(buf);
1786                         num_tuples = 0;
1787                         for (offnum = FirstOffsetNumber;
1788                                  offnum <= maxoff;
1789                                  offnum = OffsetNumberNext(offnum))
1790                         {
1791                                 itemid = PageGetItemId(page, offnum);
1792                                 if (!ItemIdIsUsed(itemid))
1793                                         continue;
1794                                 tuple.t_datamcxt = NULL;
1795                                 tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
1796
1797                                 if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
1798                                 {
1799                                         if ((TransactionId) tuple.t_data->t_cmin != myXID)
1800                                                 elog(ERROR, "Invalid XID in t_cmin (3)");
1801                                         if (tuple.t_data->t_infomask & HEAP_MOVED_OFF)
1802                                         {
1803                                                 itemid->lp_flags &= ~LP_USED;
1804                                                 num_tuples++;
1805                                         }
1806                                         else
1807                                                 elog(ERROR, "HEAP_MOVED_OFF was expected (2)");
1808                                 }
1809
1810                         }
1811                         Assert(vacpage->offsets_free == num_tuples);
1812                         PageRepairFragmentation(page);
1813                         WriteBuffer(buf);
1814                 }
1815
1816                 /* now - free new list of reaped pages */
1817                 curpage = Nvacpagelist.pagedesc;
1818                 for (i = 0; i < Nvacpagelist.num_pages; i++, curpage++)
1819                         pfree(*curpage);
1820                 pfree(Nvacpagelist.pagedesc);
1821         }
1822
1823         /* truncate relation, after flushing any dirty pages out to disk */
1824         if (blkno < nblocks)
1825         {
1826                 i = FlushRelationBuffers(onerel, blkno);
1827                 if (i < 0)
1828                         elog(FATAL, "VACUUM (repair_frag): FlushRelationBuffers returned %d", i);
1829                 blkno = smgrtruncate(DEFAULT_SMGR, onerel, blkno);
1830                 Assert(blkno >= 0);
1831                 vacrelstats->num_pages = blkno; /* set new number of blocks */
1832         }
1833
1834         if (Irel != (Relation *) NULL)          /* pfree index' allocations */
1835         {
1836                 close_indices(nindices, Irel);
1837                 pfree(indexInfo);
1838         }
1839
1840         pfree(vacpage);
1841         if (vacrelstats->vtlinks != NULL)
1842                 pfree(vacrelstats->vtlinks);
1843
1844 }
1845
1846 /*
1847  *      vacuum_heap() -- free dead tuples
1848  *
1849  *              This routine marks dead tuples as unused and truncates relation
1850  *              if there are "empty" end-blocks.
1851  */
1852 static void
1853 vacuum_heap(VRelStats *vacrelstats, Relation onerel, VacPageList vacuum_pages)
1854 {
1855         Buffer          buf;
1856         Page            page;
1857         VacPage    *vacpage;
1858         int                     nblocks;
1859         int                     i;
1860
1861         nblocks = vacuum_pages->num_pages;
1862         nblocks -= vacuum_pages->empty_end_pages;               /* nothing to do with
1863                                                                                                                  * them */
1864
1865         for (i = 0, vacpage = vacuum_pages->pagedesc; i < nblocks; i++, vacpage++)
1866         {
1867                 if ((*vacpage)->offsets_free > 0)
1868                 {
1869                         buf = ReadBuffer(onerel, (*vacpage)->blkno);
1870                         page = BufferGetPage(buf);
1871                         vacuum_page(page, *vacpage);
1872                         WriteBuffer(buf);
1873                 }
1874         }
1875
1876         /* truncate relation if there are some empty end-pages */
1877         if (vacuum_pages->empty_end_pages > 0)
1878         {
1879                 Assert(vacrelstats->num_pages >= vacuum_pages->empty_end_pages);
1880                 nblocks = vacrelstats->num_pages - vacuum_pages->empty_end_pages;
1881                 elog(MESSAGE_LEVEL, "Rel %s: Pages: %u --> %u.",
1882                          RelationGetRelationName(onerel),
1883                          vacrelstats->num_pages, nblocks);
1884
1885                 /*
1886                  * We have to flush "empty" end-pages (if changed, but who knows
1887                  * it) before truncation
1888                  *
1889                  * XXX is FlushBufferPool() still needed here?
1890                  */
1891                 FlushBufferPool();
1892
1893                 i = FlushRelationBuffers(onerel, nblocks);
1894                 if (i < 0)
1895                         elog(FATAL, "VACUUM (vacuum_heap): FlushRelationBuffers returned %d", i);
1896
1897                 nblocks = smgrtruncate(DEFAULT_SMGR, onerel, nblocks);
1898                 Assert(nblocks >= 0);
1899                 vacrelstats->num_pages = nblocks;               /* set new number of
1900                                                                                                  * blocks */
1901         }
1902
1903 }
1904
1905 /*
1906  *      vacuum_page() -- free dead tuples on a page
1907  *                                       and repair its fragmentation.
1908  */
1909 static void
1910 vacuum_page(Page page, VacPage vacpage)
1911 {
1912         ItemId          itemid;
1913         int                     i;
1914
1915         /* There shouldn't be any tuples moved onto the page yet! */
1916         Assert(vacpage->offsets_used == 0);
1917
1918         for (i = 0; i < vacpage->offsets_free; i++)
1919         {
1920                 itemid = &(((PageHeader) page)->pd_linp[vacpage->offsets[i] - 1]);
1921                 itemid->lp_flags &= ~LP_USED;
1922         }
1923         PageRepairFragmentation(page);
1924
1925 }
1926
1927 /*
1928  *      _scan_index() -- scan one index relation to update statistic.
1929  *
1930  */
1931 static void
1932 scan_index(Relation indrel, int num_tuples)
1933 {
1934         RetrieveIndexResult res;
1935         IndexScanDesc iscan;
1936         int                     nitups;
1937         int                     nipages;
1938         struct rusage ru0;
1939
1940         getrusage(RUSAGE_SELF, &ru0);
1941
1942         /* walk through the entire index */
1943         iscan = index_beginscan(indrel, false, 0, (ScanKey) NULL);
1944         nitups = 0;
1945
1946         while ((res = index_getnext(iscan, ForwardScanDirection))
1947                    != (RetrieveIndexResult) NULL)
1948         {
1949                 nitups++;
1950                 pfree(res);
1951         }
1952
1953         index_endscan(iscan);
1954
1955         /* now update statistics in pg_class */
1956         nipages = RelationGetNumberOfBlocks(indrel);
1957         update_relstats(RelationGetRelid(indrel), nipages, nitups, false, NULL);
1958
1959         elog(MESSAGE_LEVEL, "Index %s: Pages %u; Tuples %u. %s",
1960                  RelationGetRelationName(indrel), nipages, nitups,
1961                  show_rusage(&ru0));
1962
1963         if (nitups != num_tuples)
1964                 elog(NOTICE, "Index %s: NUMBER OF INDEX' TUPLES (%u) IS NOT THE SAME AS HEAP' (%u).\
1965 \n\tRecreate the index.",
1966                          RelationGetRelationName(indrel), nitups, num_tuples);
1967
1968 }
1969
/*
 *	vacuum_index() -- vacuum one index relation.
 *
 *		Vpl is the VacPageList of the heap we're currently vacuuming.
 *		It's locked. Indrel is an index relation on the vacuumed heap.
 *		We don't set locks on the index relation here, since the indexed
 *		access methods support locking at different granularities.
 *		We let them handle it.
 *
 *		Finally, we arrange to update the index relation's statistics in
 *		pg_class.
 *
 *		num_tuples is the count of surviving heap tuples; keep_tuples is
 *		the count of moved-but-not-yet-cleaned tuples whose index entries
 *		must be preserved, so a consistent index holds exactly
 *		num_tuples + keep_tuples entries when we are done.
 */
static void
vacuum_index(VacPageList vacpagelist, Relation indrel, int num_tuples, int keep_tuples)
{
	RetrieveIndexResult res;
	IndexScanDesc iscan;
	ItemPointer heapptr;
	int			tups_vacuumed;
	int			num_index_tuples;
	int			num_pages;
	VacPage		vp;
	struct rusage ru0;

	getrusage(RUSAGE_SELF, &ru0);

	/* walk through the entire index */
	iscan = index_beginscan(indrel, false, 0, (ScanKey) NULL);
	tups_vacuumed = 0;
	num_index_tuples = 0;

	while ((res = index_getnext(iscan, ForwardScanDirection))
		   != (RetrieveIndexResult) NULL)
	{
		heapptr = &res->heap_iptr;

		/* delete the index entry if its heap tuple has been reaped */
		if ((vp = tid_reaped(heapptr, vacpagelist)) != (VacPage) NULL)
		{
#ifdef NOT_USED
			elog(DEBUG, "<%x,%x> -> <%x,%x>",
				 ItemPointerGetBlockNumber(&(res->index_iptr)),
				 ItemPointerGetOffsetNumber(&(res->index_iptr)),
				 ItemPointerGetBlockNumber(&(res->heap_iptr)),
				 ItemPointerGetOffsetNumber(&(res->heap_iptr)));
#endif
			/*
			 * offsets_free == 0 marks a wholly-empty heap page (see
			 * tid_reaped); an index entry pointing into one is bogus,
			 * so report it while we delete it.
			 */
			if (vp->offsets_free == 0)
			{
				elog(NOTICE, "Index %s: pointer to EmptyPage (blk %u off %u) - fixing",
					 RelationGetRelationName(indrel),
					 vp->blkno, ItemPointerGetOffsetNumber(heapptr));
			}
			++tups_vacuumed;
			index_delete(indrel, &res->index_iptr);
		}
		else
			num_index_tuples++;

		pfree(res);
	}

	index_endscan(iscan);

	/* now update statistics in pg_class */
	num_pages = RelationGetNumberOfBlocks(indrel);
	update_relstats(RelationGetRelid(indrel), num_pages, num_index_tuples, false, NULL);

	elog(MESSAGE_LEVEL, "Index %s: Pages %u; Tuples %u: Deleted %u. %s",
		 RelationGetRelationName(indrel), num_pages,
		 num_index_tuples - keep_tuples, tups_vacuumed,
		 show_rusage(&ru0));

	/* cross-check: surviving index entries must match heap expectations */
	if (num_index_tuples != num_tuples + keep_tuples)
		elog(NOTICE, "Index %s: NUMBER OF INDEX' TUPLES (%u) IS NOT THE SAME AS HEAP' (%u).\
\n\tRecreate the index.",
		  RelationGetRelationName(indrel), num_index_tuples, num_tuples);

}
2047
2048 /*
2049  *      tid_reaped() -- is a particular tid reaped?
2050  *
2051  *              vacpagelist->VacPage_array is sorted in right order.
2052  */
2053 static VacPage
2054 tid_reaped(ItemPointer itemptr, VacPageList vacpagelist)
2055 {
2056         OffsetNumber ioffno;
2057         OffsetNumber *voff;
2058         VacPage         vp,
2059                            *vpp;
2060         VacPageData vacpage;
2061
2062         vacpage.blkno = ItemPointerGetBlockNumber(itemptr);
2063         ioffno = ItemPointerGetOffsetNumber(itemptr);
2064
2065         vp = &vacpage;
2066         vpp = (VacPage *) vac_find_eq((void *) (vacpagelist->pagedesc),
2067                                         vacpagelist->num_pages, sizeof(VacPage), (void *) &vp,
2068                                                                         vac_cmp_blk);
2069
2070         if (vpp == (VacPage *) NULL)
2071                 return (VacPage) NULL;
2072         vp = *vpp;
2073
2074         /* ok - we are on true page */
2075
2076         if (vp->offsets_free == 0)
2077         {                                                       /* this is EmptyPage !!! */
2078                 return vp;
2079         }
2080
2081         voff = (OffsetNumber *) vac_find_eq((void *) (vp->offsets),
2082                         vp->offsets_free, sizeof(OffsetNumber), (void *) &ioffno,
2083                                                                            vac_cmp_offno);
2084
2085         if (voff == (OffsetNumber *) NULL)
2086                 return (VacPage) NULL;
2087
2088         return vp;
2089
2090 }
2091
/*
 *	update_relstats() -- update statistics for one relation
 *
 *		Statistics are stored in several places: the pg_class row for the
 *		relation has stats about the whole relation, the pg_attribute rows
 *		for each attribute store "disbursion", and there is a pg_statistic
 *		row for each (non-system) attribute.  (Disbursion probably ought to
 *		be moved to pg_statistic, but it's not worth doing unless there's
 *		another reason to have to change pg_attribute.)  Disbursion and
 *		pg_statistic values are only updated by VACUUM ANALYZE, but we
 *		always update the stats in pg_class.
 *
 *		This routine works for both index and heap relation entries in
 *		pg_class.  We violate no-overwrite semantics here by storing new
 *		values for the statistics columns directly into the pg_class
 *		tuple that's already on the page.  The reason for this is that if
 *		we updated these tuples in the usual way, vacuuming pg_class itself
 *		wouldn't work very well --- by the time we got done with a vacuum
 *		cycle, most of the tuples in pg_class would've been obsoleted.
 *		Updating pg_class's own statistics would be especially tricky.
 *		Of course, this only works for fixed-size never-null columns, but
 *		these are.
 *
 *		relid:       OID of the relation whose pg_class row is updated
 *		num_pages:   new relpages value
 *		num_tuples:  new reltuples value
 *		hasindex:    new relhasindex value
 *		vacrelstats: currently unused here (callers pass NULL for indexes)
 */
static void
update_relstats(Oid relid, int num_pages, int num_tuples, bool hasindex,
			VRelStats *vacrelstats)
{
	Relation	rd;
	HeapTupleData rtup;
	HeapTuple	ctup;
	Form_pg_class pgcform;
	Buffer		buffer;

	/*
	 * update number of tuples and number of pages in pg_class
	 */
	rd = heap_openr(RelationRelationName, RowExclusiveLock);

	/* use the syscache copy only to learn the tuple's TID */
	ctup = SearchSysCacheTupleCopy(RELOID,
								   ObjectIdGetDatum(relid),
								   0, 0, 0);
	if (!HeapTupleIsValid(ctup))
		elog(ERROR, "pg_class entry for relid %u vanished during vacuuming",
			 relid);

	/* get the buffer cache tuple */
	rtup.t_self = ctup->t_self;
	heap_fetch(rd, SnapshotNow, &rtup, &buffer);
	heap_freetuple(ctup);

	/* overwrite the existing statistics in the tuple */
	pgcform = (Form_pg_class) GETSTRUCT(&rtup);
	pgcform->reltuples = num_tuples;
	pgcform->relpages = num_pages;
	pgcform->relhasindex = hasindex;

	/* invalidate the tuple in the cache and write the buffer */
	RelationInvalidateHeapTuple(rd, &rtup);
	WriteBuffer(buffer);

	heap_close(rd, RowExclusiveLock);
}
2154
2155 /*
2156  *      reap_page() -- save a page on the array of reaped pages.
2157  *
2158  *              As a side effect of the way that the vacuuming loop for a given
2159  *              relation works, higher pages come after lower pages in the array
2160  *              (and highest tid on a page is last).
2161  */
2162 static void
2163 reap_page(VacPageList vacpagelist, VacPage vacpage)
2164 {
2165         VacPage newvacpage;
2166
2167         /* allocate a VacPageData entry */
2168         newvacpage = (VacPage) palloc(sizeof(VacPageData) + vacpage->offsets_free * sizeof(OffsetNumber));
2169
2170         /* fill it in */
2171         if (vacpage->offsets_free > 0)
2172                 memmove(newvacpage->offsets, vacpage->offsets, vacpage->offsets_free * sizeof(OffsetNumber));
2173         newvacpage->blkno = vacpage->blkno;
2174         newvacpage->free = vacpage->free;
2175         newvacpage->offsets_used = vacpage->offsets_used;
2176         newvacpage->offsets_free = vacpage->offsets_free;
2177
2178         /* insert this page into vacpagelist list */
2179         vpage_insert(vacpagelist, newvacpage);
2180
2181 }
2182
2183 static void
2184 vpage_insert(VacPageList vacpagelist, VacPage vpnew)
2185 {
2186 #define PG_NPAGEDESC 1024
2187
2188         /* allocate a VacPage entry if needed */
2189         if (vacpagelist->num_pages == 0)
2190         {
2191                 vacpagelist->pagedesc = (VacPage *) palloc(PG_NPAGEDESC * sizeof(VacPage));
2192                 vacpagelist->num_allocated_pages = PG_NPAGEDESC;
2193         }
2194         else if (vacpagelist->num_pages >= vacpagelist->num_allocated_pages)
2195         {
2196                 vacpagelist->num_allocated_pages *= 2;
2197                 vacpagelist->pagedesc = (VacPage *) repalloc(vacpagelist->pagedesc, vacpagelist->num_allocated_pages * sizeof(VacPage));
2198         }
2199         vacpagelist->pagedesc[vacpagelist->num_pages] = vpnew;
2200         (vacpagelist->num_pages)++;
2201
2202 }
2203
2204 static void *
2205 vac_find_eq(void *bot, int nelem, int size, void *elm,
2206                    int (*compar) (const void *, const void *))
2207 {
2208         int                     res;
2209         int                     last = nelem - 1;
2210         int                     celm = nelem / 2;
2211         bool            last_move,
2212                                 first_move;
2213
2214         last_move = first_move = true;
2215         for (;;)
2216         {
2217                 if (first_move == true)
2218                 {
2219                         res = compar(bot, elm);
2220                         if (res > 0)
2221                                 return NULL;
2222                         if (res == 0)
2223                                 return bot;
2224                         first_move = false;
2225                 }
2226                 if (last_move == true)
2227                 {
2228                         res = compar(elm, (void *) ((char *) bot + last * size));
2229                         if (res > 0)
2230                                 return NULL;
2231                         if (res == 0)
2232                                 return (void *) ((char *) bot + last * size);
2233                         last_move = false;
2234                 }
2235                 res = compar(elm, (void *) ((char *) bot + celm * size));
2236                 if (res == 0)
2237                         return (void *) ((char *) bot + celm * size);
2238                 if (res < 0)
2239                 {
2240                         if (celm == 0)
2241                                 return NULL;
2242                         last = celm - 1;
2243                         celm = celm / 2;
2244                         last_move = true;
2245                         continue;
2246                 }
2247
2248                 if (celm == last)
2249                         return NULL;
2250
2251                 last = last - celm - 1;
2252                 bot = (void *) ((char *) bot + (celm + 1) * size);
2253                 celm = (last + 1) / 2;
2254                 first_move = true;
2255         }
2256
2257 }
2258
2259 static int
2260 vac_cmp_blk(const void *left, const void *right)
2261 {
2262         BlockNumber lblk,
2263                                 rblk;
2264
2265         lblk = (*((VacPage *) left))->blkno;
2266         rblk = (*((VacPage *) right))->blkno;
2267
2268         if (lblk < rblk)
2269                 return -1;
2270         if (lblk == rblk)
2271                 return 0;
2272         return 1;
2273
2274 }
2275
2276 static int
2277 vac_cmp_offno(const void *left, const void *right)
2278 {
2279
2280         if (*(OffsetNumber *) left < *(OffsetNumber *) right)
2281                 return -1;
2282         if (*(OffsetNumber *) left == *(OffsetNumber *) right)
2283                 return 0;
2284         return 1;
2285
2286 }
2287
2288 static int
2289 vac_cmp_vtlinks(const void *left, const void *right)
2290 {
2291
2292         if (((VTupleLink) left)->new_tid.ip_blkid.bi_hi <
2293                 ((VTupleLink) right)->new_tid.ip_blkid.bi_hi)
2294                 return -1;
2295         if (((VTupleLink) left)->new_tid.ip_blkid.bi_hi >
2296                 ((VTupleLink) right)->new_tid.ip_blkid.bi_hi)
2297                 return 1;
2298         /* bi_hi-es are equal */
2299         if (((VTupleLink) left)->new_tid.ip_blkid.bi_lo <
2300                 ((VTupleLink) right)->new_tid.ip_blkid.bi_lo)
2301                 return -1;
2302         if (((VTupleLink) left)->new_tid.ip_blkid.bi_lo >
2303                 ((VTupleLink) right)->new_tid.ip_blkid.bi_lo)
2304                 return 1;
2305         /* bi_lo-es are equal */
2306         if (((VTupleLink) left)->new_tid.ip_posid <
2307                 ((VTupleLink) right)->new_tid.ip_posid)
2308                 return -1;
2309         if (((VTupleLink) left)->new_tid.ip_posid >
2310                 ((VTupleLink) right)->new_tid.ip_posid)
2311                 return 1;
2312         return 0;
2313
2314 }
2315
2316
2317 static void
2318 get_indices(Relation relation, int *nindices, Relation **Irel)
2319 {
2320         List       *indexoidlist,
2321                            *indexoidscan;
2322         int                     i;
2323
2324         indexoidlist = RelationGetIndexList(relation);
2325
2326         *nindices = length(indexoidlist);
2327
2328         if (*nindices > 0)
2329                 *Irel = (Relation *) palloc(*nindices * sizeof(Relation));
2330         else
2331                 *Irel = NULL;
2332
2333         i = 0;
2334         foreach(indexoidscan, indexoidlist)
2335         {
2336                 Oid                     indexoid = lfirsti(indexoidscan);
2337
2338                 (*Irel)[i] = index_open(indexoid);
2339                 i++;
2340         }
2341
2342         freeList(indexoidlist);
2343 }
2344
2345
2346 static void
2347 close_indices(int nindices, Relation *Irel)
2348 {
2349
2350         if (Irel == (Relation *) NULL)
2351                 return;
2352
2353         while (nindices--)
2354                 index_close(Irel[nindices]);
2355         pfree(Irel);
2356
2357 }
2358
2359
2360 /*
2361  * Obtain IndexInfo data for each index on the rel
2362  */
2363 static IndexInfo **
2364 get_index_desc(Relation onerel, int nindices, Relation *Irel)
2365 {
2366         IndexInfo **indexInfo;
2367         int                     i;
2368         HeapTuple       cachetuple;
2369
2370         indexInfo = (IndexInfo **) palloc(nindices * sizeof(IndexInfo *));
2371
2372         for (i = 0; i < nindices; i++)
2373         {
2374                 cachetuple = SearchSysCacheTuple(INDEXRELID,
2375                                                          ObjectIdGetDatum(RelationGetRelid(Irel[i])),
2376                                                                                  0, 0, 0);
2377                 if (!HeapTupleIsValid(cachetuple))
2378                         elog(ERROR, "get_index_desc: index %u not found",
2379                                  RelationGetRelid(Irel[i]));
2380                 indexInfo[i] = BuildIndexInfo(cachetuple);
2381         }
2382
2383         return indexInfo;
2384 }
2385
2386
2387 static bool
2388 enough_space(VacPage vacpage, Size len)
2389 {
2390
2391         len = MAXALIGN(len);
2392
2393         if (len > vacpage->free)
2394                 return false;
2395
2396         if (vacpage->offsets_used < vacpage->offsets_free)      /* there are free
2397                                                                                                                  * itemid(s) */
2398                 return true;                    /* and len <= free_space */
2399
2400         /* ok. noff_usd >= noff_free and so we'll have to allocate new itemid */
2401         if (len + MAXALIGN(sizeof(ItemIdData)) <= vacpage->free)
2402                 return true;
2403
2404         return false;
2405
2406 }
2407
2408
/*
 * Compute elapsed time since ru0 usage snapshot, and format into
 * a displayable string.  Result is in a static string, which is
 * tacky, but no one ever claimed that the Postgres backend is
 * threadable...
 */
static char *
show_rusage(struct rusage * ru0)
{
	static char result[64];
	struct rusage now;

	getrusage(RUSAGE_SELF, &now);

	/* borrow from the seconds field when microseconds would go negative */
	if (now.ru_stime.tv_usec < ru0->ru_stime.tv_usec)
	{
		now.ru_stime.tv_sec--;
		now.ru_stime.tv_usec += 1000000;
	}
	if (now.ru_utime.tv_usec < ru0->ru_utime.tv_usec)
	{
		now.ru_utime.tv_sec--;
		now.ru_utime.tv_usec += 1000000;
	}

	/* hundredths of a second: usec delta / 10000 */
	snprintf(result, sizeof(result),
			 "CPU %d.%02ds/%d.%02du sec.",
			 (int) (now.ru_stime.tv_sec - ru0->ru_stime.tv_sec),
			 (int) (now.ru_stime.tv_usec - ru0->ru_stime.tv_usec) / 10000,
			 (int) (now.ru_utime.tv_sec - ru0->ru_utime.tv_sec),
			 (int) (now.ru_utime.tv_usec - ru0->ru_utime.tv_usec) / 10000);

	return result;
}