*
*
* IDENTIFICATION
- * $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.79 2001/01/31 01:08:36 vadim Exp $
+ * $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.80 2001/02/02 19:49:15 vadim Exp $
*
*-------------------------------------------------------------------------
*/
Buffer _bt_fixroot(Relation rel, Buffer oldrootbuf, bool release);
static void _bt_fixtree(Relation rel, BlockNumber blkno);
-static BlockNumber _bt_fixlevel(Relation rel, Buffer buf, BlockNumber limit);
+static void _bt_fixbranch(Relation rel, BlockNumber lblkno,
+ BlockNumber rblkno, BTStack true_stack);
+static void _bt_fixlevel(Relation rel, Buffer buf, BlockNumber limit);
+static void _bt_fixup(Relation rel, Buffer buf);
static OffsetNumber _bt_getoff(Page page, BlockNumber blkno);
static Buffer _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf);
static void _bt_checksplitloc(FindSplitData *state, OffsetNumber firstright,
int leftfree, int rightfree,
bool newitemonleft, Size firstrightitemsz);
-static Buffer _bt_getstackbuf(Relation rel, BTStack stack);
+static Buffer _bt_getstackbuf(Relation rel, BTStack stack, int access);
static void _bt_pgaddtup(Relation rel, Page page,
Size itemsize, BTItem btitem,
OffsetNumber itup_off, const char *where);
elog(ERROR, "bt_insertonpg: no root page found");
_bt_wrtbuf(rel, rbuf);
_bt_wrtnorelbuf(rel, buf);
- while(! P_LEFTMOST(lpageop))
- {
- BlockNumber blkno = lpageop->btpo_prev;
- LockBuffer(buf, BUFFER_LOCK_UNLOCK);
- ReleaseBuffer(buf);
- buf = _bt_getbuf(rel, blkno, BT_WRITE);
- page = BufferGetPage(buf);
- lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
- /*
- * If someone else already created parent pages
- * then it's time for _bt_fixtree() to check upper
- * levels and fix them, if required.
- */
- if (lpageop->btpo_parent != BTREE_METAPAGE)
- {
- blkno = lpageop->btpo_parent;
- _bt_relbuf(rel, buf, BT_WRITE);
- _bt_fixtree(rel, blkno);
- goto formres;
- }
- }
- /*
- * Ok, we are on the leftmost page, it's write locked
- * by us and its btpo_parent points to meta page - time
- * for _bt_fixroot().
- */
- buf = _bt_fixroot(rel, buf, true);
- _bt_relbuf(rel, buf, BT_WRITE);
+ _bt_fixup(rel, buf);
goto formres;
}
ItemPointerSet(&(stack->bts_btitem.bti_itup.t_tid),
bknum, P_HIKEY);
- pbuf = _bt_getstackbuf(rel, stack);
-
- if (pbuf == InvalidBuffer)
- elog(ERROR, "_bt_getstackbuf: my bits moved right off the end of the world!"
- "\n\tRecreate index %s.", RelationGetRelationName(rel));
+ pbuf = _bt_getstackbuf(rel, stack, BT_WRITE);
/* Now we can write and unlock the children */
_bt_wrtbuf(rel, rbuf);
_bt_wrtbuf(rel, buf);
+ if (pbuf == InvalidBuffer)
+ {
+ if (!FixBTree)
+ elog(ERROR, "_bt_getstackbuf: my bits moved right off the end of the world!"
+ "\n\tRecreate index %s.", RelationGetRelationName(rel));
+ pfree(new_item);
+ _bt_fixbranch(rel, bknum, rbknum, stack);
+ goto formres;
+ }
+
/* Recursively update the parent */
newres = _bt_insertonpg(rel, pbuf, stack->bts_parent,
0, NULL, new_item, stack->bts_offset);
* Also, re-set bts_blkno & bts_offset if changed.
*/
static Buffer
-_bt_getstackbuf(Relation rel, BTStack stack)
+_bt_getstackbuf(Relation rel, BTStack stack, int access)
{
BlockNumber blkno;
Buffer buf;
BTPageOpaque opaque;
blkno = stack->bts_blkno;
- buf = _bt_getbuf(rel, blkno, BT_WRITE);
+ buf = _bt_getbuf(rel, blkno, access);
page = BufferGetPage(buf);
opaque = (BTPageOpaque) PageGetSpecialPointer(page);
maxoff = PageGetMaxOffsetNumber(page);
/* by here, the item we're looking for moved right at least one page */
if (P_RIGHTMOST(opaque))
{
- _bt_relbuf(rel, buf, BT_WRITE);
+ _bt_relbuf(rel, buf, access);
return(InvalidBuffer);
}
blkno = opaque->btpo_next;
- _bt_relbuf(rel, buf, BT_WRITE);
- buf = _bt_getbuf(rel, blkno, BT_WRITE);
+ _bt_relbuf(rel, buf, access);
+ buf = _bt_getbuf(rel, blkno, access);
page = BufferGetPage(buf);
opaque = (BTPageOpaque) PageGetSpecialPointer(page);
maxoff = PageGetMaxOffsetNumber(page);
/*
* Now read other pages (if any) on level and add them to new root.
+ * Here we break one of our locking rules - never hold lock on parent
+ * page when acquiring lock on its child - but we are free from deadlock:
+ *
* If concurrent process will split one of pages on this level then it
- * will notice either btpo_parent == metablock or btpo_parent == rootblk.
- * In first case it will give up its locks and try to lock leftmost page
- * buffer (oldrootbuf) to fix root - ie it will wait for us and let us
+ * will see either btpo_parent == metablock or btpo_parent == rootblk.
+ * In first case it will give up its locks and walk to the leftmost page
+ * (oldrootbuf) in _bt_fixup() - ie it will wait for us and let us
* continue. In second case it will try to lock rootbuf keeping its locks
* on buffers we already passed, also waiting for us. If we'll have to
* unlock rootbuf (split it) and that process will have to split page
* Check/fix level starting from page in buffer buf up to block
* limit on *child* level (or till rightmost child page if limit
* is InvalidBlockNumber). Start buffer must be read locked.
- * No pins/locks are held on exit. Returns block number of last
- * visited/pointing-to-limit page on *check/fix* level.
+ * No pins/locks are held on exit.
*/
-static BlockNumber
+static void
_bt_fixlevel(Relation rel, Buffer buf, BlockNumber limit)
{
BlockNumber blkno = BufferGetBlockNumber(buf);
- BlockNumber pblkno = blkno;
Page page;
BTPageOpaque opaque;
BlockNumber cblkno[3];
BTStackData stack;
stack.bts_parent = NULL;
- stack.bts_blkno = pblkno;
+ stack.bts_blkno = blkno;
stack.bts_offset = InvalidOffsetNumber;
ItemPointerSet(&(stack.bts_btitem.bti_itup.t_tid),
cblkno[0], P_HIKEY);
- buf = _bt_getstackbuf(rel, &stack);
+ buf = _bt_getstackbuf(rel, &stack, BT_WRITE);
if (buf == InvalidBuffer)
elog(ERROR, "bt_fixlevel: pointer disappeared (need to recreate index)");
page = BufferGetPage(buf);
opaque = (BTPageOpaque) PageGetSpecialPointer(page);
coff[0] = stack.bts_offset;
- pblkno = BufferGetBlockNumber(buf);
- parblk[0] = pblkno;
- if (cblkno[0] == limit)
- blkno = pblkno; /* where we have seen pointer to limit */
+ blkno = BufferGetBlockNumber(buf);
+ parblk[0] = blkno;
/* Check/insert missed pointers */
for (i = 1; i <= cidx; i++)
if (parblk[i] == parblk[i - 1] &&
coff[i] != coff[i - 1] + 1)
elog(ERROR, "bt_fixlevel: invalid item order(2) (need to recreate index)");
- if (cblkno[i] == limit)
- blkno = parblk[i];
continue;
}
/* Have to check next page ? */
buf = newbuf;
page = newpage;
opaque = newopaque;
- pblkno = BufferGetBlockNumber(buf);
- parblk[i] = pblkno;
- if (cblkno[i] == limit)
- blkno = pblkno;
+ blkno = BufferGetBlockNumber(buf);
+ parblk[i] = blkno;
continue;
}
/* unfound - need to insert on current page */
page = BufferGetPage(buf);
opaque = (BTPageOpaque) PageGetSpecialPointer(page);
}
- pblkno = BufferGetBlockNumber(buf);
+ blkno = BufferGetBlockNumber(buf);
coff[i] = itup_off;
}
else
}
pfree(btitem);
- parblk[i] = pblkno;
- if (cblkno[i] == limit)
- blkno = pblkno;
+ parblk[i] = blkno;
}
/* copy page with pointer to cblkno[cidx] to temp storage */
_bt_relbuf(rel, buf, BT_WRITE);
page = (Page)tbuf;
opaque = (BTPageOpaque) PageGetSpecialPointer(page);
- if (limit == InvalidBlockNumber)
- blkno = pblkno; /* last visited page */
}
/* Continue if current check/fix level page is rightmost */
{
if (cidx == 2)
_bt_relbuf(rel, cbuf[2], BT_READ);
- return(blkno);
+ return;
}
if (cblkno[0] == limit || cblkno[1] == limit)
goodbye = true;
}
}
+/*
+ * Check/fix part of tree - branch - up from parent of level with blocks
+ * lblkno and rblkno. We first ensure that parent level has pointers
+ * to both lblkno & rblkno and if those pointers are on different
+ * parent pages then do the same for parent level, etc. No locks must
+ * be held on target level and upper on entry. No locks will be held
+ * on exit. Stack created when traversing tree down should be provided and
+ * it must point to parent level. rblkno must be on the right from lblkno.
+ * (This function is special edition of more expensive _bt_fixtree(),
+ * but it doesn't guarantee full consistency of tree.)
+ *
+ * Outline of one loop pass:
+ *	1. blkno names a page on the parent level; that level is checked/fixed
+ *	   with _bt_fixlevel(), using rblkno as the child-level limit.
+ *	2. The parent page holding the pointer to lblkno is located with
+ *	   _bt_getstackbuf() and searched for rblkno with _bt_getoff().
+ *	   If both pointers are on that page we are done.
+ *	3. Otherwise lblkno and rblkno are replaced by the block numbers of
+ *	   the two parent pages and the loop moves up one level.
+ *
+ * The first blkno comes from true_stack itself; each further level is
+ * taken from the bts_parent chain. Once that chain is exhausted the next
+ * level is found through btpo_parent, and if btpo_parent still points to
+ * the metapage the page is exclusively locked and handed to _bt_fixup().
+ *
+ * Raises elog(ERROR) if either pointer cannot be found on the parent
+ * level, or if the right pointer does not follow the left one.
+ */
+static void
+_bt_fixbranch(Relation rel, BlockNumber lblkno,
+ BlockNumber rblkno, BTStack true_stack)
+{
+ BlockNumber blkno = true_stack->bts_blkno;
+ BTStackData stack;
+ BTPageOpaque opaque;
+ Buffer buf, rbuf;
+ Page page;
+ OffsetNumber offnum;
+
+ true_stack = true_stack->bts_parent;
+ for ( ; ; )
+ {
+ buf = _bt_getbuf(rel, blkno, BT_READ);
+
+ /*
+ * Check/fix parent level pointed by blkno.
+ * _bt_fixlevel() is documented as holding no pins/locks on exit,
+ * so buf must not be used again until it is reassigned below.
+ */
+ _bt_fixlevel(rel, buf, rblkno);
+
+ /*
+ * Here parent level should have pointers for both
+ * lblkno and rblkno and we have to find them.
+ *
+ * stack is a local single-entry stack (no parent) used only as
+ * the search key for _bt_getstackbuf(): start at blkno and look
+ * for the item pointing to lblkno. The returned buffer is read
+ * locked; stack.bts_offset is presumably re-set to the offset of
+ * the left pointer (see _bt_getstackbuf's header) - the order
+ * check below relies on that.
+ */
+ stack.bts_parent = NULL;
+ stack.bts_blkno = blkno;
+ stack.bts_offset = InvalidOffsetNumber;
+ ItemPointerSet(&(stack.bts_btitem.bti_itup.t_tid), lblkno, P_HIKEY);
+ buf = _bt_getstackbuf(rel, &stack, BT_READ);
+ if (buf == InvalidBuffer)
+ elog(ERROR, "bt_fixbranch: left pointer unfound (need to recreate index)");
+ page = BufferGetPage(buf);
+ offnum = _bt_getoff(page, rblkno);
+
+ if (offnum != InvalidOffsetNumber) /* right pointer found */
+ {
+ if (offnum <= stack.bts_offset)
+ elog(ERROR, "bt_fixbranch: invalid item order (need to recreate index)");
+ _bt_relbuf(rel, buf, BT_READ);
+ return;
+ }
+
+ /*
+ * Pointers are on different parent pages - find right one.
+ *
+ * From here on lblkno/rblkno name pages of the parent level: the
+ * page with the left pointer (still read locked in buf) and the
+ * page with the right pointer, which is searched for starting
+ * from the left one and released as soon as its number is known.
+ */
+ lblkno = BufferGetBlockNumber(buf);
+
+ stack.bts_parent = NULL;
+ stack.bts_blkno = lblkno;
+ stack.bts_offset = InvalidOffsetNumber;
+ ItemPointerSet(&(stack.bts_btitem.bti_itup.t_tid), rblkno, P_HIKEY);
+ rbuf = _bt_getstackbuf(rel, &stack, BT_READ);
+ if (rbuf == InvalidBuffer)
+ elog(ERROR, "bt_fixbranch: right pointer unfound (need to recreate index)");
+ rblkno = BufferGetBlockNumber(rbuf);
+ _bt_relbuf(rel, rbuf, BT_READ);
+
+ /*
+ * If we have parent item in true_stack then go up one level and
+ * ensure that it has pointers to new lblkno & rblkno.
+ */
+ if (true_stack)
+ {
+ _bt_relbuf(rel, buf, BT_READ);
+ blkno = true_stack->bts_blkno;
+ true_stack = true_stack->bts_parent;
+ continue;
+ }
+
+ /*
+ * Well, we are on the level that was root or nonexistent when
+ * we started traversing tree down. If btpo_parent is updated
+ * then we'll use it to continue, else we'll fix/restore upper
+ * levels entirely.
+ */
+ opaque = (BTPageOpaque) PageGetSpecialPointer(page);
+ if (opaque->btpo_parent != BTREE_METAPAGE)
+ {
+ blkno = opaque->btpo_parent;
+ _bt_relbuf(rel, buf, BT_READ);
+ continue;
+ }
+
+ /*
+ * Have to switch to excl buf lock and re-check btpo_parent.
+ * The read lock is dropped before the write lock is taken, so
+ * btpo_parent may have been updated in between.
+ *
+ * NOTE(review): the write lock is taken on blkno (the page this
+ * pass started from), not on the page just released (lblkno).
+ * Presumably fine because _bt_fixup() walks the whole level to
+ * the left anyway - confirm.
+ */
+ _bt_relbuf(rel, buf, BT_READ);
+ buf = _bt_getbuf(rel, blkno, BT_WRITE);
+ page = BufferGetPage(buf);
+ opaque = (BTPageOpaque) PageGetSpecialPointer(page);
+ if (opaque->btpo_parent != BTREE_METAPAGE)
+ {
+ blkno = opaque->btpo_parent;
+ _bt_relbuf(rel, buf, BT_WRITE);
+ continue;
+ }
+
+ /*
+ * We hold excl lock on some internal page with unupdated
+ * btpo_parent - time for _bt_fixup.
+ */
+ break;
+ }
+
+ _bt_fixup(rel, buf);
+
+ return;
+}
+
+/*
+ * Having buf excl locked this routine walks to the left on level and
+ * uses either _bt_fixtree() or _bt_fixroot() to create/check&fix upper
+ * levels. No buffer pins/locks will be held on exit.
+ *
+ * rel - index relation being repaired
+ * buf - buffer of a page on the level to start from; it is released
+ *	 here with BT_WRITE, so the caller must pass it write locked
+ *
+ * Walk protocol: on each page btpo_parent is examined first. If it no
+ * longer points to the metapage, the page is released and the upper
+ * levels are handed to _bt_fixtree() starting at that parent block.
+ * Otherwise, unless the page is leftmost, the current buffer is unlocked
+ * and released before the left sibling (btpo_prev) is write locked, so
+ * only one page of the level is locked at a time.
+ */
+static void
+_bt_fixup(Relation rel, Buffer buf)
+{
+ Page page;
+ BTPageOpaque opaque;
+ BlockNumber blkno;
+
+ for ( ; ; )
+ {
+ page = BufferGetPage(buf);
+ opaque = (BTPageOpaque) PageGetSpecialPointer(page);
+ /*
+ * If someone else already created parent pages
+ * then it's time for _bt_fixtree() to check upper
+ * levels and fix them, if required.
+ * The parent block number is copied out before the buffer
+ * is released, since opaque points into the buffer's page.
+ */
+ if (opaque->btpo_parent != BTREE_METAPAGE)
+ {
+ blkno = opaque->btpo_parent;
+ _bt_relbuf(rel, buf, BT_WRITE);
+ _bt_fixtree(rel, blkno);
+ return;
+ }
+ if (P_LEFTMOST(opaque))
+ break;
+ blkno = opaque->btpo_prev;
+ LockBuffer(buf, BUFFER_LOCK_UNLOCK);
+ ReleaseBuffer(buf);
+ buf = _bt_getbuf(rel, blkno, BT_WRITE);
+ }
+
+ /*
+ * Ok, we are on the leftmost page, it's write locked
+ * by us and its btpo_parent points to meta page - time
+ * for _bt_fixroot().
+ * The buffer returned by _bt_fixroot() replaces buf and is
+ * released with BT_WRITE right away, leaving nothing held.
+ */
+ buf = _bt_fixroot(rel, buf, true);
+ _bt_relbuf(rel, buf, BT_WRITE);
+
+ return;
+}
+
static OffsetNumber
_bt_getoff(Page page, BlockNumber blkno)
{