]> granicus.if.org Git - postgresql/commitdiff
First cut at recycling space in btree indexes. Still some rough edges
authorTom Lane <tgl@sss.pgh.pa.us>
Sun, 23 Feb 2003 06:17:13 +0000 (06:17 +0000)
committerTom Lane <tgl@sss.pgh.pa.us>
Sun, 23 Feb 2003 06:17:13 +0000 (06:17 +0000)
to fix, but it seems to basically work...

src/backend/access/common/indextuple.c
src/backend/access/nbtree/nbtinsert.c
src/backend/access/nbtree/nbtpage.c
src/backend/access/nbtree/nbtree.c
src/backend/access/nbtree/nbtxlog.c
src/backend/storage/freespace/freespace.c
src/include/access/itup.h
src/include/access/nbtree.h

index 7275984e64e9e957f5bb22aca38e98f6a6954a26..e6518922cb9f18f7659de2179c8717ae8c4d4113 100644 (file)
@@ -9,7 +9,7 @@
  *
  *
  * IDENTIFICATION
- *       $Header: /cvsroot/pgsql/src/backend/access/common/indextuple.c,v 1.63 2002/11/13 00:39:46 momjian Exp $
+ *       $Header: /cvsroot/pgsql/src/backend/access/common/indextuple.c,v 1.64 2003/02/23 06:17:12 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -394,17 +394,16 @@ nocache_index_getattr(IndexTuple tup,
 }
 
 /*
- * Copies source into target.  If *target == NULL, we palloc space; otherwise
- * we assume we have space that is already palloc'ed.
+ * Create a palloc'd copy of an index tuple.
  */
-void
-CopyIndexTuple(IndexTuple source, IndexTuple *target)
+IndexTuple
+CopyIndexTuple(IndexTuple source)
 {
+       IndexTuple      result;
        Size            size;
 
        size = IndexTupleSize(source);
-       if (*target == NULL)
-               *target = (IndexTuple) palloc(size);
-
-       memmove((char *) *target, (char *) source, size);
+       result = (IndexTuple) palloc(size);
+       memcpy(result, source, size);
+       return result;
 }
index e943ca96f1ba1854c12a31949e9158bafacf6731..62d1b5921a4609f67c17018fb55224b4f3092246 100644 (file)
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *       $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.98 2003/02/22 00:45:03 tgl Exp $
+ *       $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.99 2003/02/23 06:17:13 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -58,7 +58,6 @@ static OffsetNumber _bt_findsplitloc(Relation rel, Page page,
 static void _bt_checksplitloc(FindSplitData *state, OffsetNumber firstright,
                                  int leftfree, int rightfree,
                                  bool newitemonleft, Size firstrightitemsz);
-static Buffer _bt_getstackbuf(Relation rel, BTStack stack, int access);
 static void _bt_pgaddtup(Relation rel, Page page,
                         Size itemsize, BTItem btitem,
                         OffsetNumber itup_off, const char *where);
@@ -666,7 +665,6 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
                                rightoff;
        OffsetNumber maxoff;
        OffsetNumber i;
-       BTItem          lhikey;
 
        rbuf = _bt_getbuf(rel, P_NEW, BT_WRITE);
        origpage = BufferGetPage(buf);
@@ -730,7 +728,6 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
                itemsz = ItemIdGetLength(itemid);
                item = (BTItem) PageGetItem(origpage, itemid);
        }
-       lhikey = item;
        if (PageAddItem(leftpage, (Item) item, itemsz, leftoff,
                                        LP_USED) == InvalidOffsetNumber)
                elog(PANIC, "btree: failed to add hikey to the left sibling");
