* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/access/nbtree/nbtree.c,v 1.147 2006/05/07 01:21:30 tgl Exp $
+ * $PostgreSQL: pgsql/src/backend/access/nbtree/nbtree.c,v 1.148 2006/05/08 00:00:10 tgl Exp $
*
*-------------------------------------------------------------------------
*/
double indtuples;
} BTBuildState;
+/* Working state needed by btvacuumpage */
+typedef struct
+{
+ IndexVacuumInfo *info;
+ IndexBulkDeleteResult *stats;
+ IndexBulkDeleteCallback callback;
+ void *callback_state;
+ BTCycleId cycleid;
+ BlockNumber *freePages;
+ int nFreePages;
+ int maxFreePages;
+ MemoryContext pagedelcontext;
+} BTVacState;
+
static void btbuildCallback(Relation index,
HeapTuple htup,
bool *isnull,
bool tupleIsAlive,
void *state);
+static void btvacuumscan(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
+ IndexBulkDeleteCallback callback, void *callback_state,
+ BTCycleId cycleid);
+static void btvacuumpage(BTVacState *vstate, BlockNumber blkno,
+ BlockNumber orig_blkno);
/*
btbulkdelete(PG_FUNCTION_ARGS)
{
IndexVacuumInfo *info = (IndexVacuumInfo *) PG_GETARG_POINTER(0);
- IndexBulkDeleteResult *stats = (IndexBulkDeleteResult *) PG_GETARG_POINTER(1);
+ IndexBulkDeleteResult * volatile stats = (IndexBulkDeleteResult *) PG_GETARG_POINTER(1);
IndexBulkDeleteCallback callback = (IndexBulkDeleteCallback) PG_GETARG_POINTER(2);
void *callback_state = (void *) PG_GETARG_POINTER(3);
Relation rel = info->index;
- double tuples_removed = 0;
- OffsetNumber deletable[MaxOffsetNumber];
- int ndeletable;
- Buffer buf;
+ BTCycleId cycleid;
- /*
- * The outer loop iterates over index leaf pages, the inner over items on
- * a leaf page. We issue just one _bt_delitems() call per page, so as to
- * minimize WAL traffic.
- *
- * Note that we exclusive-lock every leaf page containing data items, in
- * sequence left to right. It sounds attractive to only exclusive-lock
- * those containing items we need to delete, but unfortunately that is not
- * safe: we could then pass a stopped indexscan, which could in rare cases
- * lead to deleting items that the indexscan will still return later.
- * (See discussion in nbtree/README.) We can skip obtaining exclusive
- * lock on empty pages though, since no indexscan could be stopped on
- * those. (Note: this presumes that a split couldn't have left either
- * page totally empty.)
- */
- buf = _bt_get_endpoint(rel, 0, false);
+ /* allocate stats if first time through, else re-use existing struct */
+ if (stats == NULL)
+ stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
- if (BufferIsValid(buf)) /* check for empty index */
+ /* Establish the vacuum cycle ID to use for this scan */
+ PG_TRY();
{
- for (;;)
- {
- Page page;
- BTPageOpaque opaque;
- OffsetNumber offnum,
- minoff,
- maxoff;
- BlockNumber nextpage;
-
- ndeletable = 0;
- page = BufferGetPage(buf);
- opaque = (BTPageOpaque) PageGetSpecialPointer(page);
- minoff = P_FIRSTDATAKEY(opaque);
- maxoff = PageGetMaxOffsetNumber(page);
- /* We probably cannot see deleted pages, but skip 'em if so */
- if (minoff <= maxoff && !P_ISDELETED(opaque))
- {
- /*
- * Trade in the initial read lock for a super-exclusive write
- * lock on this page.
- */
- LockBuffer(buf, BUFFER_LOCK_UNLOCK);
- LockBufferForCleanup(buf);
-
- /*
- * Recompute minoff/maxoff, both of which could have changed
- * while we weren't holding the lock.
- */
- minoff = P_FIRSTDATAKEY(opaque);
- maxoff = PageGetMaxOffsetNumber(page);
-
- /*
- * Scan over all items to see which ones need deleted
- * according to the callback function.
- */
- for (offnum = minoff;
- offnum <= maxoff;
- offnum = OffsetNumberNext(offnum))
- {
- IndexTuple itup;
- ItemPointer htup;
-
- itup = (IndexTuple)
- PageGetItem(page, PageGetItemId(page, offnum));
- htup = &(itup->t_tid);
- if (callback(htup, callback_state))
- {
- deletable[ndeletable++] = offnum;
- tuples_removed += 1;
- }
- }
- }
+ cycleid = _bt_start_vacuum(rel);
- /* Apply any needed deletes */
- if (ndeletable > 0)
- _bt_delitems(rel, buf, deletable, ndeletable);
+ btvacuumscan(info, stats, callback, callback_state, cycleid);
- /* Fetch nextpage link before releasing the buffer */
- nextpage = opaque->btpo_next;
- _bt_relbuf(rel, buf);
-
- /* call vacuum_delay_point while not holding any buffer lock */
- vacuum_delay_point();
-
- /* And advance to next page, if any */
- if (nextpage == P_NONE)
- break;
- buf = _bt_getbuf(rel, nextpage, BT_READ);
- }
+ _bt_end_vacuum(rel);
}
-
- /* return statistics */
- if (stats == NULL)
- stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
- stats->tuples_removed += tuples_removed;
- /* btvacuumcleanup will fill in num_pages and num_index_tuples */
+ PG_CATCH();
+ {
+ /* Make sure shared memory gets cleaned up */
+ _bt_end_vacuum(rel);
+ PG_RE_THROW();
+ }
+ PG_END_TRY();
PG_RETURN_POINTER(stats);
}
/*
* Post-VACUUM cleanup.
*
- * Here, we scan looking for pages we can delete or return to the freelist.
- *
* Result: a palloc'd struct containing statistical info for VACUUM displays.
*/
Datum
btvacuumcleanup(PG_FUNCTION_ARGS)
{
IndexVacuumInfo *info = (IndexVacuumInfo *) PG_GETARG_POINTER(0);
IndexBulkDeleteResult *stats = (IndexBulkDeleteResult *) PG_GETARG_POINTER(1);
- Relation rel = info->index;
- BlockNumber num_pages;
- BlockNumber blkno;
- BlockNumber *freePages;
- int nFreePages,
- maxFreePages;
- double num_index_tuples = 0;
- BlockNumber pages_deleted = 0;
- MemoryContext mycontext;
- MemoryContext oldcontext;
- bool needLock;
- /* Set up all-zero stats if btbulkdelete wasn't called */
+ /*
+ * If btbulkdelete was called, we need not do anything, just return
+ * the stats from the latest btbulkdelete call. If it wasn't called,
+ * we must still do a pass over the index, to recycle any newly-recyclable
+ * pages and to obtain index statistics.
+ *
+ * Since we aren't going to actually delete any leaf items, there's no
+ * need to go through all the vacuum-cycle-ID pushups.
+ */
if (stats == NULL)
+ {
stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
+ btvacuumscan(info, stats, NULL, NULL, 0);
+ }
/*
- * First find out the number of pages in the index. We must acquire the
- * relation-extension lock while doing this to avoid a race condition: if
- * someone else is extending the relation, there is a window where
- * bufmgr/smgr have created a new all-zero page but it hasn't yet been
- * write-locked by _bt_getbuf(). If we manage to scan such a page here,
- * we'll improperly assume it can be recycled. Taking the lock
- * synchronizes things enough to prevent a problem: either num_pages won't
- * include the new page, or _bt_getbuf already has write lock on the
- * buffer and it will be fully initialized before we can examine it. (See
- * also vacuumlazy.c, which has the same issue.)
- *
- * We can skip locking for new or temp relations, however, since no one
- * else could be accessing them.
+ * During a non-FULL vacuum it's quite possible for us to be fooled by
+ * concurrent page splits into double-counting some index tuples, so
+ * disbelieve any total that exceeds the underlying heap's count.
+ * (We can't check this during btbulkdelete.)
*/
- needLock = !RELATION_IS_LOCAL(rel);
-
- if (needLock)
- LockRelationForExtension(rel, ExclusiveLock);
+ if (!info->vacuum_full)
+ {
+ if (stats->num_index_tuples > info->num_heap_tuples)
+ stats->num_index_tuples = info->num_heap_tuples;
+ }
- num_pages = RelationGetNumberOfBlocks(rel);
+ PG_RETURN_POINTER(stats);
+}
- if (needLock)
- UnlockRelationForExtension(rel, ExclusiveLock);
+/*
+ * btvacuumscan --- scan the index for VACUUMing purposes
+ *
+ * This combines the functions of looking for leaf tuples that are deletable
+ * according to the vacuum callback, looking for empty pages that can be
+ * deleted, and looking for old deleted pages that can be recycled. Both
+ * btbulkdelete and btvacuumcleanup invoke this (the latter only if no
+ * btbulkdelete call occurred).
+ *
+ * The caller is responsible for initially allocating/zeroing a stats struct
+ * and for obtaining a vacuum cycle ID if necessary.
+ */
+static void
+btvacuumscan(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
+ IndexBulkDeleteCallback callback, void *callback_state,
+ BTCycleId cycleid)
+{
+ Relation rel = info->index;
+ BTVacState vstate;
+ BlockNumber num_pages;
+ BlockNumber blkno;
+ bool needLock;
- /* No point in remembering more than MaxFSMPages pages */
- maxFreePages = MaxFSMPages;
- if ((BlockNumber) maxFreePages > num_pages)
- maxFreePages = (int) num_pages;
- freePages = (BlockNumber *) palloc(maxFreePages * sizeof(BlockNumber));
- nFreePages = 0;
+ /*
+ * Reset counts that will be incremented during the scan; needed in
+ * case of multiple scans during a single VACUUM command
+ */
+ stats->num_index_tuples = 0;
+ stats->pages_deleted = 0;
+
+ /* Set up info to pass down to btvacuumpage */
+ vstate.info = info;
+ vstate.stats = stats;
+ vstate.callback = callback;
+ vstate.callback_state = callback_state;
+ vstate.cycleid = cycleid;
+ vstate.freePages = NULL; /* temporarily */
+ vstate.nFreePages = 0;
+ vstate.maxFreePages = 0;
/* Create a temporary memory context to run _bt_pagedel in */
- mycontext = AllocSetContextCreate(CurrentMemoryContext,
- "_bt_pagedel",
- ALLOCSET_DEFAULT_MINSIZE,
- ALLOCSET_DEFAULT_INITSIZE,
- ALLOCSET_DEFAULT_MAXSIZE);
+ vstate.pagedelcontext = AllocSetContextCreate(CurrentMemoryContext,
+ "_bt_pagedel",
+ ALLOCSET_DEFAULT_MINSIZE,
+ ALLOCSET_DEFAULT_INITSIZE,
+ ALLOCSET_DEFAULT_MAXSIZE);
/*
- * Scan through all pages of index, except metapage. (Any pages added
- * after we start the scan will not be examined; this should be fine,
- * since they can't possibly be empty.)
+ * The outer loop iterates over all index pages except the metapage,
+ * in physical order (we hope the kernel will cooperate in providing
+ * read-ahead for speed). It is critical that we visit all leaf pages,
+ * including ones added after we start the scan, else we might fail to
+ * delete some deletable tuples. Hence, we must repeatedly check the
+ * relation length. We must acquire the relation-extension lock while
+ * doing so to avoid a race condition: if someone else is extending the
+ * relation, there is a window where bufmgr/smgr have created a new
+ * all-zero page but it hasn't yet been write-locked by _bt_getbuf().
+ * If we manage to scan such a page here, we'll improperly assume it can
+ * be recycled. Taking the lock synchronizes things enough to prevent a
+ * problem: either num_pages won't include the new page, or _bt_getbuf
+ * already has write lock on the buffer and it will be fully initialized
+ * before we can examine it. (See also vacuumlazy.c, which has the same
+ * issue.) Also, we need not worry if a page is added immediately after
+ * we look; the page splitting code already has write-lock on the left
+ * page before it adds a right page, so we must already have processed
+ * any tuples due to be moved into such a page.
+ *
+ * We can skip locking for new or temp relations, however, since no one
+ * else could be accessing them.
*/
- for (blkno = BTREE_METAPAGE + 1; blkno < num_pages; blkno++)
- {
- Buffer buf;
- Page page;
- BTPageOpaque opaque;
-
- vacuum_delay_point();
+ needLock = !RELATION_IS_LOCAL(rel);
- /*
- * We can't use _bt_getbuf() here because it always applies
- * _bt_checkpage(), which will barf on an all-zero page. We want to
- * recycle all-zero pages, not fail.
- */
- buf = ReadBuffer(rel, blkno);
- LockBuffer(buf, BT_READ);
- page = BufferGetPage(buf);
- opaque = (BTPageOpaque) PageGetSpecialPointer(page);
- if (!PageIsNew(page))
- _bt_checkpage(rel, buf);
- if (_bt_page_recyclable(page))
- {
- /* Okay to recycle this page */
- if (nFreePages < maxFreePages)
- freePages[nFreePages++] = blkno;
- pages_deleted++;
- }
- else if (P_ISDELETED(opaque))
+ blkno = BTREE_METAPAGE + 1;
+ for (;;)
+ {
+ /* Get the current relation length */
+ if (needLock)
+ LockRelationForExtension(rel, ExclusiveLock);
+ num_pages = RelationGetNumberOfBlocks(rel);
+ if (needLock)
+ UnlockRelationForExtension(rel, ExclusiveLock);
+
+ /* Allocate freePages after we read num_pages the first time */
+ if (vstate.freePages == NULL)
{
- /* Already deleted, but can't recycle yet */
- pages_deleted++;
+ /* No point in remembering more than MaxFSMPages pages */
+ vstate.maxFreePages = MaxFSMPages;
+ if ((BlockNumber) vstate.maxFreePages > num_pages)
+ vstate.maxFreePages = (int) num_pages;
+ vstate.freePages = (BlockNumber *)
+ palloc(vstate.maxFreePages * sizeof(BlockNumber));
}
- else if ((opaque->btpo_flags & BTP_HALF_DEAD) ||
- P_FIRSTDATAKEY(opaque) > PageGetMaxOffsetNumber(page))
- {
- /* Empty, try to delete */
- int ndel;
-
- /* Run pagedel in a temp context to avoid memory leakage */
- MemoryContextReset(mycontext);
- oldcontext = MemoryContextSwitchTo(mycontext);
- ndel = _bt_pagedel(rel, buf, info->vacuum_full);
-
- /* count only this page, else may double-count parent */
- if (ndel)
- pages_deleted++;
-
- /*
- * During VACUUM FULL it's okay to recycle deleted pages
- * immediately, since there can be no other transactions scanning
- * the index. Note that we will only recycle the current page and
- * not any parent pages that _bt_pagedel might have recursed to;
- * this seems reasonable in the name of simplicity. (Trying to do
- * otherwise would mean we'd have to sort the list of recyclable
- * pages we're building.)
- */
- if (ndel && info->vacuum_full)
- {
- if (nFreePages < maxFreePages)
- freePages[nFreePages++] = blkno;
- }
-
- MemoryContextSwitchTo(oldcontext);
- continue; /* pagedel released buffer */
- }
- else if (P_ISLEAF(opaque))
+ /* Quit if we've scanned the whole relation */
+ if (blkno >= num_pages)
+ break;
+ /* Iterate over pages, then loop back to recheck length */
+ for (; blkno < num_pages; blkno++)
{
- /* Count the index entries of live leaf pages */
- num_index_tuples += PageGetMaxOffsetNumber(page) + 1 -
- P_FIRSTDATAKEY(opaque);
+ btvacuumpage(&vstate, blkno, blkno);
}
- _bt_relbuf(rel, buf);
}
/*
* acquiring exclusive lock on the index and then rechecking all the
* pages; doesn't seem worth it.
*/
- if (info->vacuum_full && nFreePages > 0)
+ if (info->vacuum_full && vstate.nFreePages > 0)
{
BlockNumber new_pages = num_pages;
- while (nFreePages > 0 && freePages[nFreePages - 1] == new_pages - 1)
+ while (vstate.nFreePages > 0 &&
+ vstate.freePages[vstate.nFreePages - 1] == new_pages - 1)
{
new_pages--;
- pages_deleted--;
- nFreePages--;
+ stats->pages_deleted--;
+ vstate.nFreePages--;
}
if (new_pages != num_pages)
{
RelationTruncate(rel, new_pages);
/* update statistics */
- stats->pages_removed = num_pages - new_pages;
+ stats->pages_removed += num_pages - new_pages;
num_pages = new_pages;
}
* pages in the index, discarding any old info the map may have. We do not
* need to sort the page numbers; they're in order already.
*/
- RecordIndexFreeSpace(&rel->rd_node, nFreePages, freePages);
+ RecordIndexFreeSpace(&rel->rd_node, vstate.nFreePages, vstate.freePages);
- pfree(freePages);
+ pfree(vstate.freePages);
- MemoryContextDelete(mycontext);
+ MemoryContextDelete(vstate.pagedelcontext);
/* update statistics */
stats->num_pages = num_pages;
- stats->num_index_tuples = num_index_tuples;
- stats->pages_deleted = pages_deleted;
- stats->pages_free = nFreePages;
+ stats->pages_free = vstate.nFreePages;
+}
- PG_RETURN_POINTER(stats);
+/*
+ * btvacuumpage --- VACUUM one page
+ *
+ * This processes a single page for btvacuumscan(). In some cases we
+ * must go back and re-examine previously-scanned pages; this routine
+ * recurses when necessary to handle that case.
+ *
+ * blkno is the page to process. orig_blkno is the highest block number
+ * reached by the outer btvacuumscan loop (the same as blkno, unless we
+ * are recursing to re-examine a previous page).
+ */
+static void
+btvacuumpage(BTVacState *vstate, BlockNumber blkno, BlockNumber orig_blkno)
+{
+ IndexVacuumInfo *info = vstate->info;
+ IndexBulkDeleteResult *stats = vstate->stats;
+ IndexBulkDeleteCallback callback = vstate->callback;
+ void *callback_state = vstate->callback_state;
+ Relation rel = info->index;
+ bool delete_now;
+ BlockNumber recurse_to;
+ Buffer buf;
+ Page page;
+ BTPageOpaque opaque;
+
+restart:
+ delete_now = false;
+ recurse_to = P_NONE;
+
+ /* call vacuum_delay_point while not holding any buffer lock */
+ vacuum_delay_point();
+
+ /*
+ * We can't use _bt_getbuf() here because it always applies
+ * _bt_checkpage(), which will barf on an all-zero page. We want to
+ * recycle all-zero pages, not fail.
+ */
+ buf = ReadBuffer(rel, blkno);
+ LockBuffer(buf, BT_READ);
+ page = BufferGetPage(buf);
+ opaque = (BTPageOpaque) PageGetSpecialPointer(page);
+ if (!PageIsNew(page))
+ _bt_checkpage(rel, buf);
+
+ /*
+ * If we are recursing, the only case we want to do anything with is
+ * a live leaf page having the current vacuum cycle ID. Any other state
+ * implies we already saw the page (eg, deleted it as being empty).
+ * In particular, we don't want to risk adding it to freePages twice.
+ */
+ if (blkno != orig_blkno)
+ {
+ if (_bt_page_recyclable(page) ||
+ P_ISDELETED(opaque) ||
+ (opaque->btpo_flags & BTP_HALF_DEAD) ||
+ !P_ISLEAF(opaque) ||
+ opaque->btpo_cycleid != vstate->cycleid)
+ {
+ _bt_relbuf(rel, buf);
+ return;
+ }
+ }
+
+ /* Page is valid, see what to do with it */
+ if (_bt_page_recyclable(page))
+ {
+ /* Okay to recycle this page */
+ if (vstate->nFreePages < vstate->maxFreePages)
+ vstate->freePages[vstate->nFreePages++] = blkno;
+ stats->pages_deleted++;
+ }
+ else if (P_ISDELETED(opaque))
+ {
+ /* Already deleted, but can't recycle yet */
+ stats->pages_deleted++;
+ }
+ else if (opaque->btpo_flags & BTP_HALF_DEAD)
+ {
+ /* Half-dead, try to delete */
+ delete_now = true;
+ }
+ else if (P_ISLEAF(opaque))
+ {
+ OffsetNumber deletable[MaxOffsetNumber];
+ int ndeletable;
+ OffsetNumber offnum,
+ minoff,
+ maxoff;
+
+ /*
+ * Trade in the initial read lock for a super-exclusive write
+ * lock on this page. We must get such a lock on every leaf page
+ * over the course of the vacuum scan, whether or not it actually
+ * contains any deletable tuples --- see nbtree/README.
+ */
+ LockBuffer(buf, BUFFER_LOCK_UNLOCK);
+ LockBufferForCleanup(buf);
+
+ /*
+ * Check whether we need to recurse back to earlier pages. What
+ * we are concerned about is a page split that happened since we
+ * started the vacuum scan. If the split moved some tuples to a
+ * lower page then we might have missed 'em. If so, set up for
+ * tail recursion. (Must do this before possibly clearing
+ * btpo_cycleid below!)
+ */
+ if (vstate->cycleid != 0 &&
+ opaque->btpo_cycleid == vstate->cycleid &&
+ !(opaque->btpo_flags & BTP_SPLIT_END) &&
+ !P_RIGHTMOST(opaque) &&
+ opaque->btpo_next < orig_blkno)
+ recurse_to = opaque->btpo_next;
+
+ /*
+	 * Scan over all items to see which ones need to be deleted
+	 * according to the callback function.
+ */
+ ndeletable = 0;
+ minoff = P_FIRSTDATAKEY(opaque);
+ maxoff = PageGetMaxOffsetNumber(page);
+ if (callback)
+ {
+ for (offnum = minoff;
+ offnum <= maxoff;
+ offnum = OffsetNumberNext(offnum))
+ {
+ IndexTuple itup;
+ ItemPointer htup;
+
+ itup = (IndexTuple) PageGetItem(page,
+ PageGetItemId(page, offnum));
+ htup = &(itup->t_tid);
+ if (callback(htup, callback_state))
+ deletable[ndeletable++] = offnum;
+ }
+ }
+
+ /*
+ * Apply any needed deletes. We issue just one _bt_delitems()
+ * call per page, so as to minimize WAL traffic.
+ */
+ if (ndeletable > 0)
+ {
+ _bt_delitems(rel, buf, deletable, ndeletable);
+ stats->tuples_removed += ndeletable;
+ /* must recompute maxoff */
+ maxoff = PageGetMaxOffsetNumber(page);
+ }
+ else
+ {
+ /*
+ * If the page has been split during this vacuum cycle, it seems
+ * worth expending a write to clear btpo_cycleid even if we don't
+ * have any deletions to do. (If we do, _bt_delitems takes care
+ * of this.) This ensures we won't process the page again.
+ *
+ * We treat this like a hint-bit update because there's no need
+ * to WAL-log it.
+ */
+ if (vstate->cycleid != 0 &&
+ opaque->btpo_cycleid == vstate->cycleid)
+ {
+ opaque->btpo_cycleid = 0;
+ SetBufferCommitInfoNeedsSave(buf);
+ }
+ }
+
+ /*
+ * If it's now empty, try to delete; else count the live tuples.
+ * We don't delete when recursing, though, to avoid putting entries
+ * into freePages out-of-order (doesn't seem worth any extra code to
+ * handle the case).
+ */
+ if (minoff > maxoff)
+ delete_now = (blkno == orig_blkno);
+ else
+ stats->num_index_tuples += maxoff - minoff + 1;
+ }
+
+ if (delete_now)
+ {
+ MemoryContext oldcontext;
+ int ndel;
+
+ /* Run pagedel in a temp context to avoid memory leakage */
+ MemoryContextReset(vstate->pagedelcontext);
+ oldcontext = MemoryContextSwitchTo(vstate->pagedelcontext);
+
+ ndel = _bt_pagedel(rel, buf, info->vacuum_full);
+
+ /* count only this page, else may double-count parent */
+ if (ndel)
+ stats->pages_deleted++;
+
+ /*
+ * During VACUUM FULL it's okay to recycle deleted pages
+ * immediately, since there can be no other transactions scanning
+ * the index. Note that we will only recycle the current page and
+ * not any parent pages that _bt_pagedel might have recursed to;
+ * this seems reasonable in the name of simplicity. (Trying to do
+ * otherwise would mean we'd have to sort the list of recyclable
+ * pages we're building.)
+ */
+ if (ndel && info->vacuum_full)
+ {
+ if (vstate->nFreePages < vstate->maxFreePages)
+ vstate->freePages[vstate->nFreePages++] = blkno;
+ }
+
+ MemoryContextSwitchTo(oldcontext);
+ /* pagedel released buffer, so we shouldn't */
+ }
+ else
+ _bt_relbuf(rel, buf);
+
+ /*
+ * This is really tail recursion, but if the compiler is too stupid
+ * to optimize it as such, we'd eat an uncomfortably large amount of
+ * stack space per recursion level (due to the deletable[] array).
+ * A failure is improbable since the number of levels isn't likely to be
+ * large ... but just in case, let's hand-optimize into a loop.
+ */
+ if (recurse_to != P_NONE)
+ {
+ blkno = recurse_to;
+ goto restart;
+ }
}
*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/access/nbtree/nbtutils.c,v 1.73 2006/05/07 01:21:30 tgl Exp $
+ * $PostgreSQL: pgsql/src/backend/access/nbtree/nbtutils.c,v 1.74 2006/05/08 00:00:10 tgl Exp $
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
+#include <time.h>
+
#include "access/genam.h"
#include "access/nbtree.h"
#include "catalog/catalog.h"
#include "executor/execdebug.h"
+#include "miscadmin.h"
static void _bt_mark_scankey_required(ScanKey skey);
* delete. We cope with cases where items have moved right due to insertions.
* If an item has moved off the current page due to a split, we'll fail to
* find it and do nothing (this is not an error case --- we assume the item
- * will eventually get marked in a future indexscan). Likewise, if the item
- * has moved left due to deletions or disappeared itself, we'll not find it,
- * but these cases are not worth optimizing. (Since deletions are only done
- * by VACUUM, any deletion makes it highly likely that the dead item has been
- * removed itself, and therefore searching left is not worthwhile.)
+ * will eventually get marked in a future indexscan). Note that because we
+ * hold pin on the target page continuously from initially reading the items
+ * until applying this function, VACUUM cannot have deleted any items from
+ * the page, and so there is no need to search left from the recorded offset.
+ * (This observation also guarantees that the item is still the right one
+ * to delete, which might otherwise be questionable since heap TIDs can get
+ * recycled.)
*/
void
_bt_killitems(IndexScanDesc scan, bool haveLock)
*/
so->numKilled = 0;
}
+
+
+/*
+ * The following routines manage a shared-memory area in which we track
+ * assignment of "vacuum cycle IDs" to currently-active btree vacuuming
+ * operations. There is a single counter which increments each time we
+ * start a vacuum to assign it a cycle ID. Since multiple vacuums could
+ * be active concurrently, we have to track the cycle ID for each active
+ * vacuum; this requires at most MaxBackends entries (usually far fewer).
+ * We assume at most one vacuum can be active for a given index.
+ *
+ * Access to the shared memory area is controlled by BtreeVacuumLock.
+ * In principle we could use a separate lmgr locktag for each index,
+ * but a single LWLock is much cheaper, and given the short time that
+ * the lock is ever held, the concurrency hit should be minimal.
+ */
+
+typedef struct BTOneVacInfo
+{
+ LockRelId relid; /* global identifier of an index */
+ BTCycleId cycleid; /* cycle ID for its active VACUUM */
+} BTOneVacInfo;
+
+typedef struct BTVacInfo
+{
+ BTCycleId cycle_ctr; /* cycle ID most recently assigned */
+ int num_vacuums; /* number of currently active VACUUMs */
+ int max_vacuums; /* allocated length of vacuums[] array */
+ BTOneVacInfo vacuums[1]; /* VARIABLE LENGTH ARRAY */
+} BTVacInfo;
+
+static BTVacInfo *btvacinfo;
+
+
+/*
+ * _bt_vacuum_cycleid --- get the active vacuum cycle ID for an index,
+ * or zero if there is no active VACUUM
+ *
+ * Note: for correct interlocking, the caller must already hold pin and
+ * exclusive lock on each buffer it will store the cycle ID into. This
+ * ensures that even if a VACUUM starts immediately afterwards, it cannot
+ * process those pages until the page split is complete.
+ */
+BTCycleId
+_bt_vacuum_cycleid(Relation rel)
+{
+ BTCycleId result = 0;
+ int i;
+
+ /* Share lock is enough since this is a read-only operation */
+ LWLockAcquire(BtreeVacuumLock, LW_SHARED);
+
+ for (i = 0; i < btvacinfo->num_vacuums; i++)
+ {
+ BTOneVacInfo *vac = &btvacinfo->vacuums[i];
+
+ if (vac->relid.relId == rel->rd_lockInfo.lockRelId.relId &&
+ vac->relid.dbId == rel->rd_lockInfo.lockRelId.dbId)
+ {
+ result = vac->cycleid;
+ break;
+ }
+ }
+
+ LWLockRelease(BtreeVacuumLock);
+ return result;
+}
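
For context, the consumer of _bt_vacuum_cycleid() is the page-split code in
nbtinsert.c, which is not part of this excerpt: while still holding pin and
exclusive lock on both halves of a split, it stamps the current cycle ID into
each page's special space, which is what lets btvacuumpage() recognize pages
split during its scan. A minimal sketch of that usage, for illustration only
(the function name example_mark_split is hypothetical):

    /* Hypothetical sketch, not part of this patch */
    static void
    example_mark_split(Relation rel, Buffer lbuf, Buffer rbuf)
    {
        BTPageOpaque lopaque = (BTPageOpaque) PageGetSpecialPointer(BufferGetPage(lbuf));
        BTPageOpaque ropaque = (BTPageOpaque) PageGetSpecialPointer(BufferGetPage(rbuf));

        /*
         * The caller already holds pin and exclusive lock on both buffers,
         * per the interlocking note above, so a VACUUM that starts now
         * cannot examine these pages until the split is complete.
         */
        lopaque->btpo_cycleid = _bt_vacuum_cycleid(rel);
        ropaque->btpo_cycleid = lopaque->btpo_cycleid;
    }
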
+
+/*
+ * _bt_start_vacuum --- assign a cycle ID to a just-starting VACUUM operation
+ *
+ * Note: the caller must guarantee (via PG_TRY) that it will eventually call
+ * _bt_end_vacuum, else we'll permanently leak an array slot.
+ */
+BTCycleId
+_bt_start_vacuum(Relation rel)
+{
+ BTCycleId result;
+ int i;
+ BTOneVacInfo *vac;
+
+ LWLockAcquire(BtreeVacuumLock, LW_EXCLUSIVE);
+
+ /* Assign the next cycle ID, being careful to avoid zero */
+ do {
+ result = ++(btvacinfo->cycle_ctr);
+ } while (result == 0);
+
+ /* Let's just make sure there's no entry already for this index */
+ for (i = 0; i < btvacinfo->num_vacuums; i++)
+ {
+ vac = &btvacinfo->vacuums[i];
+ if (vac->relid.relId == rel->rd_lockInfo.lockRelId.relId &&
+ vac->relid.dbId == rel->rd_lockInfo.lockRelId.dbId)
+ elog(ERROR, "multiple active vacuums for index \"%s\"",
+ RelationGetRelationName(rel));
+ }
+
+ /* OK, add an entry */
+ if (btvacinfo->num_vacuums >= btvacinfo->max_vacuums)
+ elog(ERROR, "out of btvacinfo slots");
+ vac = &btvacinfo->vacuums[btvacinfo->num_vacuums];
+ vac->relid = rel->rd_lockInfo.lockRelId;
+ vac->cycleid = result;
+ btvacinfo->num_vacuums++;
+
+ LWLockRelease(BtreeVacuumLock);
+ return result;
+}
+
+/*
+ * _bt_end_vacuum --- mark a btree VACUUM operation as done
+ *
+ * Note: this is deliberately coded not to complain if no entry is found;
+ * this allows the caller to put PG_TRY around the start_vacuum operation.
+ */
+void
+_bt_end_vacuum(Relation rel)
+{
+ int i;
+
+ LWLockAcquire(BtreeVacuumLock, LW_EXCLUSIVE);
+
+ /* Find the array entry */
+ for (i = 0; i < btvacinfo->num_vacuums; i++)
+ {
+ BTOneVacInfo *vac = &btvacinfo->vacuums[i];
+
+ if (vac->relid.relId == rel->rd_lockInfo.lockRelId.relId &&
+ vac->relid.dbId == rel->rd_lockInfo.lockRelId.dbId)
+ {
+ /* Remove it by shifting down the last entry */
+ *vac = btvacinfo->vacuums[btvacinfo->num_vacuums - 1];
+ btvacinfo->num_vacuums--;
+ break;
+ }
+ }
+
+ LWLockRelease(BtreeVacuumLock);
+}
+
+/*
+ * BTreeShmemSize --- report amount of shared memory space needed
+ */
+Size
+BTreeShmemSize(void)
+{
+ Size size;
+
+ size = offsetof(BTVacInfo, vacuums[0]);
+ size = add_size(size, mul_size(MaxBackends, sizeof(BTOneVacInfo)));
+ return size;
+}
+
+/*
+ * BTreeShmemInit --- initialize this module's shared memory
+ */
+void
+BTreeShmemInit(void)
+{
+ bool found;
+
+ btvacinfo = (BTVacInfo *) ShmemInitStruct("BTree Vacuum State",
+ BTreeShmemSize(),
+ &found);
+
+ if (!IsUnderPostmaster)
+ {
+ /* Initialize shared memory area */
+ Assert(!found);
+
+ /*
+ * It doesn't really matter what the cycle counter starts at, but
+ * having it always start the same doesn't seem good. Seed with
+ * low-order bits of time() instead.
+ */
+ btvacinfo->cycle_ctr = (BTCycleId) time(NULL);
+
+ btvacinfo->num_vacuums = 0;
+ btvacinfo->max_vacuums = MaxBackends;
+ }
+ else
+ Assert(found);
+}
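
Not shown in this excerpt: BTreeShmemSize() and BTreeShmemInit() only take
effect once they are hooked into the server's shared-memory setup in
src/backend/storage/ipc/ipci.c, and BtreeVacuumLock likewise has to be added
to the built-in LWLock list. The ipci.c side amounts to two calls inside
CreateSharedMemoryAndSemaphores(), roughly as sketched below (placement among
the other per-module calls is approximate):

    /* Sketch of the ipci.c wiring; surrounding code omitted */
    size = add_size(size, BTreeShmemSize());   /* while summing size estimates */
    ...
    BTreeShmemInit();                          /* after the shmem segment is created */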