]> granicus.if.org Git - postgresql/commitdiff
Runtime tree recovery is implemented, just testing is left -:)
authorVadim B. Mikheev <vadim4o@yahoo.com>
Fri, 2 Feb 2001 19:49:15 +0000 (19:49 +0000)
committerVadim B. Mikheev <vadim4o@yahoo.com>
Fri, 2 Feb 2001 19:49:15 +0000 (19:49 +0000)
src/backend/access/nbtree/nbtinsert.c

index b44b7839dea98c05ee2065e6fbab756e6a6aa749..6a4b925cd150522b88181a8fad78f78d68b8d041 100644 (file)
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *       $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.79 2001/01/31 01:08:36 vadim Exp $
+ *       $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.80 2001/02/02 19:49:15 vadim Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -38,7 +38,10 @@ extern bool FixBTree;
 
 Buffer _bt_fixroot(Relation rel, Buffer oldrootbuf, bool release);
 static void _bt_fixtree(Relation rel, BlockNumber blkno);
-static BlockNumber _bt_fixlevel(Relation rel, Buffer buf, BlockNumber limit);
+static void _bt_fixbranch(Relation rel, BlockNumber lblkno, 
+                               BlockNumber rblkno, BTStack true_stack);
+static void _bt_fixlevel(Relation rel, Buffer buf, BlockNumber limit);
+static void _bt_fixup(Relation rel, Buffer buf);
 static OffsetNumber _bt_getoff(Page page, BlockNumber blkno);
 
 static Buffer _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf);