@@ -1262,7 +1259,7 @@ _bt_insert_parent(Relation rel,
  *
  *             Returns InvalidBuffer if item not found (should not happen).
  */
-static Buffer
+Buffer
 _bt_getstackbuf(Relation rel, BTStack stack, int access)
 {
        BlockNumber blkno;
index 0296b71c3633b6f46e12d9b236b3725af3dccd48..16439972024bbd77b145762d342fb2c21bcfbebd 100644 (file)
@@ -9,7 +9,7 @@
  *
  *
  * IDENTIFICATION
- *       $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtpage.c,v 1.60 2003/02/22 00:45:04 tgl Exp $
+ *       $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtpage.c,v 1.61 2003/02/23 06:17:13 tgl Exp $
  *
  *     NOTES
  *        Postgres btree pages look like ordinary relation pages.      The opaque
@@ -24,6 +24,7 @@
 
 #include "access/nbtree.h"
 #include "miscadmin.h"
+#include "storage/freespace.h"
 #include "storage/lmgr.h"
 
 
@@ -391,7 +392,38 @@ _bt_getbuf(Relation rel, BlockNumber blkno, int access)
                bool            needLock;
                Page            page;
 
-               /* XXX soon: ask FSM about free space */
+               Assert(access == BT_WRITE);
+
+               /*
+                * First see if the FSM knows of any free pages.
+                *
+                * We can't trust the FSM's report unreservedly; we have to check
+                * that the page is still free.  (For example, an already-free page
+                * could have been re-used between the time the last VACUUM scanned
+                * it and the time the VACUUM made its FSM updates.)
+                *
+                * The request size should be more than half of what btvacuumcleanup
+                * logs as the per-page free space.  We use BLCKSZ/2 and BLCKSZ-1
+                * to try to get some use out of FSM's space management algorithm.
+                * XXX this needs some more thought...
+                */
+               for (;;)
+               {
+                       blkno = GetPageWithFreeSpace(&rel->rd_node, BLCKSZ/2);
+                       if (blkno == InvalidBlockNumber)
+                               break;
+                       buf = ReadBuffer(rel, blkno);
+                       LockBuffer(buf, access);
+                       page = BufferGetPage(buf);
+                       if (_bt_page_recyclable(page))
+                       {
+                               /* Okay to use page.  Re-initialize and return it */
+                               _bt_pageinit(page, BufferGetPageSize(buf));
+                               return buf;
+                       }
+                       elog(DEBUG1, "_bt_getbuf: FSM returned nonrecyclable page");
+                       _bt_relbuf(rel, buf);
+               }
 
                /*
                 * Extend the relation by one page.
@@ -487,6 +519,36 @@ _bt_pageinit(Page page, Size size)
        PageInit(page, size, sizeof(BTPageOpaqueData));
 }
 
+/*
+ *     _bt_page_recyclable() -- Is an existing page recyclable?
+ *
+ * This exists to make sure _bt_getbuf and btvacuumcleanup have the same
+ * policy about whether a page is safe to re-use.
+ */
+bool
+_bt_page_recyclable(Page page)
+{
+       BTPageOpaque opaque;
+
+       /*
+        * It's possible to find an all-zeroes page in an index --- for example,
+        * a backend might successfully extend the relation one page and then
+        * crash before it is able to make a WAL entry for adding the page.
+        * If we find a zeroed page then reclaim it.
+        */
+       if (PageIsNew(page))
+               return true;
+       /*
+        * Otherwise, recycle if deleted and too old to have any processes
+        * interested in it.
+        */
+       opaque = (BTPageOpaque) PageGetSpecialPointer(page);
+       if (P_ISDELETED(opaque) &&
+               TransactionIdPrecedesOrEquals(opaque->btpo.xact, RecentGlobalXmin))
+               return true;
+       return false;
+}
+
 /*
  *     _bt_metaproot() -- Change the root page of the btree.
  *
@@ -605,3 +667,426 @@ _bt_itemdel(Relation rel, Buffer buf, ItemPointer tid)
 
        END_CRIT_SECTION();
 }
+
+/*
+ * _bt_pagedel() -- Delete a page from the b-tree.
+ *
+ * This action unlinks the page from the b-tree structure, removing all
+ * pointers leading to it --- but not touching its own left and right links.
+ * The page cannot be physically reclaimed right away, since other processes
+ * may currently be trying to follow links leading to the page; they have to
+ * be allowed to use its right-link to recover.  See nbtree/README.
+ *
+ * On entry, the target buffer must be pinned and read-locked.  This lock and
+ * pin will be dropped before exiting.
+ *
+ * Returns the number of pages successfully deleted (zero on failure; could
+ * be more than one if parent blocks were deleted).
+ *
+ * NOTE: this leaks memory.  Rather than trying to clean up everything
+ * carefully, it's better to run it in a temp context that can be reset
+ * frequently.
+ */
+int
+_bt_pagedel(Relation rel, Buffer buf, bool vacuum_full)
+{
+       BlockNumber     target,
+                               leftsib,
+                               rightsib,
+                               parent;
+       OffsetNumber poffset,
+                               maxoff;
+       uint32          targetlevel,
+                               ilevel;
+       ItemId          itemid;
+       BTItem          targetkey,
+                               btitem;
+       ScanKey         itup_scankey;
+       BTStack         stack;
+       Buffer          lbuf,
+                               rbuf,
+                               pbuf;
+       bool            parent_half_dead;
+       bool            parent_one_child;
+       bool            rightsib_empty;
+       Buffer          metabuf = InvalidBuffer;
+       Page            metapg = NULL;
+       BTMetaPageData *metad = NULL;
+       Page            page;
+       BTPageOpaque opaque;
+
+       /*
+        * We can never delete rightmost pages nor root pages.  While at it,
+        * check that page is not already deleted and is empty.
+        */
+       page = BufferGetPage(buf);
+       opaque = (BTPageOpaque) PageGetSpecialPointer(page);
+       if (P_RIGHTMOST(opaque) || P_ISROOT(opaque) || P_ISDELETED(opaque) ||
+                P_FIRSTDATAKEY(opaque) <= PageGetMaxOffsetNumber(page))
+       {
+               _bt_relbuf(rel, buf);
+               return 0;
+       }
+       /*
+        * Save info about page, including a copy of its high key (it must
+        * have one, being non-rightmost).
+        */
+       target = BufferGetBlockNumber(buf);
+       targetlevel = opaque->btpo.level;
+       leftsib = opaque->btpo_prev;
+       itemid = PageGetItemId(page, P_HIKEY);
+       targetkey = CopyBTItem((BTItem) PageGetItem(page, itemid));
+       /*
+        * We need to get an approximate pointer to the page's parent page.
+        * Use the standard search mechanism to search for the page's high key;
+        * this will give us a link to either the current parent or someplace
+        * to its left (if there are multiple equal high keys).  To avoid
+        * deadlocks, we'd better drop the target page lock first.
+        */
+       _bt_relbuf(rel, buf);
+       /* we need a scan key to do our search, so build one */
+       itup_scankey = _bt_mkscankey(rel, &(targetkey->bti_itup));
+       /* find the leftmost leaf page containing this key */
+       stack = _bt_search(rel, rel->rd_rel->relnatts, itup_scankey,
+                                          &lbuf, BT_READ);
+       /* don't need a pin on that either */
+       _bt_relbuf(rel, lbuf);
+       /*
+        * If we are trying to delete an interior page, _bt_search did more
+        * than we needed.  Locate the stack item pointing to our parent level.
+        */
+       ilevel = 0;
+       for (;;)
+       {
+               if (stack == NULL)
+                       elog(ERROR, "_bt_pagedel: not enough stack items");
+               if (ilevel == targetlevel)
+                       break;
+               stack = stack->bts_parent;
+               ilevel++;
+       }
+       /*
+        * We have to lock the pages we need to modify in the standard order:
+        * moving right, then up.  Else we will deadlock against other writers.
+        * 
+        * So, we need to find and write-lock the current left sibling of the
+        * target page.  The sibling that was current a moment ago could have
+        * split, so we may have to move right.  This search could fail if
+        * either the sibling or the target page was deleted by someone else
+        * meanwhile; if so, give up.  (Right now, that should never happen,
+        * since page deletion is only done in VACUUM and there shouldn't be
+        * multiple VACUUMs concurrently on the same table.)
+        */
+       if (leftsib != P_NONE)
+       {
+               lbuf = _bt_getbuf(rel, leftsib, BT_WRITE);
+               page = BufferGetPage(lbuf);
+               opaque = (BTPageOpaque) PageGetSpecialPointer(page);
+               while (P_ISDELETED(opaque) || opaque->btpo_next != target)
+               {
+                       /* step right one page */
+                       leftsib = opaque->btpo_next;
+                       _bt_relbuf(rel, lbuf);
+                       if (leftsib == P_NONE)
+                       {
+                               elog(LOG, "_bt_pagedel: no left sibling (concurrent deletion?)");
+                               return 0;
+                       }
+                       lbuf = _bt_getbuf(rel, leftsib, BT_WRITE);
+                       page = BufferGetPage(lbuf);
+                       opaque = (BTPageOpaque) PageGetSpecialPointer(page);
+               }
+       }
+       else
+               lbuf = InvalidBuffer;
+       /*
+        * Next write-lock the target page itself.  It should be okay to take just
+        * a write lock not a superexclusive lock, since no scans would stop on an
+        * empty page.
+        */
+       buf = _bt_getbuf(rel, target, BT_WRITE);
+       page = BufferGetPage(buf);
+       opaque = (BTPageOpaque) PageGetSpecialPointer(page);
+       /*
+        * Check page is still empty etc, else abandon deletion.  The empty check
+        * is necessary since someone else might have inserted into it while
+        * we didn't have it locked; the others are just for paranoia's sake.
+        */
+       if (P_RIGHTMOST(opaque) || P_ISROOT(opaque) || P_ISDELETED(opaque) ||
+                P_FIRSTDATAKEY(opaque) <= PageGetMaxOffsetNumber(page))
+       {
+               _bt_relbuf(rel, buf);
+               if (BufferIsValid(lbuf))
+                       _bt_relbuf(rel, lbuf);
+               return 0;
+       }
+       if (opaque->btpo_prev != leftsib)
+               elog(ERROR, "_bt_pagedel: left link changed unexpectedly");
+       /*
+        * And next write-lock the (current) right sibling.
+        */
+       rightsib = opaque->btpo_next;
+       rbuf = _bt_getbuf(rel, rightsib, BT_WRITE);
+       /*
+        * Next find and write-lock the current parent of the target page.
+        * This is essentially the same as the corresponding step of splitting.
+        */
+       ItemPointerSet(&(stack->bts_btitem.bti_itup.t_tid),
+                                  target, P_HIKEY);
+       pbuf = _bt_getstackbuf(rel, stack, BT_WRITE);
+       if (pbuf == InvalidBuffer)
+               elog(ERROR, "_bt_getstackbuf: my bits moved right off the end of the world!"
+                        "\n\tRecreate index %s.", RelationGetRelationName(rel));
+       parent = stack->bts_blkno;
+       poffset = stack->bts_offset;
+       /*
+        * If the target is the rightmost child of its parent, then we can't
+        * delete, unless it's also the only child --- in which case the parent
+        * changes to half-dead status.
+        */
+       page = BufferGetPage(pbuf);
+       opaque = (BTPageOpaque) PageGetSpecialPointer(page);
+       maxoff = PageGetMaxOffsetNumber(page);
+       parent_half_dead = false;
+       parent_one_child = false;
+       if (poffset >= maxoff)
+       {
+               if (poffset == P_FIRSTDATAKEY(opaque))
+                       parent_half_dead = true;
+               else
+               {
+                       _bt_relbuf(rel, pbuf);
+                       _bt_relbuf(rel, rbuf);
+                       _bt_relbuf(rel, buf);
+                       if (BufferIsValid(lbuf))
+                               _bt_relbuf(rel, lbuf);
+                       return 0;
+               }
+       }
+       else
+       {
+               /* Will there be exactly one child left in this parent? */
+               if (OffsetNumberNext(P_FIRSTDATAKEY(opaque)) == maxoff)
+                       parent_one_child = true;
+       }
+       /*
+        * If we are deleting the next-to-last page on the target's level,
+        * then the rightsib is a candidate to become the new fast root.
+        * (In theory, it might be possible to push the fast root even further
+        * down, but the odds of doing so are slim, and the locking considerations
+        * daunting.)
+        *
+        * We can safely acquire a lock on the metapage here --- see comments for
+        * _bt_newroot().
+        */
+       if (leftsib == P_NONE)
+       {
+               page = BufferGetPage(rbuf);
+               opaque = (BTPageOpaque) PageGetSpecialPointer(page);
+               Assert(opaque->btpo.level == targetlevel);
+               if (P_RIGHTMOST(opaque))
+               {
+                       /* rightsib will be the only one left on the level */
+                       metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_WRITE);
+                       metapg = BufferGetPage(metabuf);
+                       metad = BTPageGetMeta(metapg);
+                       /*
+                        * The expected case here is btm_fastlevel == targetlevel+1;
+                        * if the fastlevel is <= targetlevel, something is wrong, and we
+                        * choose to overwrite it to fix it.
+                        */
+                       if (metad->btm_fastlevel > targetlevel+1)
+                       {
+                               /* no update wanted */
+                               _bt_relbuf(rel, metabuf);
+                               metabuf = InvalidBuffer;
+                       }
+               }
+       }
+
+       /*
+        * Here we begin doing the deletion.
+        */
+
+       /* No elog(ERROR) until changes are logged */
+       START_CRIT_SECTION();
+
+       /*
+        * Update parent.  The normal case is a tad tricky because we want to
+        * delete the target's downlink and the *following* key.  Easiest way is
+        * to copy the right sibling's downlink over the target downlink, and then
+        * delete the following item.
+        */
+       page = BufferGetPage(pbuf);
+       opaque = (BTPageOpaque) PageGetSpecialPointer(page);
+       if (parent_half_dead)
+       {
+               PageIndexTupleDelete(page, poffset);
+               opaque->btpo_flags |= BTP_HALF_DEAD;
+       }
+       else
+       {
+               OffsetNumber    nextoffset;
+
+               itemid = PageGetItemId(page, poffset);
+               btitem = (BTItem) PageGetItem(page, itemid);
+               Assert(ItemPointerGetBlockNumber(&(btitem->bti_itup.t_tid)) == target);
+               ItemPointerSet(&(btitem->bti_itup.t_tid), rightsib, P_HIKEY);
+
+               nextoffset = OffsetNumberNext(poffset);
+               /* This part is just for double-checking */
+               itemid = PageGetItemId(page, nextoffset);
+               btitem = (BTItem) PageGetItem(page, itemid);
+               if (ItemPointerGetBlockNumber(&(btitem->bti_itup.t_tid)) != rightsib)
+                       elog(PANIC, "_bt_pagedel: right sibling is not next child");
+
+               PageIndexTupleDelete(page, nextoffset);
+       }
+
+       /*
+        * Update siblings' side-links.  Note the target page's side-links will
+        * continue to point to the siblings.
+        */
+       if (BufferIsValid(lbuf))
+       {
+               page = BufferGetPage(lbuf);
+               opaque = (BTPageOpaque) PageGetSpecialPointer(page);
+               Assert(opaque->btpo_next == target);
+               opaque->btpo_next = rightsib;
+       }
+       page = BufferGetPage(rbuf);
+       opaque = (BTPageOpaque) PageGetSpecialPointer(page);
+       Assert(opaque->btpo_prev == target);
+       opaque->btpo_prev = leftsib;
+       rightsib_empty = (P_FIRSTDATAKEY(opaque) > PageGetMaxOffsetNumber(page));
+
+       /*
+        * Mark the page itself deleted.  It can be recycled when all current
+        * transactions are gone; or immediately if we're doing VACUUM FULL.
+        */
+       page = BufferGetPage(buf);
+       opaque = (BTPageOpaque) PageGetSpecialPointer(page);
+       opaque->btpo_flags |= BTP_DELETED;
+       opaque->btpo.xact =
+               vacuum_full ? FrozenTransactionId : ReadNewTransactionId();
+
+       /* And update the metapage, if needed */
+       if (BufferIsValid(metabuf))
+       {
+               metad->btm_fastroot = rightsib;
+               metad->btm_fastlevel = targetlevel;
+       }
+
+       /* XLOG stuff */
+       if (!rel->rd_istemp)
+       {
+               xl_btree_delete_page xlrec;
+               xl_btree_metadata xlmeta;
+               uint8           xlinfo;
+               XLogRecPtr      recptr;
+               XLogRecData rdata[5];
+               XLogRecData *nextrdata;
+
+               xlrec.target.node = rel->rd_node;
+               ItemPointerSet(&(xlrec.target.tid), parent, poffset);
+               xlrec.deadblk = target;
+               xlrec.leftblk = leftsib;
+               xlrec.rightblk = rightsib;
+
+               rdata[0].buffer = InvalidBuffer;
+               rdata[0].data = (char *) &xlrec;
+               rdata[0].len = SizeOfBtreeDeletePage;
+               rdata[0].next = nextrdata = &(rdata[1]);
+
+               if (BufferIsValid(metabuf))
+               {
+                       xlmeta.root = metad->btm_root;
+                       xlmeta.level = metad->btm_level;
+                       xlmeta.fastroot = metad->btm_fastroot;
+                       xlmeta.fastlevel = metad->btm_fastlevel;
+
+                       nextrdata->buffer = InvalidBuffer;
+                       nextrdata->data = (char *) &xlmeta;
+                       nextrdata->len = sizeof(xl_btree_metadata);
+                       nextrdata->next = nextrdata + 1;
+                       nextrdata++;
+                       xlinfo = XLOG_BTREE_DELETE_PAGE_META;
+               }
+               else
+                       xlinfo = XLOG_BTREE_DELETE_PAGE;
+
+               nextrdata->buffer = pbuf;
+               nextrdata->data = NULL;
+               nextrdata->len = 0;
+               nextrdata->next = nextrdata + 1;
+               nextrdata++;
+
+               nextrdata->buffer = rbuf;
+               nextrdata->data = NULL;
+               nextrdata->len = 0;
+               nextrdata->next = NULL;
+
+               if (BufferIsValid(lbuf))
+               {
+                       nextrdata->next = nextrdata + 1;
+                       nextrdata++;
+                       nextrdata->buffer = lbuf;
+                       nextrdata->data = NULL;
+                       nextrdata->len = 0;
+                       nextrdata->next = NULL;
+               }
+
+               recptr = XLogInsert(RM_BTREE_ID, xlinfo, rdata);
+
+               if (BufferIsValid(metabuf))
+               {
+                       PageSetLSN(metapg, recptr);
+                       PageSetSUI(metapg, ThisStartUpID);
+               }
+               page = BufferGetPage(pbuf);
+               PageSetLSN(page, recptr);
+               PageSetSUI(page, ThisStartUpID);
+               page = BufferGetPage(rbuf);
+               PageSetLSN(page, recptr);
+               PageSetSUI(page, ThisStartUpID);
+               page = BufferGetPage(buf);
+               PageSetLSN(page, recptr);
+               PageSetSUI(page, ThisStartUpID);
+               if (BufferIsValid(lbuf))
+               {
+                       page = BufferGetPage(lbuf);
+                       PageSetLSN(page, recptr);
+                       PageSetSUI(page, ThisStartUpID);
+               }
+       }
+
+       END_CRIT_SECTION();
+
+       /* Write and release buffers */
+       if (BufferIsValid(metabuf))
+               _bt_wrtbuf(rel, metabuf);
+       _bt_wrtbuf(rel, pbuf);
+       _bt_wrtbuf(rel, rbuf);
+       _bt_wrtbuf(rel, buf);
+       if (BufferIsValid(lbuf))
+               _bt_wrtbuf(rel, lbuf);
+
+       /*
+        * If parent became half dead, recurse to try to delete it.  Otherwise,
+        * if right sibling is empty and is now the last child of the parent,
+        * recurse to try to delete it.  (These cases cannot apply at the same
+        * time, though the second case might itself recurse to the first.)
+        */
+       if (parent_half_dead)
+       {
+               buf = _bt_getbuf(rel, parent, BT_READ);
+               return _bt_pagedel(rel, buf, vacuum_full) + 1;
+       }
+       if (parent_one_child && rightsib_empty)
+       {
+               buf = _bt_getbuf(rel, rightsib, BT_READ);
+               return _bt_pagedel(rel, buf, vacuum_full) + 1;
+       }
+
+       return 1;
+}
index c7f23da4c7a8ab927cbfdab71493e7212fee34d4..b7a1e7ada1c532be2c6f0996a1b012d3c2afe5d4 100644 (file)
@@ -12,7 +12,7 @@
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
- *       $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtree.c,v 1.96 2003/02/22 00:45:04 tgl Exp $
+ *       $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtree.c,v 1.97 2003/02/23 06:17:13 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -710,15 +710,16 @@ Datum
 btvacuumcleanup(PG_FUNCTION_ARGS)
 {
        Relation        rel = (Relation) PG_GETARG_POINTER(0);
-#ifdef NOT_USED
        IndexVacuumCleanupInfo *info = (IndexVacuumCleanupInfo *) PG_GETARG_POINTER(1);
-#endif
        IndexBulkDeleteResult *stats = (IndexBulkDeleteResult *) PG_GETARG_POINTER(2);
        BlockNumber num_pages;
        BlockNumber blkno;
        PageFreeSpaceInfo *pageSpaces;
        int                     nFreePages,
                                maxFreePages;
+       BlockNumber pages_deleted = 0;
+       MemoryContext mycontext;
+       MemoryContext oldcontext;
 
        Assert(stats != NULL);
 
@@ -731,6 +732,13 @@ btvacuumcleanup(PG_FUNCTION_ARGS)
        pageSpaces = (PageFreeSpaceInfo *) palloc(maxFreePages * sizeof(PageFreeSpaceInfo));
        nFreePages = 0;
 
+       /* Create a temporary memory context to run _bt_pagedel in */
+       mycontext = AllocSetContextCreate(CurrentMemoryContext,
+                                                                         "_bt_pagedel",
+                                                                         ALLOCSET_DEFAULT_MINSIZE,
+                                                                         ALLOCSET_DEFAULT_INITSIZE,
+                                                                         ALLOCSET_DEFAULT_MAXSIZE);
+
        /*
         * Scan through all pages of index, except metapage.  (Any pages added
         * after we start the scan will not be examined; this should be fine,
@@ -745,17 +753,53 @@ btvacuumcleanup(PG_FUNCTION_ARGS)
                buf = _bt_getbuf(rel, blkno, BT_READ);
                page = BufferGetPage(buf);
                opaque = (BTPageOpaque) PageGetSpecialPointer(page);
-               if (P_ISDELETED(opaque))
+               if (_bt_page_recyclable(page))
                {
-                       /* XXX if safe-to-reclaim... */
+                       /* Okay to recycle this page */
                        if (nFreePages < maxFreePages)
                        {
                                pageSpaces[nFreePages].blkno = blkno;
-                               /* The avail-space value is bogus, but must be < BLCKSZ */
+                               /* claimed avail-space must be < BLCKSZ */
                                pageSpaces[nFreePages].avail = BLCKSZ-1;
                                nFreePages++;
                        }
                }
+               else if ((opaque->btpo_flags & BTP_HALF_DEAD) ||
+                                P_FIRSTDATAKEY(opaque) > PageGetMaxOffsetNumber(page))
+               {
+                       /* Empty, try to delete */
+                       int             ndel;
+
+                       /* Run pagedel in a temp context to avoid memory leakage */
+                       MemoryContextReset(mycontext);
+                       oldcontext = MemoryContextSwitchTo(mycontext);
+
+                       ndel = _bt_pagedel(rel, buf, info->vacuum_full);
+                       pages_deleted += ndel;
+
+                       /*
+                        * During VACUUM FULL it's okay to recycle deleted pages
+                        * immediately, since there can be no other transactions
+                        * scanning the index.  Note that we will only recycle the
+                        * current page and not any parent pages that _bt_pagedel
+                        * might have recursed to; this seems reasonable in the name
+                        * of simplicity.  (Trying to do otherwise would mean we'd
+                        * have to sort the list of recyclable pages we're building.)
+                        */
+                       if (ndel && info->vacuum_full)
+                       {
+                               if (nFreePages < maxFreePages)
+                               {
+                                       pageSpaces[nFreePages].blkno = blkno;
+                                       /* claimed avail-space must be < BLCKSZ */
+                                       pageSpaces[nFreePages].avail = BLCKSZ-1;
+                                       nFreePages++;
+                               }
+                       }
+
+                       MemoryContextSwitchTo(oldcontext);
+                       continue;                       /* pagedel released buffer */
+               }
                _bt_relbuf(rel, buf);
        }
 
