*
*
* IDENTIFICATION
- * $Header: /cvsroot/pgsql/src/backend/commands/vacuum.c,v 1.199 2001/06/29 16:34:30 tgl Exp $
+ * $Header: /cvsroot/pgsql/src/backend/commands/vacuum.c,v 1.200 2001/06/29 20:14:27 tgl Exp $
*
*-------------------------------------------------------------------------
*/
Size free; /* FreeSpace on this Page */
uint16 offsets_used; /* Number of OffNums used by vacuum */
uint16 offsets_free; /* Number of OffNums free or to be free */
- OffsetNumber offsets[1]; /* Array of its OffNums */
+ OffsetNumber offsets[1]; /* Array of free OffNums */
} VacPageData;
typedef VacPageData *VacPage;
typedef struct VacPageListData
{
- int empty_end_pages;/* Number of "empty" end-pages */
- int num_pages; /* Number of pages in pagedesc */
+ BlockNumber empty_end_pages; /* Number of "empty" end-pages */
+ int num_pages; /* Number of pages in pagedesc */
int num_allocated_pages; /* Number of allocated pages in
* pagedesc */
- VacPage *pagedesc; /* Descriptions of pages */
+ VacPage *pagedesc; /* Descriptions of pages */
} VacPageListData;
typedef VacPageListData *VacPageList;
double num_tuples, int keep_tuples);
static void scan_index(Relation indrel, double num_tuples);
static VacPage tid_reaped(ItemPointer itemptr, VacPageList vacpagelist);
-static void reap_page(VacPageList vacpagelist, VacPage vacpage);
+static VacPage copy_vac_page(VacPage vacpage);
static void vpage_insert(VacPageList vacpagelist, VacPage vpnew);
static void get_indices(Relation relation, int *nindices, Relation **Irel);
static void close_indices(int nindices, Relation *Irel);
/*
* scan_heap() -- scan an open heap relation
*
- * This routine sets commit times, constructs vacuum_pages list of
- * empty/uninitialized pages and pages with dead tuples and
- * ~LP_USED line pointers, constructs fraged_pages list of pages
- * appropriate for purposes of shrinking and maintains statistics
+ * This routine sets commit status bits, constructs vacuum_pages (list
+ * of pages we need to compact free space on and/or clean indexes of
+ * deleted tuples), constructs fraged_pages (list of pages with free
+ * space that tuples could be moved into), and calculates statistics
* on the number of live tuples in a heap.
*/
static void
ItemId itemid;
Buffer buf;
HeapTupleData tuple;
- Page page,
- tempPage = NULL;
OffsetNumber offnum,
maxoff;
bool pgchanged,
notup;
char *relname;
VacPage vacpage,
- vp;
- double num_tuples;
- uint32 tups_vacuumed,
- nkeep,
- nunused,
- ncrash,
- empty_pages,
+ vacpagecopy;
+ BlockNumber empty_pages,
new_pages,
changed_pages,
empty_end_pages;
- Size free_size,
+ double num_tuples,
+ tups_vacuumed,
+ nkeep,
+ nunused,
+ ncrash;
+ double free_size,
usable_free_size;
Size min_tlen = MaxTupleSize;
Size max_tlen = 0;
- int32 i;
+ int i;
bool do_shrinking = true;
VTupleLink vtlinks = (VTupleLink) palloc(100 * sizeof(VTupleLinkData));
int num_vtlinks = 0;
relname = RelationGetRelationName(onerel);
elog(MESSAGE_LEVEL, "--Relation %s--", relname);
- tups_vacuumed = nkeep = nunused = ncrash = empty_pages =
- new_pages = changed_pages = empty_end_pages = 0;
- num_tuples = 0;
- free_size = usable_free_size = 0;
+ empty_pages = new_pages = changed_pages = empty_end_pages = 0;
+ num_tuples = tups_vacuumed = nkeep = nunused = ncrash = 0;
+ free_size = 0;
nblocks = RelationGetNumberOfBlocks(onerel);
+ /*
+ * We initially create each VacPage item in a maximal-sized workspace,
+ * then copy the workspace into a just-large-enough copy.
+ */
vacpage = (VacPage) palloc(sizeof(VacPageData) + MaxOffsetNumber * sizeof(OffsetNumber));
- vacpage->offsets_used = 0;
for (blkno = 0; blkno < nblocks; blkno++)
{
+ Page page,
+ tempPage = NULL;
+ bool do_reap,
+ do_frag;
+
buf = ReadBuffer(onerel, blkno);
page = BufferGetPage(buf);
+
vacpage->blkno = blkno;
+ vacpage->offsets_used = 0;
vacpage->offsets_free = 0;
if (PageIsNew(page))
free_size += (vacpage->free - sizeof(ItemIdData));
new_pages++;
empty_end_pages++;
- reap_page(vacuum_pages, vacpage);
+ vacpagecopy = copy_vac_page(vacpage);
+ vpage_insert(vacuum_pages, vacpagecopy);
+ vpage_insert(fraged_pages, vacpagecopy);
WriteBuffer(buf);
continue;
}
free_size += (vacpage->free - sizeof(ItemIdData));
empty_pages++;
empty_end_pages++;
- reap_page(vacuum_pages, vacpage);
+ vacpagecopy = copy_vac_page(vacpage);
+ vpage_insert(vacuum_pages, vacpagecopy);
+ vpage_insert(fraged_pages, vacpagecopy);
ReleaseBuffer(buf);
continue;
}
if (!ItemIdIsUsed(itemid))
{
vacpage->offsets[vacpage->offsets_free++] = offnum;
- nunused++;
+ nunused += 1;
continue;
}
* Not Aborted, Not Committed, Not in Progress -
* so it's from crashed process. - vadim 11/26/96
*/
- ncrash++;
+ ncrash += 1;
tupgone = true;
}
else
tuple.t_data->t_xmax >= XmaxRecent)
{
tupgone = false;
- nkeep++;
+ nkeep += 1;
if (!(tuple.t_data->t_infomask & HEAP_XMAX_COMMITTED))
{
tuple.t_data->t_infomask |= HEAP_XMAX_COMMITTED;
pageSize = PageGetPageSize(page);
tempPage = (Page) palloc(pageSize);
- memmove(tempPage, page, pageSize);
+ memcpy(tempPage, page, pageSize);
}
/* mark it unused on the temp page */
lpp->lp_flags &= ~LP_USED;
vacpage->offsets[vacpage->offsets_free++] = offnum;
- tups_vacuumed++;
+ tups_vacuumed += 1;
}
else
{
}
if (tempPage != (Page) NULL)
- { /* Some tuples are gone */
+ {
+ /* Some tuples are removable; figure free space after removal */
PageRepairFragmentation(tempPage, NULL);
vacpage->free = ((PageHeader) tempPage)->pd_upper - ((PageHeader) tempPage)->pd_lower;
- free_size += vacpage->free;
- reap_page(vacuum_pages, vacpage);
pfree(tempPage);
- tempPage = (Page) NULL;
+ do_reap = true;
}
- else if (vacpage->offsets_free > 0)
- { /* there are only ~LP_USED line pointers */
+ else
+ {
+ /* Just use current available space */
vacpage->free = ((PageHeader) page)->pd_upper - ((PageHeader) page)->pd_lower;
- free_size += vacpage->free;
- reap_page(vacuum_pages, vacpage);
+ /* Need to reap the page if it has ~LP_USED line pointers */
+ do_reap = (vacpage->offsets_free > 0);
}
- if (pgchanged)
+ free_size += vacpage->free;
+ /*
+ * Add the page to fraged_pages if it has a useful amount of free
+ * space. "Useful" means enough for a minimal-sized tuple.
+ * But we don't know that accurately near the start of the relation,
+ * so add pages unconditionally if they have >= BLCKSZ/10 free space.
+ */
+ do_frag = (vacpage->free >= min_tlen || vacpage->free >= BLCKSZ/10);
+
+ if (do_reap || do_frag)
{
- WriteBuffer(buf);
- changed_pages++;
+ vacpagecopy = copy_vac_page(vacpage);
+ if (do_reap)
+ vpage_insert(vacuum_pages, vacpagecopy);
+ if (do_frag)
+ vpage_insert(fraged_pages, vacpagecopy);
}
- else
- ReleaseBuffer(buf);
if (notup)
empty_end_pages++;
else
empty_end_pages = 0;
+
+ if (pgchanged)
+ {
+ WriteBuffer(buf);
+ changed_pages++;
+ }
+ else
+ ReleaseBuffer(buf);
}
pfree(vacpage);
fraged_pages->empty_end_pages = empty_end_pages;
/*
- * Try to make fraged_pages keeping in mind that we can't use free
- * space of "empty" end-pages and last page if it reaped.
+ * Clear the fraged_pages list if we found we couldn't shrink.
+ * Else, remove any "empty" end-pages from the list, and compute
+ * usable free space = free space in remaining pages.
*/
- if (do_shrinking && vacuum_pages->num_pages - empty_end_pages > 0)
+ if (do_shrinking)
{
- int nusf; /* blocks usefull for re-using */
-
- nusf = vacuum_pages->num_pages - empty_end_pages;
- if ((vacuum_pages->pagedesc[nusf - 1])->blkno == nblocks - empty_end_pages - 1)
- nusf--;
-
- for (i = 0; i < nusf; i++)
- {
- vp = vacuum_pages->pagedesc[i];
- if (enough_space(vp, min_tlen))
- {
- vpage_insert(fraged_pages, vp);
- usable_free_size += vp->free;
- }
- }
+ Assert((BlockNumber) fraged_pages->num_pages >= empty_end_pages);
+ fraged_pages->num_pages -= empty_end_pages;
+ usable_free_size = 0;
+ for (i = 0; i < fraged_pages->num_pages; i++)
+ usable_free_size += fraged_pages->pagedesc[i]->free;
+ }
+ else
+ {
+ fraged_pages->num_pages = 0;
+ usable_free_size = 0;
}
if (usable_free_size > 0 && num_vtlinks > 0)
}
elog(MESSAGE_LEVEL, "Pages %u: Changed %u, reaped %u, Empty %u, New %u; \
-Tup %.0f: Vac %u, Keep/VTL %u/%u, Crash %u, UnUsed %u, MinLen %lu, MaxLen %lu; \
-Re-using: Free/Avail. Space %lu/%lu; EndEmpty/Avail. Pages %u/%u. %s",
+Tup %.0f: Vac %.0f, Keep/VTL %.0f/%u, Crash %.0f, UnUsed %.0f, MinLen %lu, MaxLen %lu; \
+Re-using: Free/Avail. Space %.0f/%.0f; EndEmpty/Avail. Pages %u/%u. %s",
nblocks, changed_pages, vacuum_pages->num_pages, empty_pages,
new_pages, num_tuples, tups_vacuumed,
nkeep, vacrelstats->num_vtlinks, ncrash,
nunused, (unsigned long) min_tlen, (unsigned long) max_tlen,
- (unsigned long) free_size, (unsigned long) usable_free_size,
+ free_size, usable_free_size,
empty_end_pages, fraged_pages->num_pages,
show_rusage(&ru0));
CommandId myCID;
Buffer buf,
cur_buffer;
- int nblocks,
+ BlockNumber nblocks,
blkno;
BlockNumber last_move_dest_block = 0,
last_vacuum_block;
Nvacpagelist.num_pages = 0;
num_fraged_pages = fraged_pages->num_pages;
- Assert(vacuum_pages->num_pages > vacuum_pages->empty_end_pages);
+ Assert((BlockNumber) vacuum_pages->num_pages >= vacuum_pages->empty_end_pages);
vacuumed_pages = vacuum_pages->num_pages - vacuum_pages->empty_end_pages;
- last_vacuum_page = vacuum_pages->pagedesc[vacuumed_pages - 1];
- last_vacuum_block = last_vacuum_page->blkno;
+ if (vacuumed_pages > 0)
+ {
+ /* get last reaped page from vacuum_pages */
+ last_vacuum_page = vacuum_pages->pagedesc[vacuumed_pages - 1];
+ last_vacuum_block = last_vacuum_page->blkno;
+ }
+ else
+ {
+ last_vacuum_page = NULL;
+ last_vacuum_block = InvalidBlockNumber;
+ }
cur_buffer = InvalidBuffer;
num_moved = 0;
/*
* Scan pages backwards from the last nonempty page, trying to move
* tuples down to lower pages. Quit when we reach a page that we have
- * moved any tuples onto. Note that if a page is still in the
- * fraged_pages list (list of candidate move-target pages) when we
- * reach it, we will remove it from the list. This ensures we never
- * move a tuple up to a higher page number.
+ * moved any tuples onto, or the first page if we haven't moved anything,
+ * or when we find a page we cannot completely empty (this last condition
+ * is handled by "break" statements within the loop).
*
* NB: this code depends on the vacuum_pages and fraged_pages lists being
- * in order, and on fraged_pages being a subset of vacuum_pages.
+ * in order by blkno.
*/
nblocks = vacrelstats->rel_pages;
for (blkno = nblocks - vacuum_pages->empty_end_pages - 1;
blkno > last_move_dest_block;
blkno--)
{
+ /*
+ * Forget fraged_pages pages at or after this one; they're no longer
+ * useful as move targets, since we only want to move down. Note
+ * that since we stop the outer loop at last_move_dest_block, pages
+ * removed here cannot have had anything moved onto them already.
+ */
+ while (num_fraged_pages > 0 &&
+ fraged_pages->pagedesc[num_fraged_pages-1]->blkno >= blkno)
+ {
+ Assert(fraged_pages->pagedesc[num_fraged_pages-1]->offsets_used == 0);
+ --num_fraged_pages;
+ }
+
+ /*
+ * Process this page of relation.
+ */
buf = ReadBuffer(onerel, blkno);
page = BufferGetPage(buf);
isempty = PageIsEmpty(page);
dowrite = false;
- if (blkno == last_vacuum_block) /* it's reaped page */
+
+ /* Is the page in the vacuum_pages list? */
+ if (blkno == last_vacuum_block)
{
- if (last_vacuum_page->offsets_free > 0) /* there are dead tuples */
- { /* on this page - clean */
+ if (last_vacuum_page->offsets_free > 0)
+ {
+ /* there are dead tuples on this page - clean them */
Assert(!isempty);
LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
vacuum_page(onerel, buf, last_vacuum_page);
last_vacuum_page = NULL;
last_vacuum_block = InvalidBlockNumber;
}
- if (num_fraged_pages > 0 &&
- fraged_pages->pagedesc[num_fraged_pages - 1]->blkno == blkno)
- {
- /* page is in fraged_pages too; remove it */
- --num_fraged_pages;
- }
if (isempty)
{
ReleaseBuffer(buf);
if (to_vacpage == NULL ||
!enough_space(to_vacpage, tlen))
{
-
- /*
- * if to_vacpage no longer has enough free space
- * to be useful, remove it from fraged_pages list
- */
- if (to_vacpage != NULL &&
- !enough_space(to_vacpage, vacrelstats->min_tlen))
- {
- Assert(num_fraged_pages > to_item);
- memmove(fraged_pages->pagedesc + to_item,
- fraged_pages->pagedesc + to_item + 1,
- sizeof(VacPage) * (num_fraged_pages - to_item - 1));
- num_fraged_pages--;
- }
for (i = 0; i < num_fraged_pages; i++)
{
if (enough_space(fraged_pages->pagedesc[i], tlen))
break;
}
- /* can't move item anywhere */
if (i == num_fraged_pages)
{
+ /* can't move item anywhere */
for (i = 0; i < num_vtmove; i++)
{
Assert(vtmove[i].vacpage->offsets_used > 0);
{
WriteBuffer(cur_buffer);
cur_buffer = InvalidBuffer;
-
- /*
- * If previous target page is now too full to add *any*
- * tuple to it, remove it from fraged_pages.
- */
- if (!enough_space(cur_page, vacrelstats->min_tlen))
- {
- Assert(num_fraged_pages > cur_item);
- memmove(fraged_pages->pagedesc + cur_item,
- fraged_pages->pagedesc + cur_item + 1,
- sizeof(VacPage) * (num_fraged_pages - cur_item - 1));
- num_fraged_pages--;
- }
}
for (i = 0; i < num_fraged_pages; i++)
{
qsort((char *) (vacpage->offsets), vacpage->offsets_free,
sizeof(OffsetNumber), vac_cmp_offno);
}
- reap_page(&Nvacpagelist, vacpage);
+ vpage_insert(&Nvacpagelist, copy_vac_page(vacpage));
WriteBuffer(buf);
}
else if (dowrite)
if (num_moved > 0)
{
-
/*
* We have to commit our tuple movings before we truncate the
* relation. Ideally we should do Commit/StartTransactionCommand
}
/*
- * Clean uncleaned reaped pages from vacuum_pages list list and set
- * xmin committed for inserted tuples
+ * We are not going to move any more tuples across pages, but we still
+ * need to apply vacuum_page to compact free space in the remaining
+ * pages in vacuum_pages list. Note that some of these pages may also
+ * be in the fraged_pages list, and may have had tuples moved onto them;
+ * if so, we already did vacuum_page and needn't do it again.
*/
- checked_moved = 0;
- for (i = 0, curpage = vacuum_pages->pagedesc; i < vacuumed_pages; i++, curpage++)
+ for (i = 0, curpage = vacuum_pages->pagedesc;
+ i < vacuumed_pages;
+ i++, curpage++)
{
Assert((*curpage)->blkno < blkno);
- buf = ReadBuffer(onerel, (*curpage)->blkno);
- LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
- page = BufferGetPage(buf);
- if ((*curpage)->offsets_used == 0) /* this page was not used */
+ if ((*curpage)->offsets_used == 0)
{
+ /* this page was not used as a move target, so must clean it */
+ buf = ReadBuffer(onerel, (*curpage)->blkno);
+ LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
+ page = BufferGetPage(buf);
if (!PageIsEmpty(page))
vacuum_page(onerel, buf, *curpage);
+ LockBuffer(buf, BUFFER_LOCK_UNLOCK);
+ WriteBuffer(buf);
}
- else
-/* this page was used */
+ }
+
+ /*
+ * Now scan all the pages that we moved tuples onto and update
+ * tuple status bits. This is not really necessary, but will save time
+ * for future transactions examining these tuples.
+ *
+ * XXX Notice that this code fails to clear HEAP_MOVED_OFF tuples from
+ * pages that were move source pages but not move dest pages. One also
+ * wonders whether it wouldn't be better to skip this step and let the
+ * tuple status updates happen someplace that's not holding an exclusive
+ * lock on the relation.
+ */
+ checked_moved = 0;
+ for (i = 0, curpage = fraged_pages->pagedesc;
+ i < num_fraged_pages;
+ i++, curpage++)
+ {
+ Assert((*curpage)->blkno < blkno);
+ if ((*curpage)->blkno > last_move_dest_block)
+ break; /* no need to scan any further */
+ if ((*curpage)->offsets_used == 0)
+ continue; /* this page was never used as a move dest */
+ buf = ReadBuffer(onerel, (*curpage)->blkno);
+ LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
+ page = BufferGetPage(buf);
+ num_tuples = 0;
+ max_offset = PageGetMaxOffsetNumber(page);
+ for (newoff = FirstOffsetNumber;
+ newoff <= max_offset;
+ newoff = OffsetNumberNext(newoff))
{
- num_tuples = 0;
- max_offset = PageGetMaxOffsetNumber(page);
- for (newoff = FirstOffsetNumber;
- newoff <= max_offset;
- newoff = OffsetNumberNext(newoff))
+ itemid = PageGetItemId(page, newoff);
+ if (!ItemIdIsUsed(itemid))
+ continue;
+ tuple.t_datamcxt = NULL;
+ tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
+ if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
{
- itemid = PageGetItemId(page, newoff);
- if (!ItemIdIsUsed(itemid))
- continue;
- tuple.t_datamcxt = NULL;
- tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
- if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
+ if ((TransactionId) tuple.t_data->t_cmin != myXID)
+ elog(ERROR, "Invalid XID in t_cmin (2)");
+ if (tuple.t_data->t_infomask & HEAP_MOVED_IN)
{
- if ((TransactionId) tuple.t_data->t_cmin != myXID)
- elog(ERROR, "Invalid XID in t_cmin (2)");
- if (tuple.t_data->t_infomask & HEAP_MOVED_IN)
- {
- tuple.t_data->t_infomask |= HEAP_XMIN_COMMITTED;
- num_tuples++;
- }
- else if (tuple.t_data->t_infomask & HEAP_MOVED_OFF)
- tuple.t_data->t_infomask |= HEAP_XMIN_INVALID;
- else
- elog(ERROR, "HEAP_MOVED_OFF/HEAP_MOVED_IN was expected");
+ tuple.t_data->t_infomask |= HEAP_XMIN_COMMITTED;
+ num_tuples++;
}
+ else if (tuple.t_data->t_infomask & HEAP_MOVED_OFF)
+ tuple.t_data->t_infomask |= HEAP_XMIN_INVALID;
+ else
+ elog(ERROR, "HEAP_MOVED_OFF/HEAP_MOVED_IN was expected");
}
- Assert((*curpage)->offsets_used == num_tuples);
- checked_moved += num_tuples;
}
LockBuffer(buf, BUFFER_LOCK_UNLOCK);
WriteBuffer(buf);
+ Assert((*curpage)->offsets_used == num_tuples);
+ checked_moved += num_tuples;
}
Assert(num_moved == checked_moved);
* tuples have correct on-row commit status on disk (see bufmgr.c's
* comments for FlushRelationBuffers()).
*/
- Assert(vacrelstats->rel_pages >= (BlockNumber) vacuum_pages->empty_end_pages);
+ Assert(vacrelstats->rel_pages >= vacuum_pages->empty_end_pages);
relblocks = vacrelstats->rel_pages - vacuum_pages->empty_end_pages;
i = FlushRelationBuffers(onerel, relblocks);
heap_close(rd, RowExclusiveLock);
}
-/*
- * reap_page() -- save a page on the array of reaped pages.
- *
- * As a side effect of the way that the vacuuming loop for a given
- * relation works, higher pages come after lower pages in the array
- * (and highest tid on a page is last).
- */
-static void
-reap_page(VacPageList vacpagelist, VacPage vacpage)
+/* Copy a VacPage structure */
+static VacPage
+copy_vac_page(VacPage vacpage)
{
VacPage newvacpage;
/* allocate a VacPageData entry */
- newvacpage = (VacPage) palloc(sizeof(VacPageData) + vacpage->offsets_free * sizeof(OffsetNumber));
+ newvacpage = (VacPage) palloc(sizeof(VacPageData) +
+ vacpage->offsets_free * sizeof(OffsetNumber));
/* fill it in */
if (vacpage->offsets_free > 0)
- memmove(newvacpage->offsets, vacpage->offsets, vacpage->offsets_free * sizeof(OffsetNumber));
+ memcpy(newvacpage->offsets, vacpage->offsets,
+ vacpage->offsets_free * sizeof(OffsetNumber));
newvacpage->blkno = vacpage->blkno;
newvacpage->free = vacpage->free;
newvacpage->offsets_used = vacpage->offsets_used;
newvacpage->offsets_free = vacpage->offsets_free;
- /* insert this page into vacpagelist list */
- vpage_insert(vacpagelist, newvacpage);
-
+ return newvacpage;
}
+/*
+ * Add a VacPage pointer to a VacPageList.
+ *
+ * As a side effect of the way that scan_heap works,
+ * higher pages come after lower pages in the array
+ * (and highest tid on a page is last).
+ */
static void
vpage_insert(VacPageList vacpagelist, VacPage vpnew)
{