]> granicus.if.org Git - postgresql/commitdiff
First step in attempt to fix tree at runtime: create upper levels
authorVadim B. Mikheev <vadim4o@yahoo.com>
Fri, 26 Jan 2001 01:24:31 +0000 (01:24 +0000)
committerVadim B. Mikheev <vadim4o@yahoo.com>
Fri, 26 Jan 2001 01:24:31 +0000 (01:24 +0000)
and a new root page if the old root page was split but no new root page
was created.
The new code is guarded by the FixBTree bool flag, which is set to FALSE, so
nothing should be affected by this untested approach.

src/backend/access/nbtree/nbtinsert.c
src/backend/access/nbtree/nbtpage.c
src/backend/access/nbtree/nbtree.c

index dc17ceab11102362b0f0f54eb892a9a9e692be05..8f23e16992a5013a5baf2c24980b2ebf8740bb2b 100644 (file)
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *       $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.76 2001/01/24 19:42:48 momjian Exp $
+ *       $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.77 2001/01/26 01:24:31 vadim Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -34,7 +34,9 @@ typedef struct
        int             best_delta;                     /* best size delta so far */
 } FindSplitData;
 
-void _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf);
+Buffer _bt_fixroot(Relation rel, Buffer oldrootbuf, bool release);
+
+static Buffer _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf);
 
 static TransactionId _bt_check_unique(Relation rel, BTItem btitem,
                                                                          Relation heapRel, Buffer buf,
@@ -44,6 +46,8 @@ static InsertIndexResult _bt_insertonpg(Relation rel, Buffer buf,
                                                                                int keysz, ScanKey scankey,
                                                                                BTItem btitem,
                                                                                OffsetNumber afteritem);
+static void _bt_insertuple(Relation rel, Buffer buf, 
+                                               Size itemsz, BTItem btitem, OffsetNumber newitemoff);
 static Buffer _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
                                                OffsetNumber newitemoff, Size newitemsz,
                                                BTItem newitem, bool newitemonleft,
@@ -456,9 +460,14 @@ _bt_insertonpg(Relation rel,
 
                if (is_root)
                {
+                       Buffer  rootbuf;
+
                        Assert(stack == (BTStack) NULL);
                        /* create a new root node and release the split buffers */
-                       _bt_newroot(rel, buf, rbuf);
+                       rootbuf = _bt_newroot(rel, buf, rbuf);
+                       _bt_wrtbuf(rel, rootbuf);
+                       _bt_wrtbuf(rel, rbuf);
+                       _bt_wrtbuf(rel, buf);
                }
                else
                {
@@ -519,52 +528,11 @@ _bt_insertonpg(Relation rel,
        }
        else
        {
-               START_CRIT_SECTION();
-               _bt_pgaddtup(rel, page, itemsz, btitem, newitemoff, "page");
                itup_off = newitemoff;
                itup_blkno = BufferGetBlockNumber(buf);
-               /* XLOG stuff */
-               {
-                       xl_btree_insert         xlrec;
-                       uint8                           flag = XLOG_BTREE_INSERT;
-                       XLogRecPtr                      recptr;
-                       XLogRecData                     rdata[2];
-                       BTItemData                      truncitem;
 
-                       xlrec.target.node = rel->rd_node;
-                       ItemPointerSet(&(xlrec.target.tid), BufferGetBlockNumber(buf), newitemoff);
-                       rdata[0].buffer = InvalidBuffer;
-                       rdata[0].data = (char*)&xlrec;
-                       rdata[0].len = SizeOfBtreeInsert;
-                       rdata[0].next = &(rdata[1]);
-
-                       /* Read comments in _bt_pgaddtup */
-                       if (!(P_ISLEAF(lpageop)) && newitemoff == P_FIRSTDATAKEY(lpageop))
-                       {
-                               truncitem = *btitem;
-                               truncitem.bti_itup.t_info = sizeof(BTItemData);
-                               rdata[1].data = (char*)&truncitem;
-                               rdata[1].len = sizeof(BTItemData);
-                       }
-                       else
-                       {
-                               rdata[1].data = (char*)btitem;
-                               rdata[1].len = IndexTupleDSize(btitem->bti_itup) + 
-                                                       (sizeof(BTItemData) - sizeof(IndexTupleData));
-                       }
-                       rdata[1].buffer = buf;
-                       rdata[1].next = NULL;
+               _bt_insertuple(rel, buf, itemsz, btitem, newitemoff);
 
-                       if (P_ISLEAF(lpageop))
-                               flag |= XLOG_BTREE_LEAF;
-
-                       recptr = XLogInsert(RM_BTREE_ID, flag, rdata);
-
-                       PageSetLSN(page, recptr);
-                       PageSetSUI(page, ThisStartUpID);
-               }
-
-               END_CRIT_SECTION();
                /* Write out the updated page and release pin/lock */
                _bt_wrtbuf(rel, buf);
        }
@@ -576,6 +544,57 @@ _bt_insertonpg(Relation rel,
        return res;
 }
 
+static void
+_bt_insertuple(Relation rel, Buffer buf, 
+                               Size itemsz, BTItem btitem, OffsetNumber newitemoff)
+{
+       Page                    page = BufferGetPage(buf);
+       BTPageOpaque    pageop = (BTPageOpaque) PageGetSpecialPointer(page);
+
+       START_CRIT_SECTION();
+       _bt_pgaddtup(rel, page, itemsz, btitem, newitemoff, "page");
+       /*
+        * XLOG stuff.
+        *
+        * _bt_insertuple as a whole: add btitem at offset newitemoff of the
+        * page held in buf (via _bt_pgaddtup above) and emit the matching
+        * XLOG_BTREE_INSERT record, all inside a single critical section.
+        * The buffer is neither written nor released here; that is left to
+        * the caller (e.g. _bt_insertonpg calls _bt_wrtbuf afterwards).
+        *
+        * The record is a chain of two pieces: rdata[0] carries the
+        * xl_btree_insert header (relation node + target tid) and is not
+        * associated with any buffer; rdata[1] carries the item itself and
+        * is associated with buf.
+        */
+       {
+               xl_btree_insert         xlrec;
+               uint8                           flag = XLOG_BTREE_INSERT;
+               XLogRecPtr                      recptr;
+               XLogRecData                     rdata[2];
+               BTItemData                      truncitem;
+                       xlrec.target.node = rel->rd_node;
+               ItemPointerSet(&(xlrec.target.tid), BufferGetBlockNumber(buf), newitemoff);
+               rdata[0].buffer = InvalidBuffer;
+               rdata[0].data = (char*)&xlrec;
+               rdata[0].len = SizeOfBtreeInsert;
+               rdata[0].next = &(rdata[1]);
+
+               /*
+                * Read comments in _bt_pgaddtup: when the item goes into the
+                * first data slot of a non-leaf page, only a truncated copy
+                * consisting of the bare BTItemData header (t_info reset to
+                * sizeof(BTItemData)) is logged; in every other case the full
+                * item is logged. Presumably this mirrors what _bt_pgaddtup
+                * puts on the page - see there to confirm.
+                */
+               if (!(P_ISLEAF(pageop)) && newitemoff == P_FIRSTDATAKEY(pageop))
+               {
+                       truncitem = *btitem;
+                       truncitem.bti_itup.t_info = sizeof(BTItemData);
+                       rdata[1].data = (char*)&truncitem;
+                       rdata[1].len = sizeof(BTItemData);
+               }
+               else
+               {
+                       rdata[1].data = (char*)btitem;
+                       rdata[1].len = IndexTupleDSize(btitem->bti_itup) + 
+                                               (sizeof(BTItemData) - sizeof(IndexTupleData));
+               }
+               rdata[1].buffer = buf;
+               rdata[1].next = NULL;
+               if (P_ISLEAF(pageop))
+                       flag |= XLOG_BTREE_LEAF;
+
+               recptr = XLogInsert(RM_BTREE_ID, flag, rdata);
+
+               PageSetLSN(page, recptr);
+               PageSetSUI(page, ThisStartUpID);
+       }
+
+       END_CRIT_SECTION();
+}
+
 /*
  *     _bt_split() -- split a page in the btree.
  *
@@ -1130,11 +1149,12 @@ _bt_getstackbuf(Relation rel, BTStack stack)
  *             graph.
  *
  *             On entry, lbuf (the old root) and rbuf (its new peer) are write-
- *             locked.  On exit, a new root page exists with entries for the
- *             two new children.  The new root page is neither pinned nor locked, and
- *             we have also written out lbuf and rbuf and dropped their pins/locks.
+ *             locked. On exit, a new root page exists with entries for the
+ *             two new children, and the metapage is updated and unlocked/unpinned.
+ *             The new root buffer is returned to the caller, which has to
+ *             unlock/unpin lbuf, rbuf & rootbuf.
  */
-void
+static Buffer
 _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf)
 {
        Buffer                  rootbuf;
@@ -1257,13 +1277,156 @@ _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf)
        }
        END_CRIT_SECTION();
 
-       /* write and let go of the new root buffer */
-       _bt_wrtbuf(rel, rootbuf);
+       /* write and let go of metapage buffer */
        _bt_wrtbuf(rel, metabuf);
 
-       /* update and release new sibling, and finally the old root */
-       _bt_wrtbuf(rel, rbuf);
-       _bt_wrtbuf(rel, lbuf);
+       return(rootbuf);
+}
+
+/*
+ *     _bt_fixroot() -- build the missing upper level(s) above a split
+ *             old root page.
+ *
+ * In the event the old root page was split but no new root page was
+ * created, we build the required parent levels while keeping the write
+ * lock on the old root page.
+ *
+ * Note: it's assumed that the old root page's btpo_parent points to the
+ * meta page, ie not to a parent page. The old root page must be the
+ * leftmost page of its level and must have a right sibling; otherwise
+ * elog(ERROR) is raised.
+ *
+ * On exit, the new root page buffer is write locked and returned.
+ * If "release" is TRUE then oldrootbuf is released (with _bt_relbuf) as
+ * soon as the upper level has been built; otherwise the caller keeps
+ * its lock on oldrootbuf.
+ *
+ * If building the upper level required splitting the new root, the
+ * function calls itself on the new root (with release = TRUE) to add
+ * yet another level, and returns the result of that call.
+ */
+Buffer
+_bt_fixroot(Relation rel, Buffer oldrootbuf, bool release)
+{
+       Buffer                  rootbuf;
+       BlockNumber             rootblk;
+       Page                    rootpage;
+       XLogRecPtr              rootLSN;
+       Page                    oldrootpage = BufferGetPage(oldrootbuf);
+       BTPageOpaque    oldrootopaque = (BTPageOpaque)
+                                               PageGetSpecialPointer(oldrootpage);
+       Buffer                  buf, leftbuf, rightbuf;
+       Page                    page, leftpage, rightpage;
+       BTPageOpaque    opaque, leftopaque, rightopaque;
+       OffsetNumber    newitemoff;
+       BTItem                  btitem, ritem;
+       Size                    itemsz;
+
+       if (! P_LEFTMOST(oldrootopaque) || P_RIGHTMOST(oldrootopaque))
+               elog(ERROR, "bt_fixroot: not valid old root page");
+
+       /*
+        * Read (and write-lock) the right neighbor of the old root and create
+        * the new root page above the two. Despite its name, leftbuf starts
+        * out as the old root's right sibling; in the loop below it is the
+        * left member of each (leftbuf, rightbuf) pair.
+        */
+       leftbuf = _bt_getbuf(rel, oldrootopaque->btpo_next, BT_WRITE);
+       leftpage = BufferGetPage(leftbuf);
+       leftopaque = (BTPageOpaque) PageGetSpecialPointer(leftpage);
+       rootbuf = _bt_newroot(rel, oldrootbuf, leftbuf);
+       rootpage = BufferGetPage(rootbuf);
+       rootLSN = PageGetLSN(rootpage);
+       rootblk = BufferGetBlockNumber(rootbuf);
+
+       /*
+        * Point both children at the new root, and update LSN & StartUpID of
+        * the old root buffer and its neighbor to ensure that they will be
+        * written to disk after logging of the new root creation.
+        * Unfortunately, for the moment (?) we do not log this operation and
+        * so possibly break our rule of logging the entire page content on
+        * the first modification after a checkpoint.
+        *
+        * NOTE(review): the pages modified here and in the loop below are
+        * later given up with _bt_relbuf, not _bt_wrtbuf - confirm that
+        * they still get marked dirty.
+        */
+       HOLD_INTERRUPTS();
+       oldrootopaque->btpo_parent = rootblk;
+       leftopaque->btpo_parent = rootblk;
+       PageSetLSN(oldrootpage, rootLSN);
+       PageSetSUI(oldrootpage, ThisStartUpID);
+       PageSetLSN(leftpage, rootLSN);
+       PageSetSUI(leftpage, ThisStartUpID);
+       RESUME_INTERRUPTS();
+
+       /* parent page into which pointers are inserted; starts as new root */
+       buf = rootbuf;
+       page = BufferGetPage(buf);
+       opaque = (BTPageOpaque) PageGetSpecialPointer(page);
+
+       /*
+        * Now read the other pages (if any) on this level and add them to
+        * the new root. For each such page, the high key of its left
+        * neighbor is copied into a new item whose t_tid points at the page,
+        * and the item is placed after the last item of the current parent
+        * page (buf). If buf has no room for it, buf is split with _bt_split
+        * (which is handed the new item) and we continue with the buffer
+        * that _bt_split returns.
+        *
+        * If a concurrent process splits one of the pages on this level then
+        * it will notice either btpo_parent == metablock or btpo_parent ==
+        * rootblk. In the first case it will give up its locks and try to
+        * lock the leftmost page buffer (oldrootbuf) to fix the root - ie it
+        * will wait for us and let us continue. In the second case it will
+        * try to lock rootbuf while keeping its locks on buffers we have
+        * already passed, also waiting for us. If we have to unlock rootbuf
+        * (split it) and that process has to split a page of the new level
+        * we created (the level of rootbuf) then it will wait while we
+        * create the upper level. Etc.
+        */
+       while(! P_RIGHTMOST(leftopaque))
+       {
+               rightbuf = _bt_getbuf(rel, leftopaque->btpo_next, BT_WRITE);
+               rightpage = BufferGetPage(rightbuf);
+               rightopaque = (BTPageOpaque) PageGetSpecialPointer(rightpage);
+
+               /*
+                * Set the parent link and update LSN & StartUpID (see comments
+                * above). Here the LSN is only advanced when the page's
+                * current LSN is older than rootLSN.
+                */
+               HOLD_INTERRUPTS();
+               rightopaque->btpo_parent = rootblk;
+               if (XLByteLT(PageGetLSN(rightpage), rootLSN))
+                       PageSetLSN(rightpage, rootLSN);
+               PageSetSUI(rightpage, ThisStartUpID);
+               RESUME_INTERRUPTS();
+
+               ritem = (BTItem) PageGetItem(leftpage, PageGetItemId(leftpage, P_HIKEY));
+               btitem = _bt_formitem(&(ritem->bti_itup));
+               ItemPointerSet(&(btitem->bti_itup.t_tid), leftopaque->btpo_next, P_HIKEY);
+               itemsz = IndexTupleDSize(btitem->bti_itup)
+                       + (sizeof(BTItemData) - sizeof(IndexTupleData));
+               itemsz = MAXALIGN(itemsz);
+
+               newitemoff = OffsetNumberNext(PageGetMaxOffsetNumber(page));
+
+               if (PageGetFreeSpace(page) < itemsz)
+               {
+                       Buffer                  newbuf;
+                       OffsetNumber    firstright;
+                       OffsetNumber    itup_off;
+                       BlockNumber             itup_blkno;
+                       bool                    newitemonleft;
+
+                       firstright = _bt_findsplitloc(rel, page,
+                                                       newitemoff, itemsz, &newitemonleft);
+                       newbuf = _bt_split(rel, buf, firstright,
+                                               newitemoff, itemsz, btitem, newitemonleft,
+                                               &itup_off, &itup_blkno);
+                       /*
+                        * Keep lock on new "root" buffer ! Any other parent
+                        * page we have just split is given up here.
+                        */
+                       if (buf != rootbuf)
+                               _bt_relbuf(rel, buf, BT_WRITE);
+                       buf = newbuf;
+                       page = BufferGetPage(buf);
+                       opaque = (BTPageOpaque) PageGetSpecialPointer(page);
+               }
+               else
+                       _bt_insertuple(rel, buf, itemsz, btitem, newitemoff);
+
+               /* give up left buffer and step one page to the right */
+               _bt_relbuf(rel, leftbuf, BT_WRITE);
+               leftbuf = rightbuf;
+               leftpage = rightpage;
+               leftopaque = rightopaque;
+       }
+
+       /* give up rightmost page buffer */
+       _bt_relbuf(rel, leftbuf, BT_WRITE);
+
+       /*
+        * Here we hold locks on the old root buffer, on the new root buffer
+        * we've created with _bt_newroot() - rootbuf, - and on the buffer
+        * we've used for the last insert ops - buf. If rootbuf != buf then
+        * we have to create at least one more level. And if "release" is
+        * TRUE (ie we've already created some levels) then we give up
+        * oldrootbuf.
+        */
+       if (release)
+               _bt_relbuf(rel, oldrootbuf, BT_WRITE);
+
+       if (rootbuf != buf)
+       {
+               _bt_relbuf(rel, buf, BT_WRITE);
+               return(_bt_fixroot(rel, rootbuf, true));
+       }
+
+       return(rootbuf);
 }
 
 /*
index ad9d69e13d336f41abf43795af65b330aa3dfd66..0f68a066dc711fcfd543c0cff40b5f2ea3d5722b 100644 (file)
@@ -9,7 +9,7 @@
  *
  *
  * IDENTIFICATION
- *       $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtpage.c,v 1.47 2001/01/24 19:42:48 momjian Exp $
+ *       $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtpage.c,v 1.48 2001/01/26 01:24:31 vadim Exp $
  *
  *     NOTES
  *        Postgres btree pages look like ordinary relation pages.      The opaque
@@ -28,6 +28,8 @@
 #include "miscadmin.h"
 #include "storage/lmgr.h"
 
+extern bool FixBTree;  /* comments in nbtree.c */
+extern Buffer _bt_fixroot(Relation rel, Buffer oldrootbuf, bool release);
 
 /*
  *     We use high-concurrency locking on btrees.      There are two cases in
@@ -237,7 +239,58 @@ _bt_getroot(Relation rel, int access)
 
        if (! P_ISROOT(rootopaque))
        {
-               /* it happened, try again */
+               /*
+                * It happened (the page we read is no longer the root), but
+                * if whoever split the root page failed to create a new root
+                * page then we would loop forever, calling _bt_getroot
+                * again and again.
+                */
+               if (FixBTree)
+               {
+                       Buffer  newrootbuf;
+
+check_parent:;
+                       if (rootopaque->btpo_parent == BTREE_METAPAGE)  /* unupdated! */
+                       {
+                               LockBuffer(rootbuf, BUFFER_LOCK_UNLOCK);
+                               LockBuffer(rootbuf, BT_WRITE);
+
+                               /* handle concurrent fix of root page */
+                               if (rootopaque->btpo_parent == BTREE_METAPAGE)  /* unupdated! */
+                               {
+                                       newrootbuf = _bt_fixroot(rel, rootbuf, true);
+                                       LockBuffer(newrootbuf, BUFFER_LOCK_UNLOCK);
+                                       LockBuffer(newrootbuf, BT_READ);
+                                       rootbuf = newrootbuf;
+                                       rootpage = BufferGetPage(rootbuf);
+                                       rootopaque = (BTPageOpaque) PageGetSpecialPointer(rootpage);
+                                       /* New root might have been split while we were changing the lock */
+                                       if (P_ISROOT(rootopaque))
+                                               return(rootbuf);
+                                       /* rootbuf is read locked */
+                                       goto check_parent;
+                               }
+                               else    /* someone else already fixed root */
+                               {
+                                       LockBuffer(rootbuf, BUFFER_LOCK_UNLOCK);
+                                       LockBuffer(rootbuf, BT_READ);
+                               }
+                       }
+                       /*
+                        * Ok, here we have the old root page with btpo_parent
+                        * pointing to the upper level - check the parent page,
+                        * because there is a good chance that the parent is the root page.
+                        */
+                       newrootbuf = _bt_getbuf(rel, rootopaque->btpo_parent, BT_READ);
+                       _bt_relbuf(rel, rootbuf, BT_READ);
+                       rootbuf = newrootbuf;
+                       rootpage = BufferGetPage(rootbuf);
+                       rootopaque = (BTPageOpaque) PageGetSpecialPointer(rootpage);
+                       if (P_ISROOT(rootopaque))
+                               return(rootbuf);
+                       /* no luck -:( */
+               }
+
+               /* try again */
                _bt_relbuf(rel, rootbuf, BT_READ);
                return _bt_getroot(rel, access);
        }
index 7b2f7fa7d981249af67120cbe510a8455fe855c7..8685975edf5752ac2376104d140013d5764f1a92 100644 (file)
@@ -12,7 +12,7 @@
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
- *       $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtree.c,v 1.75 2001/01/24 19:42:48 momjian Exp $
+ *       $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtree.c,v 1.76 2001/01/26 01:24:31 vadim Exp $
  *
  *-------------------------------------------------------------------------
  */
 #include "executor/executor.h"
 #include "miscadmin.h"
 #include "storage/sinval.h"
-
+#include "access/xlogutils.h"
 
 bool           BuildingBtree = false;          /* see comment in btbuild() */
-bool           FastBuild = true;       /* use sort/build instead of insertion
-                                                                * build */
+bool           FastBuild = true;       /* use sort/build instead */
+                                                               /* of insertion build */
 
-#include "access/xlogutils.h"
+
+/*
+ * TEMPORARY FLAG FOR TESTING NEW FIX TREE
+ * CODE WITHOUT AFFECTING ANYONE ELSE
+ */
+bool           FixBTree = false;
 
 static void _bt_restscan(IndexScanDesc scan);