@@ -768,6 +812,13 @@ btvacuumcleanup(PG_FUNCTION_ARGS)
 
        pfree(pageSpaces);
 
+       MemoryContextDelete(mycontext);
+
+       if (pages_deleted > 0)
+               elog(info->message_level, "Index %s: %u pages, deleted %u; %u now free",
+                        RelationGetRelationName(rel),
+                        num_pages, pages_deleted, nFreePages);
+
        /* update statistics */
        stats->num_pages = num_pages;
        stats->pages_free = nFreePages;
index 87a0aaaa7a4f5ba319abcc772bc6249c02a7c9b2..058b13b6a43fc630d773dd1d163328dae58b5b1e 100644 (file)
@@ -8,7 +8,7 @@
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
- *       $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtxlog.c,v 1.1 2003/02/21 00:06:21 tgl Exp $
+ *       $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtxlog.c,v 1.2 2003/02/23 06:17:13 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -403,6 +403,171 @@ btree_xlog_delete(bool redo, XLogRecPtr lsn, XLogRecord *record)
        UnlockAndWriteBuffer(buffer);
 }
 
+static void
+btree_xlog_delete_page(bool redo, bool ismeta,
+                                          XLogRecPtr lsn, XLogRecord *record)
+{
+       xl_btree_delete_page *xlrec = (xl_btree_delete_page *) XLogRecGetData(record);
+       Relation        reln;
+       BlockNumber     parent;
+       BlockNumber     target;
+       BlockNumber     leftsib;
+       BlockNumber     rightsib;
+       Buffer          buffer;
+       Page            page;
+       BTPageOpaque pageop;
+       char       *op = (redo) ? "redo" : "undo";
+
+       reln = XLogOpenRelation(redo, RM_BTREE_ID, xlrec->target.node);
+       if (!RelationIsValid(reln))
+               return;
+
+       parent = ItemPointerGetBlockNumber(&(xlrec->target.tid));
+       target = xlrec->deadblk;
+       leftsib = xlrec->leftblk;
+       rightsib = xlrec->rightblk;
+
+       /* parent page */
+       if (redo && !(record->xl_info & XLR_BKP_BLOCK_1))
+       {
+               buffer = XLogReadBuffer(false, reln, parent);
+               if (!BufferIsValid(buffer))
+                       elog(PANIC, "btree_delete_page_redo: parent block unfound");
+               page = (Page) BufferGetPage(buffer);
+               pageop = (BTPageOpaque) PageGetSpecialPointer(page);
+               if (PageIsNew((PageHeader) page))
+                       elog(PANIC, "btree_delete_page_redo: uninitialized parent page");
+               if (XLByteLE(lsn, PageGetLSN(page)))
+               {
+                       UnlockAndReleaseBuffer(buffer);
+               }
+               else
+               {
+                       OffsetNumber poffset;
+
+                       poffset = ItemPointerGetOffsetNumber(&(xlrec->target.tid));
+                       if (poffset >= PageGetMaxOffsetNumber(page))
+                       {
+                               Assert(poffset == P_FIRSTDATAKEY(pageop));
+                               PageIndexTupleDelete(page, poffset);
+                               pageop->btpo_flags |= BTP_HALF_DEAD;
+                       }
+                       else
+                       {
+                               ItemId          itemid;
+                               BTItem          btitem;
+                               OffsetNumber    nextoffset;
+
+                               itemid = PageGetItemId(page, poffset);
+                               btitem = (BTItem) PageGetItem(page, itemid);
+                               ItemPointerSet(&(btitem->bti_itup.t_tid), rightsib, P_HIKEY);
+                               nextoffset = OffsetNumberNext(poffset);
+                               PageIndexTupleDelete(page, nextoffset);
+                       }
+
+                       PageSetLSN(page, lsn);
+                       PageSetSUI(page, ThisStartUpID);
+                       UnlockAndWriteBuffer(buffer);
+               }
+       }
+
+       /* Fix left-link of right sibling */
+       if (redo && !(record->xl_info & XLR_BKP_BLOCK_2))
+       {
+               buffer = XLogReadBuffer(false, reln, rightsib);
+               if (!BufferIsValid(buffer))
+                       elog(PANIC, "btree_delete_page_redo: lost right sibling");
+               page = (Page) BufferGetPage(buffer);
+               if (PageIsNew((PageHeader) page))
+                       elog(PANIC, "btree_delete_page_redo: uninitialized right sibling");
+               if (XLByteLE(lsn, PageGetLSN(page)))
+               {
+                       UnlockAndReleaseBuffer(buffer);
+               }
+               else
+               {
+                       pageop = (BTPageOpaque) PageGetSpecialPointer(page);
+                       pageop->btpo_prev = leftsib;
+
+                       PageSetLSN(page, lsn);
+                       PageSetSUI(page, ThisStartUpID);
+                       UnlockAndWriteBuffer(buffer);
+               }
+       }
+
+       /* Fix right-link of left sibling, if any */
+       if (redo && !(record->xl_info & XLR_BKP_BLOCK_3))
+       {
+               if (leftsib != P_NONE)
+               {
+                       buffer = XLogReadBuffer(false, reln, leftsib);
+                       if (!BufferIsValid(buffer))
+                               elog(PANIC, "btree_delete_page_redo: lost left sibling");
+                       page = (Page) BufferGetPage(buffer);
+                       if (PageIsNew((PageHeader) page))
+                               elog(PANIC, "btree_delete_page_redo: uninitialized left sibling");
+                       if (XLByteLE(lsn, PageGetLSN(page)))
+                       {
+                               UnlockAndReleaseBuffer(buffer);
+                       }
+                       else
+                       {
+                               pageop = (BTPageOpaque) PageGetSpecialPointer(page);
+                               pageop->btpo_next = rightsib;
+
+                               PageSetLSN(page, lsn);
+                               PageSetSUI(page, ThisStartUpID);
+                               UnlockAndWriteBuffer(buffer);
+                       }
+               }
+       }
+
+       /* Rewrite target page as empty deleted page */
+       buffer = XLogReadBuffer(false, reln, target);
+       if (!BufferIsValid(buffer))
+               elog(PANIC, "btree_delete_page_%s: lost target page", op);
+       page = (Page) BufferGetPage(buffer);
+       if (redo)
+               _bt_pageinit(page, BufferGetPageSize(buffer));
+       else if (PageIsNew((PageHeader) page))
+               elog(PANIC, "btree_delete_page_undo: uninitialized target page");
+       pageop = (BTPageOpaque) PageGetSpecialPointer(page);
+
+       if (redo)
+       {
+               pageop->btpo_prev = leftsib;
+               pageop->btpo_next = rightsib;
+               pageop->btpo.xact = FrozenTransactionId;
+               pageop->btpo_flags = BTP_DELETED;
+
+               PageSetLSN(page, lsn);
+               PageSetSUI(page, ThisStartUpID);
+               UnlockAndWriteBuffer(buffer);
+       }
+       else
+       {
+               /* undo */
+               if (XLByteLT(PageGetLSN(page), lsn))
+                       elog(PANIC, "btree_delete_page_undo: bad left sibling LSN");
+               elog(PANIC, "btree_delete_page_undo: unimplemented");
+       }
+
+       /* Update metapage if needed */
+       if (redo)                                       /* metapage changes not undoable */
+       {
+               if (ismeta)
+               {
+                       xl_btree_metadata md;
+
+                       memcpy(&md, (char *) xlrec + SizeOfBtreeDeletePage,
+                                  sizeof(xl_btree_metadata));
+                       _bt_restore_meta(reln, lsn,
+                                                        md.root, md.level,
+                                                        md.fastroot, md.fastlevel);
+               }
+       }
+}
+
 static void
 btree_xlog_newroot(bool redo, XLogRecPtr lsn, XLogRecord *record)
 {
@@ -534,8 +699,10 @@ btree_redo(XLogRecPtr lsn, XLogRecord *record)
                        btree_xlog_delete(true, lsn, record);
                        break;
                case XLOG_BTREE_DELETE_PAGE:
+                       btree_xlog_delete_page(true, false, lsn, record);
+                       break;
                case XLOG_BTREE_DELETE_PAGE_META:
-                       // ???
+                       btree_xlog_delete_page(true, true, lsn, record);
                        break;
                case XLOG_BTREE_NEWROOT:
                        btree_xlog_newroot(true, lsn, record);
@@ -583,8 +750,10 @@ btree_undo(XLogRecPtr lsn, XLogRecord *record)
                        btree_xlog_delete(false, lsn, record);
                        break;
                case XLOG_BTREE_DELETE_PAGE:
+                       btree_xlog_delete_page(false, false, lsn, record);
+                       break;
                case XLOG_BTREE_DELETE_PAGE_META:
-                       // ???
+                       btree_xlog_delete_page(false, true, lsn, record);
                        break;
                case XLOG_BTREE_NEWROOT:
                        btree_xlog_newroot(false, lsn, record);
index b8fe0a3bf3ed1e51d03e811ea149f5b23e4cdc55..4b6d92eb35a4d87d0b7eafc9df6a39f05aded7d1 100644 (file)
@@ -8,7 +8,7 @@
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
- *       $Header: /cvsroot/pgsql/src/backend/storage/freespace/freespace.c,v 1.14 2002/09/20 19:56:01 tgl Exp $
+ *       $Header: /cvsroot/pgsql/src/backend/storage/freespace/freespace.c,v 1.15 2003/02/23 06:17:13 tgl Exp $
  *
  *
  * NOTES:
@@ -681,7 +681,9 @@ free_chunk_chain(FSMChunk *fchunk)
  * Look to see if a page with at least the specified amount of space is
  * available in the given FSMRelation. If so, return its page number,
  * and advance the nextPage counter so that the next inquiry will return
- * a different page if possible.  Return InvalidBlockNumber if no success.
+ * a different page if possible; also update the entry to show that the
+ * requested space is not available anymore.  Return InvalidBlockNumber
+ * if no success.
  */
 static BlockNumber
 find_free_space(FSMRelation *fsmrel, Size spaceNeeded)
@@ -713,6 +715,12 @@ find_free_space(FSMRelation *fsmrel, Size spaceNeeded)
                /* Check the next page */
                if ((Size) curChunk->bytes[chunkRelIndex] >= spaceNeeded)
                {
+                       /*
+                        * Found what we want --- adjust the entry.  In theory we could
+                        * delete the entry immediately if it drops below threshold,
+                        * but it seems better to wait till we next need space.
+                        */
+                       curChunk->bytes[chunkRelIndex] -= (ItemLength) spaceNeeded;
                        fsmrel->nextPage = pageIndex + 1;
                        return curChunk->pages[chunkRelIndex];
                }
index 2978c398cd6a4c2b9d8f0955e2b9c8d9ec1ba2c9..1cac66a56e98eb2ad7535ed729fe3e1a2893cc46 100644 (file)
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1996-2002, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $Id: itup.h,v 1.36 2002/08/25 17:20:01 tgl Exp $
+ * $Id: itup.h,v 1.37 2003/02/23 06:17:13 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -133,11 +133,11 @@ typedef InsertIndexResultData *InsertIndexResult;
 )
 
 
