*
*
* IDENTIFICATION
- * $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.15 1997/06/06 03:11:42 vadim Exp $
+ * $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.16 1997/06/10 07:28:47 vadim Exp $
*
*-------------------------------------------------------------------------
*/
#endif
static InsertIndexResult _bt_insertonpg(Relation rel, Buffer buf, BTStack stack, int keysz, ScanKey scankey, BTItem btitem, BTItem afteritem);
-static Buffer _bt_split(Relation rel, Buffer buf, BTItem hiRightItem);
+static Buffer _bt_split(Relation rel, Buffer buf, OffsetNumber firstright);
static OffsetNumber _bt_findsplitloc(Relation rel, Page page, OffsetNumber start, OffsetNumber maxoff, Size llimit);
static void _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf);
static OffsetNumber _bt_pgaddtup(Relation rel, Buffer buf, int keysz, ScanKey itup_scankey, Size itemsize, BTItem btitem, BTItem afteritem);
static bool _bt_goesonpg(Relation rel, Buffer buf, Size keysz, ScanKey scankey, BTItem afteritem);
static void _bt_updateitem(Relation rel, Size keysz, Buffer buf, BTItem oldItem, BTItem newItem);
static bool _bt_isequal (TupleDesc itupdesc, Page page, OffsetNumber offnum, int keysz, ScanKey scankey);
+#if 0
static InsertIndexResult _bt_shift (Relation rel, Buffer buf, BTStack stack, int keysz, ScanKey scankey, BTItem btitem, BTItem hikey);
+#endif
/*
* _bt_doinsert() -- Handle insertion of a single btitem in the tree.
BTStack stack;
Buffer buf;
BlockNumber blkno;
- int natts;
+ int natts = rel->rd_rel->relnatts;
InsertIndexResult res;
itup = &(btitem->bti_itup);
/* we need a scan key to do our search, so build one */
itup_scankey = _bt_mkscankey(rel, itup);
- natts = rel->rd_rel->relnatts;
/* find the page containing this key */
stack = _bt_search(rel, natts, itup_scankey, &buf);
{
InsertIndexResult res;
Page page;
- Buffer rbuf;
- Buffer pbuf;
- Page rpage;
- BTItem ritem;
BTPageOpaque lpageop;
- BTPageOpaque rpageop;
- BlockNumber rbknum, itup_blkno;
+ BlockNumber itup_blkno;
OffsetNumber itup_off;
+ OffsetNumber firstright = InvalidOffsetNumber;
int itemsz;
- Page ppage;
- BTPageOpaque ppageop;
+ bool do_split = false;
+ bool keys_equal = false;
page = BufferGetPage(buf);
lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
* page in the chain of duplicates then:
* 1. if scankey == hikey (i.e. - new duplicate item) then
* insert it here;
- * 2. if scankey < hikey then we grab new page, copy current page
- * content there and insert new item on the current page.
+ * 2. if scankey < hikey then:
+ * 2.a if there is duplicate key(s) here - we force splitting;
+ * 2.b else - we may "eat" this page from duplicates chain.
*/
if ( lpageop->btpo_flags & BTP_CHAIN )
{
if ( !_bt_skeycmp (rel, keysz, scankey, page, hitemid,
BTLessStrategyNumber) )
elog (FATAL, "btree: attempt to insert higher key on the leftmost page in the chain of duplicates");
- return (_bt_shift(rel, buf, stack, keysz, scankey, btitem, hitem));
+ if ( maxoff > P_HIKEY ) /* have duplicate(s) */
+ {
+ firstright = P_FIRSTKEY;
+ do_split = true;
+ }
+ else /* "eat" page */
+ {
+ Buffer pbuf;
+ Page ppage;
+
+ itup_blkno = BufferGetBlockNumber(buf);
+ itup_off = PageAddItem(page, (Item) btitem, itemsz,
+ P_FIRSTKEY, LP_USED);
+ if ( itup_off == InvalidOffsetNumber )
+ elog (FATAL, "btree: failed to add item");
+ lpageop->btpo_flags &= ~BTP_CHAIN;
+ pbuf = _bt_getstackbuf(rel, stack, BT_WRITE);
+ ppage = BufferGetPage(pbuf);
+ PageIndexTupleDelete(ppage, stack->bts_offset);
+ pfree(stack->bts_btitem);
+ stack->bts_btitem = _bt_formitem(&(btitem->bti_itup));
+ ItemPointerSet(&(stack->bts_btitem->bti_itup.t_tid),
+ itup_blkno, P_HIKEY);
+ _bt_wrtbuf(rel, buf);
+ res = _bt_insertonpg(rel, pbuf, stack->bts_parent,
+ keysz, scankey, stack->bts_btitem,
+ NULL);
+ ItemPointerSet(&(res->pointerData), itup_blkno, itup_off);
+ return (res);
+ }
}
+ else
+ {
+ keys_equal = true;
+ if ( PageGetFreeSpace(page) < itemsz )
+ do_split = true;
+ }
+ }
+ else if ( PageGetFreeSpace(page) < itemsz )
+ do_split = true;
+ else if ( PageGetFreeSpace(page) < 3*itemsz + 2*sizeof(ItemIdData) )
+ {
+ OffsetNumber offnum = (P_RIGHTMOST(lpageop)) ? P_HIKEY : P_FIRSTKEY;
+ OffsetNumber maxoff = PageGetMaxOffsetNumber (page);
+ ItemId itid;
+ BTItem previtem, chkitem;
+ Size maxsize;
+ Size currsize;
+
+ itid = PageGetItemId(page, offnum);
+ previtem = (BTItem) PageGetItem(page, itid);
+ maxsize = currsize = (ItemIdGetLength(itid) + sizeof(ItemIdData));
+ for (offnum = OffsetNumberNext(offnum);
+ offnum <= maxoff; offnum = OffsetNumberNext(offnum) )
+ {
+ itid = PageGetItemId(page, offnum);
+ chkitem = (BTItem) PageGetItem(page, itid);
+ if ( !_bt_itemcmp (rel, keysz, previtem, chkitem,
+ BTEqualStrategyNumber) )
+ {
+ if ( currsize > maxsize )
+ maxsize = currsize;
+ currsize = 0;
+ previtem = chkitem;
+ }
+ currsize += (ItemIdGetLength(itid) + sizeof(ItemIdData));
+ }
+ if ( currsize > maxsize )
+ maxsize = currsize;
+ maxsize += sizeof (PageHeaderData) +
+ DOUBLEALIGN (sizeof (BTPageOpaqueData));
+ if ( maxsize >= PageGetPageSize (page) / 2 )
+ do_split = true;
}
-
- if (PageGetFreeSpace(page) < itemsz)
+ if ( do_split )
{
+ Buffer rbuf;
+ Page rpage;
+ BTItem ritem;
+ BlockNumber rbknum;
+ BTPageOpaque rpageop;
+ Buffer pbuf;
+ Page ppage;
+ BTPageOpaque ppageop;
BlockNumber bknum = BufferGetBlockNumber(buf);
BTItem lowLeftItem;
- BTItem hiRightItem = NULL;
+ OffsetNumber maxoff;
+ bool shifted = false;
+ bool left_chained = ( lpageop->btpo_flags & BTP_CHAIN ) ? true : false;
/*
- * If we have to split leaf page in the chain of duplicates
- * then we try to look at our right sibling first.
+ * If we have to split leaf page in the chain of duplicates by
+ * new duplicate then we try to look at our right sibling first.
*/
if ( ( lpageop->btpo_flags & BTP_CHAIN ) &&
- ( lpageop->btpo_flags & BTP_LEAF ) )
+ ( lpageop->btpo_flags & BTP_LEAF ) && keys_equal )
{
bool use_left = true;
- bool keys_equal = false;
rbuf = _bt_getbuf(rel, lpageop->btpo_next, BT_WRITE);
rpage = BufferGetPage(rbuf);
{
if ( !( rpageop->btpo_flags & BTP_CHAIN ) )
elog (FATAL, "btree: lost page in the chain of duplicates");
- keys_equal = true;
}
else if ( _bt_skeycmp (rel, keysz, scankey, rpage,
PageGetItemId(rpage, P_HIKEY),
BTGreaterStrategyNumber) )
elog (FATAL, "btree: hikey is out of order");
- /*
- * If hikey > scankey and BTP_CHAIN is ON
- * then it's first page of the chain of higher keys:
- * our left sibling hikey was lying! We can't add new
- * item here, but we can turn BTP_CHAIN off on our
- * left page and overwrite its hikey.
+ else if ( rpageop->btpo_flags & BTP_CHAIN )
+ /*
+ * If hikey > scankey then it's last page in chain and
+ * BTP_CHAIN must be OFF
*/
- if ( !keys_equal && ( rpageop->btpo_flags & BTP_CHAIN ) )
- {
- BTItem tmp;
-
- lpageop->btpo_flags &= ~BTP_CHAIN;
- tmp = (BTItem) PageGetItem(rpage, PageGetItemId(rpage, P_HIKEY));
- hiRightItem = _bt_formitem(&(tmp->bti_itup));
- }
+ elog (FATAL, "btree: lost last page in the chain of duplicates");
+
/* if there is room here then we use this page. */
- else if ( PageGetFreeSpace (rpage) > itemsz )
+ if ( PageGetFreeSpace (rpage) > itemsz )
use_left = false;
}
else /* rightmost page */
}
_bt_relbuf(rel, rbuf, BT_WRITE);
}
+ /*
+ * If after splitting un-chained page we'll got chain of pages
+ * with duplicates then we want to know
+ * 1. on which of two pages new btitem will go (current
+ * _bt_findsplitloc is quite bad);
+ * 2. what parent (if there's one) thinking about it
+ * (remember about deletions)
+ */
+ else if ( !( lpageop->btpo_flags & BTP_CHAIN ) )
+ {
+ OffsetNumber start = ( P_RIGHTMOST(lpageop) ) ? P_HIKEY : P_FIRSTKEY;
+ Size llimit;
+
+ maxoff = PageGetMaxOffsetNumber (page);
+ llimit = PageGetPageSize(page) - sizeof (PageHeaderData) -
+ DOUBLEALIGN (sizeof (BTPageOpaqueData))
+ + sizeof(ItemIdData);
+ llimit /= 2;
+ firstright = _bt_findsplitloc(rel, page, start, maxoff, llimit);
+
+ if ( _bt_itemcmp (rel, keysz,
+ (BTItem) PageGetItem(page, PageGetItemId(page, start)),
+ (BTItem) PageGetItem(page, PageGetItemId(page, firstright)),
+ BTEqualStrategyNumber) )
+ {
+ if ( _bt_skeycmp (rel, keysz, scankey, page,
+ PageGetItemId(page, firstright),
+ BTLessStrategyNumber) )
+ /*
+ * force moving current items to the new page:
+ * new item will go on the current page.
+ */
+ firstright = start;
+ else
+ /*
+ * new btitem >= firstright, start item == firstright -
+ * new chain of duplicates: if this non-leftmost leaf
+ * page and parent item < start item then force moving
+ * all items to the new page - current page will be
+ * "empty" after it.
+ */
+ {
+ if ( !P_LEFTMOST (lpageop) &&
+ ( lpageop->btpo_flags & BTP_LEAF ) )
+ {
+ ItemPointerSet(&(stack->bts_btitem->bti_itup.t_tid),
+ bknum, P_HIKEY);
+ pbuf = _bt_getstackbuf(rel, stack, BT_WRITE);
+ if ( _bt_itemcmp (rel, keysz, stack->bts_btitem,
+ (BTItem) PageGetItem(page,
+ PageGetItemId(page, start)),
+ BTLessStrategyNumber) )
+ {
+ firstright = start;
+ shifted = true;
+ }
+ _bt_relbuf(rel, pbuf, BT_WRITE);
+ }
+ }
+ } /* else - no new chain if start item < firstright one */
+ }
/* split the buffer into left and right halves */
- rbuf = _bt_split(rel, buf, hiRightItem);
-
- if ( hiRightItem != (BTItem) NULL )
- pfree (hiRightItem);
+ rbuf = _bt_split(rel, buf, firstright);
/* which new page (left half or right half) gets the tuple? */
if (_bt_goesonpg(rel, buf, keysz, scankey, afteritem)) {
itup_blkno = BufferGetBlockNumber(rbuf);
}
- lowLeftItem = (BTItem) PageGetItem(page,
+ maxoff = PageGetMaxOffsetNumber (page);
+ if ( shifted )
+ {
+ if ( maxoff > P_FIRSTKEY )
+ elog (FATAL, "btree: shifted page is not empty");
+ lowLeftItem = (BTItem) NULL;
+ }
+ else
+ {
+ if ( maxoff < P_FIRSTKEY )
+ elog (FATAL, "btree: un-shifted page is empty");
+ lowLeftItem = (BTItem) PageGetItem(page,
PageGetItemId(page, P_FIRSTKEY));
-
- if ( _bt_itemcmp (rel, keysz, lowLeftItem,
+ if ( _bt_itemcmp (rel, keysz, lowLeftItem,
(BTItem) PageGetItem(page, PageGetItemId(page, P_HIKEY)),
BTEqualStrategyNumber) )
- lpageop->btpo_flags |= BTP_CHAIN;
+ lpageop->btpo_flags |= BTP_CHAIN;
+ }
/*
* By here,
BTItem new_item;
OffsetNumber upditem_offset = P_HIKEY;
bool do_update = false;
+ bool update_in_place = true;
+ bool parent_chained;
/* form a index tuple that points at the new right page */
rbknum = BufferGetBlockNumber(rbuf);
pbuf = _bt_getstackbuf(rel, stack, BT_WRITE);
ppage = BufferGetPage(pbuf);
ppageop = (BTPageOpaque) PageGetSpecialPointer(ppage);
+ parent_chained = (( ppageop->btpo_flags & BTP_CHAIN )) ? true : false;
+
+ if ( parent_chained && !left_chained )
+ elog (FATAL, "nbtree: unexpected chained parent of unchained page");
/*
* If the key of new_item is < than the key of the item
*/
if ( _bt_itemcmp (rel, keysz, stack->bts_btitem, new_item,
BTGreaterStrategyNumber) ||
- ( _bt_itemcmp(rel, keysz, stack->bts_btitem,
+ ( !shifted &&
+ _bt_itemcmp(rel, keysz, stack->bts_btitem,
new_item, BTEqualStrategyNumber) &&
_bt_itemcmp(rel, keysz, lowLeftItem,
new_item, BTLessStrategyNumber) ) )
elog (FATAL, "btree: items are out of order (leftmost %d, stack %u, update %u)",
P_LEFTMOST(lpageop), stack->bts_offset, upditem_offset);
}
- /*
- * There was bug caused by deletion all minimum keys (K1) from
- * an index page and insertion there (up to page splitting)
- * higher duplicate keys (K2): after it parent item for left
- * page contained K1 and the next item (for new right page) - K2,
- * - and scan for the key = K2 lost items on the left page.
- * So, we have to update parent item if its key < minimum
- * key on the left and minimum keys on the left and on the right
- * are equal. It would be nice to update hikey on the previous
- * page of the left one too, but we may get deadlock here
- * (read comments in _bt_split), so we leave previous page
- * hikey _inconsistent_, but there should to be BTP_CHAIN flag
- * on it, which privents _bt_moveright from dangerous movings
- * from there. - vadim 05/27/97
- */
- else if ( _bt_itemcmp (rel, keysz, stack->bts_btitem,
- lowLeftItem, BTLessStrategyNumber) &&
- _bt_itemcmp (rel, keysz, new_item,
- lowLeftItem, BTEqualStrategyNumber) )
- {
- do_update = true;
- upditem_offset = stack->bts_offset;
- }
if ( do_update )
{
- /* Try to update in place. */
- if ( DOUBLEALIGN (IndexTupleDSize (lowLeftItem->bti_itup)) ==
+ if ( shifted )
+ elog (FATAL, "btree: attempt to update parent for shifted page");
+ /*
+ * Try to update in place. If out parent page is chained
+ * then we must forse insertion.
+ */
+ if ( !parent_chained &&
+ DOUBLEALIGN (IndexTupleDSize (lowLeftItem->bti_itup)) ==
DOUBLEALIGN (IndexTupleDSize (stack->bts_btitem->bti_itup)) )
{
_bt_updateitem(rel, keysz, pbuf,
}
else
{
+ update_in_place = false;
PageIndexTupleDelete(ppage, upditem_offset);
/*
ItemPointerSet(&(stack->bts_btitem->bti_itup.t_tid),
bknum, P_HIKEY);
- /* unlock the children before doing this */
+ /*
+ * Unlock the children before doing this
+ *
+ * Mmm ... I foresee problems here. - vadim 06/10/97
+ */
_bt_relbuf(rel, buf, BT_WRITE);
_bt_relbuf(rel, rbuf, BT_WRITE);
/*
* A regular _bt_binsrch should find the right place to
- * put the new entry, since it should be either lower
- * than any other key on the page or unique.
+ * put the new entry, since it should be lower than any
+ * other key on the page.
* Therefore set afteritem to NULL.
*/
newskey = _bt_mkscankey(rel, &(stack->bts_btitem->bti_itup));
}
newskey = _bt_mkscankey(rel, &(new_item->bti_itup));
+
+ afteritem = stack->bts_btitem;
+ if ( parent_chained && !update_in_place )
+ {
+ ppage = BufferGetPage(pbuf);
+ ppageop = (BTPageOpaque) PageGetSpecialPointer(ppage);
+ if ( ppageop->btpo_flags & BTP_CHAIN )
+ elog (FATAL, "btree: unexpected BTP_CHAIN flag in parent after update");
+ if ( P_RIGHTMOST (ppageop) )
+ elog (FATAL, "btree: chained parent is RIGHTMOST after update");
+ maxoff = PageGetMaxOffsetNumber (ppage);
+ if ( maxoff != P_FIRSTKEY )
+ elog (FATAL, "btree: FIRSTKEY was unexpected in parent after update");
+ if ( _bt_skeycmp (rel, keysz, newskey, ppage,
+ PageGetItemId(ppage, P_FIRSTKEY),
+ BTLessEqualStrategyNumber) )
+ elog (FATAL, "btree: parent FIRSTKEY is >= duplicate key after update");
+ if ( !_bt_skeycmp (rel, keysz, newskey, ppage,
+ PageGetItemId(ppage, P_HIKEY),
+ BTEqualStrategyNumber) )
+ elog (FATAL, "btree: parent HIGHKEY is not equal duplicate key after update");
+ afteritem = (BTItem) NULL;
+ }
+ else if ( left_chained && !update_in_place )
+ {
+ ppage = BufferGetPage(pbuf);
+ ppageop = (BTPageOpaque) PageGetSpecialPointer(ppage);
+ if ( !P_RIGHTMOST (ppageop) &&
+ _bt_skeycmp (rel, keysz, newskey, ppage,
+ PageGetItemId(ppage, P_HIKEY),
+ BTGreaterStrategyNumber) )
+ afteritem = (BTItem) NULL;
+ }
+ if ( afteritem == (BTItem) NULL)
+ {
+ rbuf = _bt_getbuf(rel, ppageop->btpo_next, BT_WRITE);
+ _bt_relbuf(rel, pbuf, BT_WRITE);
+ pbuf = rbuf;
+ }
+
newres = _bt_insertonpg(rel, pbuf, stack->bts_parent,
keysz, newskey, new_item,
- stack->bts_btitem);
+ afteritem);
/* be tidy */
pfree(newres);
* pin and lock on buf are maintained.
*/
static Buffer
-_bt_split(Relation rel, Buffer buf, BTItem hiRightItem)
+_bt_split(Relation rel, Buffer buf, OffsetNumber firstright)
{
Buffer rbuf;
Page origpage;
OffsetNumber leftoff, rightoff;
OffsetNumber start;
OffsetNumber maxoff;
- OffsetNumber firstright;
OffsetNumber i;
- Size llimit;
rbuf = _bt_getbuf(rel, P_NEW, BT_WRITE);
origpage = BufferGetPage(buf);
/* splitting a non-rightmost page, start at the first data item */
start = P_FIRSTKEY;
- /*
- * Copy the original high key to the new page if high key
- * was not passed by caller.
- */
- if ( hiRightItem == NULL )
- {
- itemid = PageGetItemId(origpage, P_HIKEY);
- itemsz = ItemIdGetLength(itemid);
- item = (BTItem) PageGetItem(origpage, itemid);
- }
- else
- {
- item = hiRightItem;
- itemsz = IndexTupleDSize(hiRightItem->bti_itup)
- + (sizeof(BTItemData) - sizeof(IndexTupleData));
- itemsz = DOUBLEALIGN(itemsz);
- }
+ itemid = PageGetItemId(origpage, P_HIKEY);
+ itemsz = ItemIdGetLength(itemid);
+ item = (BTItem) PageGetItem(origpage, itemid);
if ( PageAddItem(rightpage, (Item) item, itemsz, P_HIKEY, LP_USED) == InvalidOffsetNumber )
elog (FATAL, "btree: failed to add hikey to the right sibling");
rightoff = P_FIRSTKEY;
rightoff = P_HIKEY;
}
maxoff = PageGetMaxOffsetNumber(origpage);
- llimit = PageGetFreeSpace(leftpage) / 2;
- firstright = _bt_findsplitloc(rel, origpage, start, maxoff, llimit);
+ if ( firstright == InvalidOffsetNumber )
+ {
+ Size llimit = PageGetFreeSpace(leftpage) / 2;
+ firstright = _bt_findsplitloc(rel, origpage, start, maxoff, llimit);
+ }
for (i = start; i <= maxoff; i = OffsetNumberNext(i)) {
itemid = PageGetItemId(origpage, i);
Size nbytes;
int natts;
+ if ( start >= maxoff )
+ elog (FATAL, "btree: cannot split if start (%d) >= maxoff (%d)",
+ start, maxoff);
natts = rel->rd_rel->relnatts;
saferight = start;
safeitemid = PageGetItemId(page, saferight);
i = OffsetNumberNext(start);
- while (nbytes < llimit) {
-
+ while (nbytes < llimit)
+ {
/* check the next item on the page */
nxtitemid = PageGetItemId(page, i);
nbytes += (ItemIdGetLength(nxtitemid) + sizeof(ItemIdData));
safeitem = nxtitem;
saferight = i;
}
- i = OffsetNumberNext(i);
+ if ( i < maxoff )
+ i = OffsetNumberNext(i);
+ else
+ break;
}
/*
if (saferight == start)
saferight = i;
+ if ( saferight == maxoff && ( maxoff - start ) > 1 )
+ saferight = start + ( maxoff - start ) / 2;
+
return (saferight);
}
* If we have no adjacency information, and the item is equal to the
* high key on the page (by here it is), then the item does not belong
* on this page.
+ *
+ * Now it's not true in all cases. - vadim 06/10/97
*/
if (afteritem == (BTItem) NULL)
+ {
+ if ( opaque->btpo_flags & BTP_LEAF )
+ return (false);
+ if ( opaque->btpo_flags & BTP_CHAIN )
+ return (true);
+ if ( _bt_skeycmp (rel, keysz, scankey, page,
+ PageGetItemId(page, P_FIRSTKEY),
+ BTEqualStrategyNumber) )
+ return (true);
return (false);
+ }
/* damn, have to work for it. i hate that. */
maxoff = PageGetMaxOffsetNumber(page);
return (true);
}
+#if 0
/*
* _bt_shift - insert btitem on the passed page after shifting page
* to the right in the tree.
return (res);
}
+#endif