@@ -64,7 +67,7 @@ static OffsetNumber _bt_findsplitloc(Relation rel, Page page,
 static void _bt_checksplitloc(FindSplitData *state, OffsetNumber firstright,
                                                          int leftfree, int rightfree,
                                                          bool newitemonleft, Size firstrightitemsz);
-static Buffer _bt_getstackbuf(Relation rel, BTStack stack);
+static Buffer _bt_getstackbuf(Relation rel, BTStack stack, int access);
 static void _bt_pgaddtup(Relation rel, Page page,
                                                 Size itemsize, BTItem btitem,
                                                 OffsetNumber itup_off, const char *where);
@@ -497,34 +500,7 @@ _bt_insertonpg(Relation rel,
                                                elog(ERROR, "bt_insertonpg: no root page found");
                                        _bt_wrtbuf(rel, rbuf);
                                        _bt_wrtnorelbuf(rel, buf);
-                                       while(! P_LEFTMOST(lpageop))
-                                       {
-                                               BlockNumber             blkno = lpageop->btpo_prev;
-                                               LockBuffer(buf, BUFFER_LOCK_UNLOCK);
-                                               ReleaseBuffer(buf);
-                                               buf = _bt_getbuf(rel, blkno, BT_WRITE);
-                                               page = BufferGetPage(buf);
-                                               lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
-                                               /*
-                                                * If someone else already created parent pages
-                                                * then it's time for _bt_fixtree() to check upper
-                                                * levels and fix them, if required.
-                                                */
-                                               if (lpageop->btpo_parent != BTREE_METAPAGE)
-                                               {
-                                                       blkno = lpageop->btpo_parent;
-                                                       _bt_relbuf(rel, buf, BT_WRITE);
-                                                       _bt_fixtree(rel, blkno);
-                                                       goto formres;
-                                               }
-                                       }
-                                       /*
-                                        * Ok, we are on the leftmost page, it's write locked
-                                        * by us and its btpo_parent points to meta page - time
-                                        * for _bt_fixroot().
-                                        */
-                                        buf = _bt_fixroot(rel, buf, true);
-                                        _bt_relbuf(rel, buf, BT_WRITE);
+                                       _bt_fixup(rel, buf);
                                        goto formres;
                                }
 
@@ -561,16 +537,22 @@ _bt_insertonpg(Relation rel,
                        ItemPointerSet(&(stack->bts_btitem.bti_itup.t_tid),
                                                   bknum, P_HIKEY);
 
-                       pbuf = _bt_getstackbuf(rel, stack);
-
-                       if (pbuf == InvalidBuffer)
-                               elog(ERROR, "_bt_getstackbuf: my bits moved right off the end of the world!"
-                                        "\n\tRecreate index %s.", RelationGetRelationName(rel));
+                       pbuf = _bt_getstackbuf(rel, stack, BT_WRITE);
 
                        /* Now we can write and unlock the children */
                        _bt_wrtbuf(rel, rbuf);
                        _bt_wrtbuf(rel, buf);
 
+                       if (pbuf == InvalidBuffer)
+                       {
+                               if (!FixBTree)
+                                       elog(ERROR, "_bt_getstackbuf: my bits moved right off the end of the world!"
+                                                "\n\tRecreate index %s.", RelationGetRelationName(rel));
+                               pfree(new_item);
+                               _bt_fixbranch(rel, bknum, rbknum, stack);
+                               goto formres;
+                       }
+
                        /* Recursively update the parent */
                        newres = _bt_insertonpg(rel, pbuf, stack->bts_parent,
                                                                        0, NULL, new_item, stack->bts_offset);
@@ -1132,7 +1114,7 @@ _bt_checksplitloc(FindSplitData *state, OffsetNumber firstright,
  *             Also, re-set bts_blkno & bts_offset if changed.
  */
 static Buffer
-_bt_getstackbuf(Relation rel, BTStack stack)
+_bt_getstackbuf(Relation rel, BTStack stack, int access)
 {
        BlockNumber blkno;
        Buffer          buf;
@@ -1145,7 +1127,7 @@ _bt_getstackbuf(Relation rel, BTStack stack)
        BTPageOpaque opaque;
 
        blkno = stack->bts_blkno;
-       buf = _bt_getbuf(rel, blkno, BT_WRITE);
+       buf = _bt_getbuf(rel, blkno, access);
        page = BufferGetPage(buf);
        opaque = (BTPageOpaque) PageGetSpecialPointer(page);
        maxoff = PageGetMaxOffsetNumber(page);
@@ -1179,13 +1161,13 @@ _bt_getstackbuf(Relation rel, BTStack stack)
                /* by here, the item we're looking for moved right at least one page */
                if (P_RIGHTMOST(opaque))
                {
-                       _bt_relbuf(rel, buf, BT_WRITE);
+                       _bt_relbuf(rel, buf, access);
                        return(InvalidBuffer);
                }
 
                blkno = opaque->btpo_next;
-               _bt_relbuf(rel, buf, BT_WRITE);
-               buf = _bt_getbuf(rel, blkno, BT_WRITE);
+               _bt_relbuf(rel, buf, access);
+               buf = _bt_getbuf(rel, blkno, access);
                page = BufferGetPage(buf);
                opaque = (BTPageOpaque) PageGetSpecialPointer(page);
                maxoff = PageGetMaxOffsetNumber(page);
@@ -1400,10 +1382,13 @@ _bt_fixroot(Relation rel, Buffer oldrootbuf, bool release)
 
        /*
         * Now read other pages (if any) on level and add them to new root.
+        * Here we break one of our locking rules - never hold lock on parent
+        * page when acquiring lock on its child, - but we free from deadlock:
+        *
         * If concurrent process will split one of pages on this level then it
-        * will notice either btpo_parent == metablock or btpo_parent == rootblk.
-        * In first case it will give up its locks and try to lock leftmost page
-        * buffer (oldrootbuf) to fix root - ie it will wait for us and let us
+        * will see either btpo_parent == metablock or btpo_parent == rootblk.
+        * In first case it will give up its locks and walk to the leftmost page
+        * (oldrootbuf) in _bt_fixup() - ie it will wait for us and let us
         * continue. In second case it will try to lock rootbuf keeping its locks
         * on buffers we already passed, also waiting for us. If we'll have to
         * unlock rootbuf (split it) and that process will have to split page
@@ -1547,14 +1532,12 @@ _bt_fixtree(Relation rel, BlockNumber blkno)
  * Check/fix level starting from page in buffer buf up to block
  * limit on *child* level (or till rightmost child page if limit
  * is InvalidBlockNumber). Start buffer must be read locked.
- * No pins/locks are held on exit. Returns block number of last
- * visited/pointing-to-limit page on *check/fix* level.
+ * No pins/locks are held on exit.
  */
-static BlockNumber
+static void
 _bt_fixlevel(Relation rel, Buffer buf, BlockNumber limit)
 {
        BlockNumber             blkno = BufferGetBlockNumber(buf);
-       BlockNumber             pblkno = blkno;
        Page                    page;
        BTPageOpaque    opaque;
        BlockNumber             cblkno[3];
@@ -1639,22 +1622,20 @@ _bt_fixlevel(Relation rel, Buffer buf, BlockNumber limit)
                        BTStackData             stack;
 
                        stack.bts_parent = NULL;
-                       stack.bts_blkno = pblkno;
+                       stack.bts_blkno = blkno;
                        stack.bts_offset = InvalidOffsetNumber;
                        ItemPointerSet(&(stack.bts_btitem.bti_itup.t_tid),
                                                        cblkno[0], P_HIKEY);
 
-                       buf = _bt_getstackbuf(rel, &stack);
+                       buf = _bt_getstackbuf(rel, &stack, BT_WRITE);
                        if (buf == InvalidBuffer)
                                elog(ERROR, "bt_fixlevel: pointer disappeared (need to recreate index)");
 
                        page = BufferGetPage(buf);
                        opaque = (BTPageOpaque) PageGetSpecialPointer(page);
                        coff[0] = stack.bts_offset;
-                       pblkno = BufferGetBlockNumber(buf);
-                       parblk[0] = pblkno;
-                       if (cblkno[0] == limit)
-                               blkno = pblkno;         /* where we have seen pointer to limit */
+                       blkno = BufferGetBlockNumber(buf);
+                       parblk[0] = blkno;
 
                        /* Check/insert missed pointers */
                        for (i = 1; i <= cidx; i++)
@@ -1668,8 +1649,6 @@ _bt_fixlevel(Relation rel, Buffer buf, BlockNumber limit)
                                        if (parblk[i] == parblk[i - 1] &&
                                                                coff[i] != coff[i - 1] + 1)
                                                elog(ERROR, "bt_fixlevel: invalid item order(2) (need to recreate index)");
-                                       if (cblkno[i] == limit)
-                                               blkno = parblk[i];
                                        continue;
                                }
                                /* Have to check next page ? */
@@ -1688,10 +1667,8 @@ _bt_fixlevel(Relation rel, Buffer buf, BlockNumber limit)
                                                buf = newbuf;
                                                page = newpage;
                                                opaque = newopaque;
-                                               pblkno = BufferGetBlockNumber(buf);
-                                               parblk[i] = pblkno;
-                                               if (cblkno[i] == limit)
-                                                       blkno = pblkno;
+                                               blkno = BufferGetBlockNumber(buf);
+                                               parblk[i] = blkno;
                                                continue;
                                        }
                                        /* unfound - need to insert on current page */
@@ -1730,7 +1707,7 @@ _bt_fixlevel(Relation rel, Buffer buf, BlockNumber limit)
                                                page = BufferGetPage(buf);
                                                opaque = (BTPageOpaque) PageGetSpecialPointer(page);
                                        }
-                                       pblkno = BufferGetBlockNumber(buf);
+                                       blkno = BufferGetBlockNumber(buf);
                                        coff[i] = itup_off;
                                }
                                else
@@ -1740,9 +1717,7 @@ _bt_fixlevel(Relation rel, Buffer buf, BlockNumber limit)
                                }
 
                                pfree(btitem);