-/* indextuple.h */
+/* routines in indextuple.c */
 extern IndexTuple index_formtuple(TupleDesc tupleDescriptor,
                                Datum *value, char *null);
 extern Datum nocache_index_getattr(IndexTuple tup, int attnum,
                                          TupleDesc tupleDesc, bool *isnull);
-extern void CopyIndexTuple(IndexTuple source, IndexTuple *target);
+extern IndexTuple CopyIndexTuple(IndexTuple source);
 
 #endif   /* ITUP_H */
index 4bb5db0513e872e7fbbd1071b18d5a33ee8e0d7d..35b6c94e7e247d57272c997881bad023472a5458 100644 (file)
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1996-2002, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $Id: nbtree.h,v 1.65 2003/02/22 00:45:05 tgl Exp $
+ * $Id: nbtree.h,v 1.66 2003/02/23 06:17:13 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -118,6 +118,8 @@ typedef struct BTItemData
 
 typedef BTItemData *BTItem;
 
+#define CopyBTItem(btitem) ((BTItem) CopyIndexTuple((IndexTuple) (btitem)))
+
 /*
  * For XLOG: size without alignment. Sizeof works as long as
  * IndexTupleData has exactly 8 bytes.
@@ -434,6 +436,7 @@ extern Datum btvacuumcleanup(PG_FUNCTION_ARGS);
  */
 extern InsertIndexResult _bt_doinsert(Relation rel, BTItem btitem,
                         bool index_is_unique, Relation heapRel);
+extern Buffer _bt_getstackbuf(Relation rel, BTStack stack, int access);
 extern void _bt_insert_parent(Relation rel, Buffer buf, Buffer rbuf,
                                                          BTStack stack, bool is_root, bool is_only);
 
@@ -448,8 +451,10 @@ extern void _bt_relbuf(Relation rel, Buffer buf);
 extern void _bt_wrtbuf(Relation rel, Buffer buf);
 extern void _bt_wrtnorelbuf(Relation rel, Buffer buf);
 extern void _bt_pageinit(Page page, Size size);
+extern bool _bt_page_recyclable(Page page);
 extern void _bt_metaproot(Relation rel, BlockNumber rootbknum, uint32 level);
 extern void _bt_itemdel(Relation rel, Buffer buf, ItemPointer tid);
+extern int     _bt_pagedel(Relation rel, Buffer buf, bool vacuum_full);
 
 /*
  * prototypes for functions in nbtsearch.c
@@ -488,7 +493,6 @@ extern BTItem _bt_formitem(IndexTuple itup);
 /*
  * prototypes for functions in nbtsort.c
  */
-
 typedef struct BTSpool BTSpool; /* opaque type known only within nbtsort.c */
 
 extern BTSpool *_bt_spoolinit(Relation index, bool isunique);