-                               parblk[i] = pblkno;
-                               if (cblkno[i] == limit)
-                                       blkno = pblkno;
+                               parblk[i] = blkno;
                        }
 
                        /* copy page with pointer to cblkno[cidx] to temp storage */
@@ -1750,8 +1725,6 @@ _bt_fixlevel(Relation rel, Buffer buf, BlockNumber limit)
                        _bt_relbuf(rel, buf, BT_WRITE);
                        page = (Page)tbuf;
                        opaque = (BTPageOpaque) PageGetSpecialPointer(page);
-                       if (limit == InvalidBlockNumber)
-                               blkno = pblkno;         /* last visited page */
                }
 
                /* Continue if current check/fix level page is rightmost */
@@ -1766,7 +1739,7 @@ _bt_fixlevel(Relation rel, Buffer buf, BlockNumber limit)
                {
                        if (cidx == 2)
                                _bt_relbuf(rel, cbuf[2], BT_READ);
-                       return(blkno);
+                       return;
                }
                if (cblkno[0] == limit || cblkno[1] == limit)
                        goodbye = true;
@@ -1778,6 +1751,168 @@ _bt_fixlevel(Relation rel, Buffer buf, BlockNumber limit)
        }
 }
 
+/*
+ * Check/fix part of tree - branch - up from parent of level with blocks
+ * lblkno and rblknum. We first ensure that parent level has pointers
+ * to both lblkno & rblknum and if those pointers are on different
+ * parent pages then do the same for parent level, etc. No locks must
+ * be held on target level and upper on entry. No locks will be held
+ * on exit. Stack created when traversing tree down should be provided and
+ * it must points to parent level. rblkno must be on the right from lblkno.
+ * (This function is special edition of more expensive _bt_fixtree(),
+ * but it doesn't guarantee full consistency of tree.)
+ */
+static void
+_bt_fixbranch(Relation rel, BlockNumber lblkno, 
+                               BlockNumber rblkno, BTStack true_stack)
+{
+       BlockNumber             blkno = true_stack->bts_blkno;
+       BTStackData             stack;
+       BTPageOpaque    opaque;
+       Buffer                  buf, rbuf;
+       Page                    page;
+       OffsetNumber    offnum;
+
+       true_stack = true_stack->bts_parent;
+       for ( ; ; )
+       {
+               buf = _bt_getbuf(rel, blkno, BT_READ);
+
+               /* Check/fix parent level pointed by blkno */
+               _bt_fixlevel(rel, buf, rblkno);
+
+               /*
+                * Here parent level should have pointers for both
+                * lblkno and rblkno and we have to find them.
+                */
+               stack.bts_parent = NULL;
+               stack.bts_blkno = blkno;
+               stack.bts_offset = InvalidOffsetNumber;
+               ItemPointerSet(&(stack.bts_btitem.bti_itup.t_tid), lblkno, P_HIKEY);
+               buf = _bt_getstackbuf(rel, &stack, BT_READ);
+               if (buf == InvalidBuffer)
+                       elog(ERROR, "bt_fixbranch: left pointer unfound (need to recreate index)");
+               page = BufferGetPage(buf);
+               offnum = _bt_getoff(page, rblkno);
+
+               if (offnum != InvalidOffsetNumber)      /* right pointer found */
+               {
+                       if (offnum <= stack.bts_offset)
+                               elog(ERROR, "bt_fixbranch: invalid item order (need to recreate index)");
+                       _bt_relbuf(rel, buf, BT_READ);
+                       return;
+               }
+
+               /* Pointers are on different parent pages - find right one */
+               lblkno = BufferGetBlockNumber(buf);
+
+               stack.bts_parent = NULL;
+               stack.bts_blkno = lblkno;
+               stack.bts_offset = InvalidOffsetNumber;
+               ItemPointerSet(&(stack.bts_btitem.bti_itup.t_tid), rblkno, P_HIKEY);
+               rbuf = _bt_getstackbuf(rel, &stack, BT_READ);
+               if (rbuf == InvalidBuffer)
+                       elog(ERROR, "bt_fixbranch: right pointer unfound (need to recreate index)");
+               rblkno = BufferGetBlockNumber(rbuf);
+               _bt_relbuf(rel, rbuf, BT_READ);
+
+               /*
+                * If we have parent item in true_stack then go up one level and
+                * ensure that it has pointers to new lblkno & rblkno.
+                */
+               if (true_stack)
+               {
+                       _bt_relbuf(rel, buf, BT_READ);
+                       blkno = true_stack->bts_blkno;
+                       true_stack = true_stack->bts_parent;
+                       continue;
+               }
+
+               /*
+                * Well, we are on the level that was root or unexistent when
+                * we started traversing tree down. If btpo_parent is updated
+                * then we'll use it to continue, else we'll fix/restore upper
+                * levels entirely.
+                */
+               opaque = (BTPageOpaque) PageGetSpecialPointer(page);
+               if (opaque->btpo_parent != BTREE_METAPAGE)
+               {
+                       blkno = opaque->btpo_parent;
+                       _bt_relbuf(rel, buf, BT_READ);
+                       continue;
+               }
+
+               /* Have to switch to excl buf lock and re-check btpo_parent */
+               _bt_relbuf(rel, buf, BT_READ);
+               buf = _bt_getbuf(rel, blkno, BT_WRITE);
+               page = BufferGetPage(buf);
+               opaque = (BTPageOpaque) PageGetSpecialPointer(page);
+               if (opaque->btpo_parent != BTREE_METAPAGE)
+               {
+                       blkno = opaque->btpo_parent;
+                       _bt_relbuf(rel, buf, BT_WRITE);
+                       continue;
+               }
+
+               /*
+                * We hold excl lock on some internal page with unupdated
+                * btpo_parent - time for _bt_fixup.
+                */
+               break;
+       }
+
+       _bt_fixup(rel, buf);
+
+       return;
+}
+
+/*
+ * Having buf excl locked this routine walks to the left on level and
+ * uses either _bt_fixtree() or _bt_fixroot() to create/check&fix upper
+ * levels. No buffer pins/locks will be held on exit.
+ */
+static void
+_bt_fixup(Relation rel, Buffer buf)
+{
+       Page                    page;
+       BTPageOpaque    opaque;
+       BlockNumber             blkno;
+
+       for ( ; ; )
+       {
+               page = BufferGetPage(buf);
+               opaque = (BTPageOpaque) PageGetSpecialPointer(page);
+               /*
+                * If someone else already created parent pages
+                * then it's time for _bt_fixtree() to check upper
+                * levels and fix them, if required.
+                */
+               if (opaque->btpo_parent != BTREE_METAPAGE)
+               {
+                       blkno = opaque->btpo_parent;
+                       _bt_relbuf(rel, buf, BT_WRITE);
+                       _bt_fixtree(rel, blkno);
+                       return;
+               }
+               if (P_LEFTMOST(opaque))
+                       break;
+               blkno = opaque->btpo_prev;
+               LockBuffer(buf, BUFFER_LOCK_UNLOCK);
+               ReleaseBuffer(buf);
+               buf = _bt_getbuf(rel, blkno, BT_WRITE);
+       }
+
+       /*
+        * Ok, we are on the leftmost page, it's write locked
+        * by us and its btpo_parent points to meta page - time
+        * for _bt_fixroot().
+        */
+        buf = _bt_fixroot(rel, buf, true);
+        _bt_relbuf(rel, buf, BT_WRITE);
+
+       return;
+}
+
 static OffsetNumber
 _bt_getoff(Page page, BlockNumber blkno)
 {