1 /*-------------------------------------------------------------------------
4 * Item insertion in Lehman and Yao btrees for Postgres.
6 * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
11 * $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.91 2002/05/24 18:57:55 tgl Exp $
13 *-------------------------------------------------------------------------
18 #include "access/heapam.h"
19 #include "access/nbtree.h"
20 #include "miscadmin.h"
25 /* context data for _bt_checksplitloc */
26 Size newitemsz; /* size of new item to be inserted */
27 bool is_leaf; /* T if splitting a leaf page */
28 bool is_rightmost; /* T if splitting a rightmost page */
30 bool have_split; /* found a valid split? */
32 /* these fields valid only if have_split is true */
33 bool newitemonleft; /* new item on left or right of best split */
34 OffsetNumber firstright; /* best split point */
35 int best_delta; /* best size delta so far */
40 Buffer _bt_fixroot(Relation rel, Buffer oldrootbuf, bool release);
41 static void _bt_fixtree(Relation rel, BlockNumber blkno);
42 static void _bt_fixbranch(Relation rel, BlockNumber lblkno,
43 BlockNumber rblkno, BTStack true_stack);
44 static void _bt_fixlevel(Relation rel, Buffer buf, BlockNumber limit);
45 static void _bt_fixup(Relation rel, Buffer buf);
46 static OffsetNumber _bt_getoff(Page page, BlockNumber blkno);
48 static Buffer _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf);
50 static TransactionId _bt_check_unique(Relation rel, BTItem btitem,
51 Relation heapRel, Buffer buf,
52 ScanKey itup_scankey);
53 static InsertIndexResult _bt_insertonpg(Relation rel, Buffer buf,
55 int keysz, ScanKey scankey,
57 OffsetNumber afteritem);
58 static void _bt_insertuple(Relation rel, Buffer buf,
59 Size itemsz, BTItem btitem, OffsetNumber newitemoff);
60 static Buffer _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
61 OffsetNumber newitemoff, Size newitemsz,
62 BTItem newitem, bool newitemonleft,
63 OffsetNumber *itup_off, BlockNumber *itup_blkno);
64 static OffsetNumber _bt_findsplitloc(Relation rel, Page page,
65 OffsetNumber newitemoff,
68 static void _bt_checksplitloc(FindSplitData *state, OffsetNumber firstright,
69 int leftfree, int rightfree,
70 bool newitemonleft, Size firstrightitemsz);
71 static Buffer _bt_getstackbuf(Relation rel, BTStack stack, int access);
72 static void _bt_pgaddtup(Relation rel, Page page,
73 Size itemsize, BTItem btitem,
74 OffsetNumber itup_off, const char *where);
75 static bool _bt_isequal(TupleDesc itupdesc, Page page, OffsetNumber offnum,
76 int keysz, ScanKey scankey);
80 * _bt_doinsert() -- Handle insertion of a single btitem in the tree.
82 * This routine is called by the public interface routines, btbuild
83 * and btinsert. By here, btitem is filled in, including the TID.
/*
 * _bt_doinsert() -- top-level driver for a single btitem insertion:
 * build a scan key from the tuple, descend to the target leaf, lock it,
 * optionally enforce uniqueness, then hand off to _bt_insertonpg().
 *
 * NOTE(review): this extract omits several interior lines (variable
 * declarations for itup_scankey/stack/buf/xwait, braces, and the retry
 * label), so the notes below cover only the visible statements.
 */
86 _bt_doinsert(Relation rel, BTItem btitem,
87 bool index_is_unique, Relation heapRel)
89 IndexTuple itup = &(btitem->bti_itup);
90 int natts = rel->rd_rel->relnatts;
94 InsertIndexResult res;
96 /* we need a scan key to do our search, so build one */
97 itup_scankey = _bt_mkscankey(rel, itup);
100 /* find the page containing this key */
101 stack = _bt_search(rel, natts, itup_scankey, &buf, BT_WRITE);
103 /* trade in our read lock for a write lock */
104 LockBuffer(buf, BUFFER_LOCK_UNLOCK);
105 LockBuffer(buf, BT_WRITE);
108 * If the page was split between the time that we surrendered our read
109 * lock and acquired our write lock, then this page may no longer be
110 * the right place for the key we want to insert. In this case, we
111 * need to move right in the tree. See Lehman and Yao for an
112 * excruciatingly precise description.
114 buf = _bt_moveright(rel, buf, natts, itup_scankey, BT_WRITE);
117 * If we're not allowing duplicates, make sure the key isn't already
120 * NOTE: obviously, _bt_check_unique can only detect keys that are
121 * already in the index; so it cannot defend against concurrent
122 * insertions of the same key. We protect against that by means
123 * of holding a write lock on the target page. Any other would-be
124 * inserter of the same key must acquire a write lock on the same
125 * target page, so only one would-be inserter can be making the check
126 * at one time. Furthermore, once we are past the check we hold
127 * write locks continuously until we have performed our insertion,
128 * so no later inserter can fail to see our insertion. (This
129 * requires some care in _bt_insertonpg.)
131 * If we must wait for another xact, we release the lock while waiting,
132 * and then must start over completely.
138 xwait = _bt_check_unique(rel, btitem, heapRel, buf, itup_scankey);
140 if (TransactionIdIsValid(xwait))
142 /* Have to wait for the other guy ... */
143 _bt_relbuf(rel, buf);
144 XactLockTableWait(xwait);
/*
 * After the wait completes, the entire descent must be redone from
 * scratch (per the "start over completely" comment above); hence the
 * stack from the failed attempt is freed here before retrying.
 */
146 _bt_freestack(stack);
151 /* do the insertion */
/* afteritem == 0: let _bt_insertonpg locate the exact spot by search */
152 res = _bt_insertonpg(rel, buf, stack, natts, itup_scankey, btitem, 0);
/* _bt_insertonpg released the buffer; free our search bookkeeping */
155 _bt_freestack(stack);
156 _bt_freeskey(itup_scankey);
162 * _bt_check_unique() -- Check for violation of unique index constraint
164 * Returns InvalidTransactionId if there is no conflict, else an xact ID
165 * we must wait for to see if it commits a conflicting tuple. If an actual
166 * conflict is detected, no return --- just elog().
/*
 * Walk the run of keys equal to the proposed new key (possibly crossing
 * onto right-sibling pages) and, for each entry whose heap tuple is
 * visible under SnapshotDirty: report a hard conflict via elog(ERROR),
 * or return the in-progress xact ID the caller must wait on. Entries
 * whose heap tuple turns out to be dead may get marked LP_DELETE here.
 *
 * NOTE(review): interior lines (declarations, braces, the scan-loop
 * header, and several break/else lines) are missing from this extract.
 */
169 _bt_check_unique(Relation rel, BTItem btitem, Relation heapRel,
170 Buffer buf, ScanKey itup_scankey)
172 TupleDesc itupdesc = RelationGetDescr(rel);
173 int natts = rel->rd_rel->relnatts;
178 Buffer nbuf = InvalidBuffer;
180 page = BufferGetPage(buf);
181 opaque = (BTPageOpaque) PageGetSpecialPointer(page);
182 maxoff = PageGetMaxOffsetNumber(page);
185 * Find first item >= proposed new item. Note we could also get a
186 * pointer to end-of-page here.
188 offset = _bt_binsrch(rel, buf, natts, itup_scankey);
191 * Scan over all equal tuples, looking for live conflicts.
202 * make sure the offset points to an actual key before trying to
205 if (offset <= maxoff)
208 * _bt_compare returns 0 for (1,NULL) and (1,NULL) - this is how we
209 * handle NULLs - and so we must not use _bt_compare for real
210 * equality comparison, but only for ordering/finding items on pages. -
213 if (!_bt_isequal(itupdesc, page, offset, natts, itup_scankey))
214 break; /* we're past all the equal tuples */
216 curitemid = PageGetItemId(page, offset);
218 * We can skip the heap fetch if the item is marked killed.
220 if (!ItemIdDeleted(curitemid))
222 cbti = (BTItem) PageGetItem(page, curitemid);
223 htup.t_self = cbti->bti_itup.t_tid;
224 if (heap_fetch(heapRel, SnapshotDirty, &htup, &hbuffer,
227 /* it is a duplicate */
/* prefer the inserting xact (xmin) over a deleting one (xmax) */
228 TransactionId xwait =
229 (TransactionIdIsValid(SnapshotDirty->xmin)) ?
230 SnapshotDirty->xmin : SnapshotDirty->xmax;
232 ReleaseBuffer(hbuffer);
234 * If this tuple is being updated by other transaction
235 * then we have to wait for its commit/abort.
237 if (TransactionIdIsValid(xwait))
239 if (nbuf != InvalidBuffer)
240 _bt_relbuf(rel, nbuf);
241 /* Tell _bt_doinsert to wait... */
246 * Otherwise we have a definite conflict.
248 elog(ERROR, "Cannot insert a duplicate key into unique index %s",
249 RelationGetRelationName(rel));
254 * Hmm, if we can't see the tuple, maybe it can be
255 * marked killed. This logic should match index_getnext
/* must hold share lock on heap buffer to examine tuple status bits */
260 LockBuffer(hbuffer, BUFFER_LOCK_SHARE);
261 sv_infomask = htup.t_data->t_infomask;
262 if (HeapTupleSatisfiesVacuum(htup.t_data,
266 curitemid->lp_flags |= LP_DELETE;
267 SetBufferCommitInfoNeedsSave(buf);
/* HeapTupleSatisfiesVacuum may have updated hint bits; save if so */
269 if (sv_infomask != htup.t_data->t_infomask)
270 SetBufferCommitInfoNeedsSave(hbuffer);
271 LockBuffer(hbuffer, BUFFER_LOCK_UNLOCK);
272 ReleaseBuffer(hbuffer);
278 * Advance to next tuple to continue checking.
281 offset = OffsetNumberNext(offset);
284 /* If scankey == hikey we gotta check the next page too */
285 if (P_RIGHTMOST(opaque))
287 if (!_bt_isequal(itupdesc, page, P_HIKEY,
288 natts, itup_scankey))
/* equal keys continue on the right sibling; step onto it read-locked */
290 nblkno = opaque->btpo_next;
291 if (nbuf != InvalidBuffer)
292 _bt_relbuf(rel, nbuf);
293 nbuf = _bt_getbuf(rel, nblkno, BT_READ);
294 page = BufferGetPage(nbuf);
295 opaque = (BTPageOpaque) PageGetSpecialPointer(page);
296 maxoff = PageGetMaxOffsetNumber(page);
297 offset = P_FIRSTDATAKEY(opaque);
/* release any right-sibling buffer we still hold; no conflict found */
301 if (nbuf != InvalidBuffer)
302 _bt_relbuf(rel, nbuf);
304 return InvalidTransactionId;
308 * _bt_insertonpg() -- Insert a tuple on a particular page in the index.
310 * This recursive procedure does the following things:
312 * + finds the right place to insert the tuple.
313 * + if necessary, splits the target page (making sure that the
314 * split is equitable as far as post-insert free space goes).
315 * + inserts the tuple.
316 * + if the page was split, pops the parent stack, and finds the
317 * right place to insert the new child pointer (by walking
318 * right using information stored in the parent stack).
319 * + invokes itself with the appropriate tuple for the right
320 * child page on the parent.
322 * On entry, we must have the right buffer on which to do the
323 * insertion, and the buffer must be pinned and locked. On return,
324 * we will have dropped both the pin and the write lock on the buffer.
326 * If 'afteritem' is >0 then the new tuple must be inserted after the
327 * existing item of that number, noplace else. If 'afteritem' is 0
328 * then the procedure finds the exact spot to insert it by searching.
329 * (keysz and scankey parameters are used ONLY if afteritem == 0.)
331 * NOTE: if the new key is equal to one or more existing keys, we can
332 * legitimately place it anywhere in the series of equal keys --- in fact,
333 * if the new key is equal to the page's "high key" we can place it on
334 * the next page. If it is equal to the high key, and there's not room
335 * to insert the new tuple on the current page without splitting, then
336 * we can move right hoping to find more free space and avoid a split.
337 * (We should not move right indefinitely, however, since that leads to
338 * O(N^2) insertion behavior in the presence of many equal keys.)
339 * Once we have chosen the page to put the key on, we'll insert it before
340 * any existing equal keys because of the way _bt_binsrch() works.
342 * The locking interactions in this code are critical. You should
343 * grok Lehman and Yao's paper before making any changes. In addition,
344 * you need to understand how we disambiguate duplicate keys in this
345 * implementation, in order to be able to find our location using
346 * L&Y "move right" operations. Since we may insert duplicate user
347 * keys, and since these dups may propagate up the tree, we use the
348 * 'afteritem' parameter to position ourselves correctly for the
349 * insertion on internal pages.
/*
 * NOTE(review): this extract of _bt_insertonpg is missing interior lines
 * (parameter lines for buf/stack/btitem, local declarations, braces, and
 * some else-branches); annotations below apply to the visible code only.
 */
352 static InsertIndexResult
353 _bt_insertonpg(Relation rel,
359 OffsetNumber afteritem)
361 InsertIndexResult res;
363 BTPageOpaque lpageop;
364 OffsetNumber itup_off;
365 OffsetNumber itup_blkno;
366 OffsetNumber newitemoff;
367 OffsetNumber firstright = InvalidOffsetNumber;
370 page = BufferGetPage(buf);
371 lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
/* size of the new item, including BTItem header but not its ItemId */
373 itemsz = IndexTupleDSize(btitem->bti_itup)
374 + (sizeof(BTItemData) - sizeof(IndexTupleData));
376 itemsz = MAXALIGN(itemsz); /* be safe, PageAddItem will do this but
377 * we need to be consistent */
380 * Check whether the item can fit on a btree page at all. (Eventually,
381 * we ought to try to apply TOAST methods if not.) We actually need to
382 * be able to fit three items on every page, so restrict any one item
383 * to 1/3 the per-page available space. Note that at this point,
384 * itemsz doesn't include the ItemId.
386 if (itemsz > (PageGetPageSize(page) - sizeof(PageHeaderData) - MAXALIGN(sizeof(BTPageOpaqueData))) / 3 - sizeof(ItemIdData))
387 elog(ERROR, "btree: index item size %lu exceeds maximum %lu",
388 (unsigned long) itemsz,
389 (PageGetPageSize(page) - sizeof(PageHeaderData) - MAXALIGN(sizeof(BTPageOpaqueData))) / 3 - sizeof(ItemIdData));
392 * Determine exactly where new item will go.
/* afteritem > 0 forces the position: immediately after that item */
395 newitemoff = afteritem + 1;
399 * If we will need to split the page to put the item here,
400 * check whether we can put the tuple somewhere to the right,
401 * instead. Keep scanning right until we
402 * (a) find a page with enough free space,
403 * (b) reach the last page where the tuple can legally go, or
404 * (c) get tired of searching.
405 * (c) is not flippant; it is important because if there are many
406 * pages' worth of equal keys, it's better to split one of the early
407 * pages than to scan all the way to the end of the run of equal keys
408 * on every insert. We implement "get tired" as a random choice,
409 * since stopping after scanning a fixed number of pages wouldn't work
410 * well (we'd never reach the right-hand side of previously split
411 * pages). Currently the probability of moving right is set at 0.99,
412 * which may seem too high to change the behavior much, but it does an
413 * excellent job of preventing O(N^2) behavior with many equal keys.
416 bool movedright = false;
418 while (PageGetFreeSpace(page) < itemsz &&
419 !P_RIGHTMOST(lpageop) &&
420 _bt_compare(rel, keysz, scankey, page, P_HIKEY) == 0 &&
421 random() > (MAX_RANDOM_VALUE / 100))
423 /* step right one page */
424 BlockNumber rblkno = lpageop->btpo_next;
428 * must write-lock next page before releasing write lock on
429 * current page; else someone else's _bt_check_unique scan
430 * could fail to see our insertion.
432 rbuf = _bt_getbuf(rel, rblkno, BT_WRITE);
433 _bt_relbuf(rel, buf);
/*
 * NOTE(review): a line between these two presumably assigned
 * buf = rbuf (and set movedright = true) -- it is missing from
 * this extract; confirm against the full source.
 */
435 page = BufferGetPage(buf);
436 lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
441 * Now we are on the right page, so find the insert position. If
442 * we moved right at all, we know we should insert at the start of
443 * the page, else must find the position by searching.
446 newitemoff = P_FIRSTDATAKEY(lpageop);
448 newitemoff = _bt_binsrch(rel, buf, keysz, scankey);
452 * Do we need to split the page to fit the item on it?
454 * Note: PageGetFreeSpace() subtracts sizeof(ItemIdData) from its result,
455 * so this comparison is correct even though we appear to be
456 * accounting only for the item and not for its line pointer.
458 if (PageGetFreeSpace(page) < itemsz)
461 BlockNumber bknum = BufferGetBlockNumber(buf);
463 bool is_root = P_ISROOT(lpageop);
466 /* Choose the split point */
467 firstright = _bt_findsplitloc(rel, page,
471 /* split the buffer into left and right halves */
472 rbuf = _bt_split(rel, buf, firstright,
473 newitemoff, itemsz, btitem, newitemonleft,
474 &itup_off, &itup_blkno);
479 * + our target page has been split;
480 * + the original tuple has been inserted;
481 * + we have write locks on both the old (left half)
482 * and new (right half) buffers, after the split; and
483 * + we know the key we want to insert into the parent
484 * (it's the "high key" on the left child page).
486 * We're ready to do the parent insertion. We need to hold onto the
487 * locks for the child pages until we locate the parent, but we can
488 * release them before doing the actual insertion (see Lehman and Yao
489 * for the reasoning).
491 * Here we have to do something Lehman and Yao don't talk about:
492 * deal with a root split and construction of a new root. If our
493 * stack is empty then we have just split a node on what had been
494 * the root level when we descended the tree. If it is still the
495 * root then we perform a new-root construction. If it *wasn't*
496 * the root anymore, use the parent pointer to get up to the root
497 * level that someone constructed meanwhile, and find the right
498 * place to insert as for the normal case.
506 Assert(stack == (BTStack) NULL);
507 /* create a new root node and release the split buffers */
508 rootbuf = _bt_newroot(rel, buf, rbuf);
509 _bt_wrtbuf(rel, rootbuf);
510 _bt_wrtbuf(rel, rbuf);
511 _bt_wrtbuf(rel, buf);
515 InsertIndexResult newres;
517 BTStackData fakestack;
521 /* If the root page was split */
522 if (stack == (BTStack) NULL)
524 elog(LOG, "btree: concurrent ROOT page split");
527 * If the root page splitter failed to create a new root page
528 * then the old root's btpo_parent still points to metapage. We
529 * have to fix the root page in this case.
531 if (BTreeInvalidParent(lpageop))
534 elog(ERROR, "bt_insertonpg[%s]: no root page found", RelationGetRelationName(rel));
535 _bt_wrtbuf(rel, rbuf);
536 _bt_wrtnorelbuf(rel, buf);
537 elog(WARNING, "bt_insertonpg[%s]: root page unfound - fixing upper levels", RelationGetRelationName(rel));
543 * Set up a phony stack entry if we haven't got a real one
546 stack->bts_blkno = lpageop->btpo_parent;
547 stack->bts_offset = InvalidOffsetNumber;
548 /* bts_btitem will be initialized below */
549 stack->bts_parent = NULL;
552 /* get high key from left page == lowest key on new right page */
553 ritem = (BTItem) PageGetItem(page,
554 PageGetItemId(page, P_HIKEY));
556 /* form an index tuple that points at the new right page */
557 new_item = _bt_formitem(&(ritem->bti_itup));
558 rbknum = BufferGetBlockNumber(rbuf);
559 ItemPointerSet(&(new_item->bti_itup.t_tid), rbknum, P_HIKEY);
562 * Find the parent buffer and get the parent page.
564 * Oops - if we were moved right then we need to change stack
565 * item! We want to find parent pointing to where we are,
566 * right ? - vadim 05/27/97
568 * Interestingly, this means we didn't *really* need to stack the
569 * parent key at all; all we really care about is the saved
570 * block and offset as a starting point for our search...
572 ItemPointerSet(&(stack->bts_btitem.bti_itup.t_tid),
575 pbuf = _bt_getstackbuf(rel, stack, BT_WRITE);
577 /* Now we can write and unlock the children */
578 _bt_wrtbuf(rel, rbuf);
579 _bt_wrtbuf(rel, buf);
581 if (pbuf == InvalidBuffer)
584 elog(ERROR, "_bt_getstackbuf: my bits moved right off the end of the world!"
585 "\n\tRecreate index %s.", RelationGetRelationName(rel));
587 elog(WARNING, "bt_insertonpg[%s]: parent page unfound - fixing branch", RelationGetRelationName(rel));
588 _bt_fixbranch(rel, bknum, rbknum, stack);
591 /* Recursively update the parent */
592 newres = _bt_insertonpg(rel, pbuf, stack->bts_parent,
593 0, NULL, new_item, stack->bts_offset);
/* no split required: simple insertion on the current page */
602 itup_off = newitemoff;
603 itup_blkno = BufferGetBlockNumber(buf);
605 _bt_insertuple(rel, buf, itemsz, btitem, newitemoff);
607 /* Write out the updated page and release pin/lock */
608 _bt_wrtbuf(rel, buf);
612 /* by here, the new tuple is inserted at itup_blkno/itup_off */
613 res = (InsertIndexResult) palloc(sizeof(InsertIndexResultData));
614 ItemPointerSet(&(res->pointerData), itup_blkno, itup_off);
/*
 * Add one tuple to a btree page and emit the corresponding WAL record,
 * all inside a critical section so the page change and its log record
 * cannot be torn apart by an error.
 *
 * NOTE(review): some interior lines (braces, declarations of recptr,
 * the XLOG conditional, END_CRIT_SECTION) are missing from this extract.
 */
620 _bt_insertuple(Relation rel, Buffer buf,
621 Size itemsz, BTItem btitem, OffsetNumber newitemoff)
623 Page page = BufferGetPage(buf);
624 BTPageOpaque pageop = (BTPageOpaque) PageGetSpecialPointer(page);
626 START_CRIT_SECTION();
627 _bt_pgaddtup(rel, page, itemsz, btitem, newitemoff, "page");
630 xl_btree_insert xlrec;
631 uint8 flag = XLOG_BTREE_INSERT;
633 XLogRecData rdata[2];
634 BTItemData truncitem;
/* rdata[0]: fixed-size record header identifying target page/offset */
636 xlrec.target.node = rel->rd_node;
637 ItemPointerSet(&(xlrec.target.tid), BufferGetBlockNumber(buf), newitemoff);
638 rdata[0].buffer = InvalidBuffer;
639 rdata[0].data = (char *) &xlrec;
640 rdata[0].len = SizeOfBtreeInsert;
641 rdata[0].next = &(rdata[1]);
643 /* Read comments in _bt_pgaddtup */
/*
 * rdata[1]: the tuple itself -- or, for the first data key of a
 * non-leaf page, a truncated header-only item (key data is discarded
 * there, matching _bt_pgaddtup's treatment).
 */
644 if (!(P_ISLEAF(pageop)) && newitemoff == P_FIRSTDATAKEY(pageop))
647 truncitem.bti_itup.t_info = sizeof(BTItemData);
648 rdata[1].data = (char *) &truncitem;
649 rdata[1].len = sizeof(BTItemData);
653 rdata[1].data = (char *) btitem;
654 rdata[1].len = IndexTupleDSize(btitem->bti_itup) +
655 (sizeof(BTItemData) - sizeof(IndexTupleData));
657 rdata[1].buffer = buf;
658 rdata[1].next = NULL;
659 if (P_ISLEAF(pageop))
660 flag |= XLOG_BTREE_LEAF;
662 recptr = XLogInsert(RM_BTREE_ID, flag, rdata);
/* stamp the page so replay knows this change is already applied */
664 PageSetLSN(page, recptr);
665 PageSetSUI(page, ThisStartUpID);
672 * _bt_split() -- split a page in the btree.
674 * On entry, buf is the page to split, and is write-locked and pinned.
675 * firstright is the item index of the first item to be moved to the
676 * new right page. newitemoff etc. tell us about the new item that
677 * must be inserted along with the data from the old page.
679 * Returns the new right sibling of buf, pinned and write-locked.
680 * The pin and lock on buf are maintained. *itup_off and *itup_blkno
681 * are set to the exact location where newitem was inserted.
/*
 * NOTE(review): this extract of _bt_split is missing interior lines
 * (several declarations, braces, else-lines, and the "newitemonleft"
 * parameter line); annotations below apply to the visible code only.
 */
684 _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
685 OffsetNumber newitemoff, Size newitemsz, BTItem newitem,
687 OffsetNumber *itup_off, BlockNumber *itup_blkno)
693 BTPageOpaque ropaque,
701 OffsetNumber leftoff,
/* allocate the new right sibling; build the new left page in a temp copy */
707 rbuf = _bt_getbuf(rel, P_NEW, BT_WRITE);
708 origpage = BufferGetPage(buf);
709 leftpage = PageGetTempPage(origpage, sizeof(BTPageOpaqueData));
710 rightpage = BufferGetPage(rbuf);
712 _bt_pageinit(leftpage, BufferGetPageSize(buf));
713 _bt_pageinit(rightpage, BufferGetPageSize(rbuf));
715 /* init btree private data */
716 oopaque = (BTPageOpaque) PageGetSpecialPointer(origpage);
717 lopaque = (BTPageOpaque) PageGetSpecialPointer(leftpage);
718 ropaque = (BTPageOpaque) PageGetSpecialPointer(rightpage);
720 /* if we're splitting this page, it won't be the root when we're done */
721 lopaque->btpo_flags = oopaque->btpo_flags;
722 lopaque->btpo_flags &= ~BTP_ROOT;
723 ropaque->btpo_flags = lopaque->btpo_flags;
/* wire the sibling links: orig's prev <- left <-> right -> orig's next */
724 lopaque->btpo_prev = oopaque->btpo_prev;
725 lopaque->btpo_next = BufferGetBlockNumber(rbuf);
726 ropaque->btpo_prev = BufferGetBlockNumber(buf);
727 ropaque->btpo_next = oopaque->btpo_next;
730 * Must copy the original parent link into both new pages, even though
731 * it might be quite obsolete by now. We might need it if this level
732 * is or recently was the root (see README).
734 lopaque->btpo_parent = ropaque->btpo_parent = oopaque->btpo_parent;
737 * If the page we're splitting is not the rightmost page at its level
738 * in the tree, then the first entry on the page is the high key for
739 * the page. We need to copy that to the right half. Otherwise
740 * (meaning the rightmost page case), all the items on the right half
745 if (!P_RIGHTMOST(oopaque))
747 itemid = PageGetItemId(origpage, P_HIKEY);
748 itemsz = ItemIdGetLength(itemid);
749 item = (BTItem) PageGetItem(origpage, itemid);
750 if (PageAddItem(rightpage, (Item) item, itemsz, rightoff,
751 LP_USED) == InvalidOffsetNumber)
752 elog(PANIC, "btree: failed to add hikey to the right sibling");
753 rightoff = OffsetNumberNext(rightoff);
757 * The "high key" for the new left page will be the first key that's
758 * going to go into the new right page. This might be either the
759 * existing data item at position firstright, or the incoming tuple.
762 if (!newitemonleft && newitemoff == firstright)
764 /* incoming tuple will become first on right page */
770 /* existing item at firstright will become first on right page */
771 itemid = PageGetItemId(origpage, firstright);
772 itemsz = ItemIdGetLength(itemid);
773 item = (BTItem) PageGetItem(origpage, itemid);
776 if (PageAddItem(leftpage, (Item) item, itemsz, leftoff,
777 LP_USED) == InvalidOffsetNumber)
778 elog(PANIC, "btree: failed to add hikey to the left sibling");
779 leftoff = OffsetNumberNext(leftoff);
782 * Now transfer all the data items to the appropriate page
784 maxoff = PageGetMaxOffsetNumber(origpage);
786 for (i = P_FIRSTDATAKEY(oopaque); i <= maxoff; i = OffsetNumberNext(i))
788 itemid = PageGetItemId(origpage, i);
789 itemsz = ItemIdGetLength(itemid);
790 item = (BTItem) PageGetItem(origpage, itemid);
792 /* does new item belong before this one? */
797 _bt_pgaddtup(rel, leftpage, newitemsz, newitem, leftoff,
800 *itup_blkno = BufferGetBlockNumber(buf);
801 leftoff = OffsetNumberNext(leftoff);
805 _bt_pgaddtup(rel, rightpage, newitemsz, newitem, rightoff,
807 *itup_off = rightoff;
808 *itup_blkno = BufferGetBlockNumber(rbuf);
809 rightoff = OffsetNumberNext(rightoff);
813 /* decide which page to put it on */
/* items before firstright go left; firstright and beyond go right */
816 _bt_pgaddtup(rel, leftpage, itemsz, item, leftoff,
818 leftoff = OffsetNumberNext(leftoff);
822 _bt_pgaddtup(rel, rightpage, itemsz, item, rightoff,
824 rightoff = OffsetNumberNext(rightoff);
828 /* cope with possibility that newitem goes at the end */
833 _bt_pgaddtup(rel, leftpage, newitemsz, newitem, leftoff,
836 *itup_blkno = BufferGetBlockNumber(buf);
837 leftoff = OffsetNumberNext(leftoff);
841 _bt_pgaddtup(rel, rightpage, newitemsz, newitem, rightoff,
843 *itup_off = rightoff;
844 *itup_blkno = BufferGetBlockNumber(rbuf);
845 rightoff = OffsetNumberNext(rightoff);
850 * We have to grab the right sibling (if any) and fix the prev pointer
851 * there. We are guaranteed that this is deadlock-free since no other
852 * writer will be holding a lock on that page and trying to move left,
853 * and all readers release locks on a page before trying to fetch its
857 if (!P_RIGHTMOST(ropaque))
859 sbuf = _bt_getbuf(rel, ropaque->btpo_next, BT_WRITE);
860 spage = BufferGetPage(sbuf);
864 * Right sibling is locked, new siblings are prepared, but original
865 * page is not updated yet. Log changes before continuing.
867 * NO ELOG(ERROR) till right sibling is updated.
869 START_CRIT_SECTION();
871 xl_btree_split xlrec;
872 int flag = (newitemonleft) ?
873 XLOG_BTREE_SPLEFT : XLOG_BTREE_SPLIT;
876 XLogRecData rdata[4];
878 xlrec.target.node = rel->rd_node;
879 ItemPointerSet(&(xlrec.target.tid), *itup_blkno, *itup_off);
/* otherblk records whichever half does NOT contain the new tuple */
882 blkno = BufferGetBlockNumber(rbuf);
883 BlockIdSet(&(xlrec.otherblk), blkno);
887 blkno = BufferGetBlockNumber(buf);
888 BlockIdSet(&(xlrec.otherblk), blkno);
890 BlockIdSet(&(xlrec.parentblk), lopaque->btpo_parent);
891 BlockIdSet(&(xlrec.leftblk), lopaque->btpo_prev);
892 BlockIdSet(&(xlrec.rightblk), ropaque->btpo_next);
895 * Direct access to the page is not good but faster - we should
896 * implement some new func in page API.
898 xlrec.leftlen = ((PageHeader) leftpage)->pd_special -
899 ((PageHeader) leftpage)->pd_upper;
900 rdata[0].buffer = InvalidBuffer;
901 rdata[0].data = (char *) &xlrec;
902 rdata[0].len = SizeOfBtreeSplit;
903 rdata[0].next = &(rdata[1]);
/* rdata[1]/rdata[2]: full tuple data of the new left and right pages */
905 rdata[1].buffer = InvalidBuffer;
906 rdata[1].data = (char *) leftpage + ((PageHeader) leftpage)->pd_upper;
907 rdata[1].len = xlrec.leftlen;
908 rdata[1].next = &(rdata[2]);
910 rdata[2].buffer = InvalidBuffer;
911 rdata[2].data = (char *) rightpage + ((PageHeader) rightpage)->pd_upper;
912 rdata[2].len = ((PageHeader) rightpage)->pd_special -
913 ((PageHeader) rightpage)->pd_upper;
914 rdata[2].next = NULL;
916 if (!P_RIGHTMOST(ropaque))
918 BTPageOpaque sopaque = (BTPageOpaque) PageGetSpecialPointer(spage);
/* repoint old right sibling's prev link at the new right page */
920 sopaque->btpo_prev = BufferGetBlockNumber(rbuf);
922 rdata[2].next = &(rdata[3]);
923 rdata[3].buffer = sbuf;
924 rdata[3].data = NULL;
926 rdata[3].next = NULL;
929 if (P_ISLEAF(lopaque))
930 flag |= XLOG_BTREE_LEAF;
932 recptr = XLogInsert(RM_BTREE_ID, flag, rdata);
934 PageSetLSN(leftpage, recptr);
935 PageSetSUI(leftpage, ThisStartUpID);
936 PageSetLSN(rightpage, recptr);
937 PageSetSUI(rightpage, ThisStartUpID);
938 if (!P_RIGHTMOST(ropaque))
940 PageSetLSN(spage, recptr);
941 PageSetSUI(spage, ThisStartUpID);
946 * By here, the original data page has been split into two new halves,
947 * and these are correct. The algorithm requires that the left page
948 * never move during a split, so we copy the new left page back on top
949 * of the original. Note that this is not a waste of time, since we
950 * also require (in the page management code) that the center of a
951 * page always be clean, and the most efficient way to guarantee this
952 * is just to compact the data by reinserting it into a new left page.
955 PageRestoreTempPage(leftpage, origpage);
959 /* write and release the old right sibling */
960 if (!P_RIGHTMOST(ropaque))
961 _bt_wrtbuf(rel, sbuf);
968 * _bt_findsplitloc() -- find an appropriate place to split a page.
970 * The idea here is to equalize the free space that will be on each split
971 * page, *after accounting for the inserted tuple*. (If we fail to account
972 * for it, we might find ourselves with too little room on the page that
973 * it needs to go into!)
975 * If the page is the rightmost page on its level, we instead try to arrange
976 * for twice as much free space on the right as on the left. In this way,
977 * when we are inserting successively increasing keys (consider sequences,
978 * timestamps, etc) we will end up with a tree whose pages are about 67% full,
979 * instead of the 50% full result that we'd get without this special case.
980 * (We could bias it even further to make the initially-loaded tree more full.
981 * But since the steady-state load for a btree is about 70%, we'd likely just
982 * be making more page-splitting work for ourselves later on, when we start
983 * seeing updates to existing tuples.)
985 * We are passed the intended insert position of the new tuple, expressed as
986 * the offsetnumber of the tuple it must go in front of. (This could be
987 * maxoff+1 if the tuple is to go at the end.)
989 * We return the index of the first existing tuple that should go on the
990 * righthand page, plus a boolean indicating whether the new tuple goes on
991 * the left or right page. The bool is necessary to disambiguate the case
992 * where firstright == newitemoff.
/*
 * Scan every candidate split position, scoring each with
 * _bt_checksplitloc(), and return the chosen firstright offset
 * (plus *newitemonleft) -- stopping early once the free-space
 * imbalance of the best candidate drops within "goodenough".
 *
 * NOTE(review): interior lines (the page/newitemsz/newitemonleft
 * parameter lines, local declarations, braces, and the loop's
 * termination condition) are missing from this extract.
 */
995 _bt_findsplitloc(Relation rel,
997 OffsetNumber newitemoff,
1001 BTPageOpaque opaque;
1002 OffsetNumber offnum;
1003 OffsetNumber maxoff;
1005 FindSplitData state;
1012 opaque = (BTPageOpaque) PageGetSpecialPointer(page);
1014 /* Passed-in newitemsz is MAXALIGNED but does not include line pointer */
1015 newitemsz += sizeof(ItemIdData);
1016 state.newitemsz = newitemsz;
1017 state.is_leaf = P_ISLEAF(opaque);
1018 state.is_rightmost = P_RIGHTMOST(opaque);
1019 state.have_split = false;
1021 /* Total free space available on a btree page, after fixed overhead */
/* +sizeof(ItemIdData) compensates for PageGetFreeSpace's subtraction */
1022 leftspace = rightspace =
1023 PageGetPageSize(page) - sizeof(PageHeaderData) -
1024 MAXALIGN(sizeof(BTPageOpaqueData))
1025 +sizeof(ItemIdData);
1028 * Finding the best possible split would require checking all the
1029 * possible split points, because of the high-key and left-key special
1030 * cases. That's probably more work than it's worth; instead, stop as
1031 * soon as we find a "good-enough" split, where good-enough is defined
1032 * as an imbalance in free space of no more than pagesize/16
1033 * (arbitrary...) This should let us stop near the middle on most
1034 * pages, instead of plowing to the end.
1036 goodenough = leftspace / 16;
1038 /* The right page will have the same high key as the old page */
1039 if (!P_RIGHTMOST(opaque))
1041 itemid = PageGetItemId(page, P_HIKEY);
1042 rightspace -= (int) (MAXALIGN(ItemIdGetLength(itemid)) +
1043 sizeof(ItemIdData));
1046 /* Count up total space in data items without actually scanning 'em */
1047 dataitemtotal = rightspace - (int) PageGetFreeSpace(page);
1050 * Scan through the data items and calculate space usage for a split
1051 * at each possible position.
1053 dataitemstoleft = 0;
1054 maxoff = PageGetMaxOffsetNumber(page);
1056 for (offnum = P_FIRSTDATAKEY(opaque);
1058 offnum = OffsetNumberNext(offnum))
1064 itemid = PageGetItemId(page, offnum);
1065 itemsz = MAXALIGN(ItemIdGetLength(itemid)) + sizeof(ItemIdData);
1068 * We have to allow for the current item becoming the high key of
1069 * the left page; therefore it counts against left space as well
1072 leftfree = leftspace - dataitemstoleft - (int) itemsz;
1073 rightfree = rightspace - (dataitemtotal - dataitemstoleft);
1076 * Will the new item go to left or right of split?
1078 if (offnum > newitemoff)
1079 _bt_checksplitloc(&state, offnum, leftfree, rightfree,
1081 else if (offnum < newitemoff)
1082 _bt_checksplitloc(&state, offnum, leftfree, rightfree,
/* offnum == newitemoff: new item could land on either side */
1086 /* need to try it both ways! */
1087 _bt_checksplitloc(&state, offnum, leftfree, rightfree,
1089 /* here we are contemplating newitem as first on right */
1090 _bt_checksplitloc(&state, offnum, leftfree, rightfree,
1094 /* Abort scan once we find a good-enough choice */
1095 if (state.have_split && state.best_delta <= goodenough)
1098 dataitemstoleft += itemsz;
1102 * I believe it is not possible to fail to find a feasible split, but
1105 if (!state.have_split)
1106 elog(FATAL, "_bt_findsplitloc: can't find a feasible split point for %s",
1107 RelationGetRelationName(rel));
1109 *newitemonleft = state.newitemonleft;
1110 return state.firstright;
1114 * Subroutine to analyze a particular possible split choice (ie, firstright
1115 * and newitemonleft settings), and record the best split so far in *state.
/*
 * leftfree/rightfree are the free space each half would have after a split
 * at 'firstright', not yet counting the incoming new item; firstrightitemsz
 * is the size of the item that would become first on the right page.
 * Updates *state only when this choice is feasible and strictly better
 * (smaller delta) than the best split recorded so far.
 */
1118 _bt_checksplitloc(FindSplitData *state, OffsetNumber firstright,
1119 int leftfree, int rightfree,
1120 bool newitemonleft, Size firstrightitemsz)
1123 * Account for the new item on whichever side it is to be put.
/* charge the new item's (already-computed) size to the receiving side */
1126 leftfree -= (int) state->newitemsz;
1128 rightfree -= (int) state->newitemsz;
1131 * If we are not on the leaf level, we will be able to discard the key
1132 * data from the first item that winds up on the right page.
/*
 * Non-leaf: the first data item of a page needs no key, so only a bare
 * BTItemData header plus its line pointer will actually be stored.
 */
1134 if (!state->is_leaf)
1135 rightfree += (int) firstrightitemsz -
1136 (int) (MAXALIGN(sizeof(BTItemData)) + sizeof(ItemIdData));
1139 * If feasible split point, remember best delta.
/* a split is feasible only if neither half overflows its page */
1141 if (leftfree >= 0 && rightfree >= 0)
1145 if (state->is_rightmost)
1148 * On a rightmost page, try to equalize right free space with
1149 * twice the left free space. See comments for
/* rightmost page: bias free space to the right for future appends */
1152 delta = (2 * leftfree) - rightfree;
1156 /* Otherwise, aim for equal free space on both sides */
1157 delta = leftfree - rightfree;
/*
 * NOTE(review): delta is presumably normalized to an absolute imbalance
 * before this comparison -- confirm against the full source.
 */
1162 if (!state->have_split || delta < state->best_delta)
1164 state->have_split = true;
1165 state->newitemonleft = newitemonleft;
1166 state->firstright = firstright;
1167 state->best_delta = delta;
1173 * _bt_getstackbuf() -- Walk back up the tree one step, and find the item
1174 * we last looked at in the parent.
1176 * This is possible because we save a bit image of the last item
1177 * we looked at in the parent, and the update algorithm guarantees
1178 * that if items above us in the tree move, they only move right.
1180 * Also, re-set bts_blkno & bts_offset if changed.
/*
 * Returns the buffer holding the page where the saved item now lives,
 * locked with 'access', or InvalidBuffer if the item could not be
 * re-found anywhere to the right (rightmost page reached).
 */
1183 _bt_getstackbuf(Relation rel, BTStack stack, int access)
1193 BTPageOpaque opaque;
/* start the search at the page/offset recorded during the descent */
1195 blkno = stack->bts_blkno;
1196 buf = _bt_getbuf(rel, blkno, access);
1197 page = BufferGetPage(buf);
1198 opaque = (BTPageOpaque) PageGetSpecialPointer(page);
1199 maxoff = PageGetMaxOffsetNumber(page);
1201 start = stack->bts_offset;
1204 * _bt_insertonpg set bts_offset to InvalidOffsetNumber in the case of
1205 * concurrent ROOT page split. Also, watch out for possibility that
1206 * page has a high key now when it didn't before.
/* clamp to the first data key so we never compare against the high key */
1208 if (start < P_FIRSTDATAKEY(opaque))
1209 start = P_FIRSTDATAKEY(opaque);
1213 /* see if it's on this page */
1214 for (offnum = start;
1216 offnum = OffsetNumberNext(offnum))
1218 itemid = PageGetItemId(page, offnum);
1219 item = (BTItem) PageGetItem(page, itemid);
/* BTItemSame compares against the bit image saved in the stack entry */
1220 if (BTItemSame(item, &stack->bts_btitem))
1222 /* Return accurate pointer to where link is now */
1223 stack->bts_blkno = blkno;
1224 stack->bts_offset = offnum;
1230 * by here, the item we're looking for moved right at least one
/* no further right-sibling to search: give up */
1233 if (P_RIGHTMOST(opaque))
1235 _bt_relbuf(rel, buf);
1236 return (InvalidBuffer);
/* step right one page and rescan from its first data key */
1239 blkno = opaque->btpo_next;
1240 _bt_relbuf(rel, buf);
1241 buf = _bt_getbuf(rel, blkno, access);
1242 page = BufferGetPage(buf);
1243 opaque = (BTPageOpaque) PageGetSpecialPointer(page);
1244 maxoff = PageGetMaxOffsetNumber(page);
1245 start = P_FIRSTDATAKEY(opaque);
1250 * _bt_newroot() -- Create a new root page for the index.
1252 * We've just split the old root page and need to create a new one.
1253 * In order to do this, we add a new root page to the file, then lock
1254 * the metadata page and update it. This is guaranteed to be deadlock-
1255 * free, because all readers release their locks on the metadata page
1256 * before trying to lock the root, and all writers lock the root before
1257 * trying to lock the metadata page. We have a write lock on the old
1258 * root page, so we have not introduced any cycles into the waits-for
1261 * On entry, lbuf (the old root) and rbuf (its new peer) are write-
1262 * locked. On exit, a new root page exists with entries for the
1263 * two new children, metapage is updated and unlocked/unpinned.
1264 * The new root buffer is returned to caller which has to unlock/unpin
1265 * lbuf, rbuf & rootbuf.
1268 _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf)
1276 BlockNumber rootblknum;
1277 BTPageOpaque rootopaque;
1284 BTMetaPageData *metad;
1286 /* get a new root page */
1287 rootbuf = _bt_getbuf(rel, P_NEW, BT_WRITE);
1288 rootpage = BufferGetPage(rootbuf);
1289 rootblknum = BufferGetBlockNumber(rootbuf);
/* lock the metapage AFTER the root, per the deadlock-freedom rule above */
1290 metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_WRITE);
1291 metapg = BufferGetPage(metabuf);
1292 metad = BTPageGetMeta(metapg);
1294 /* NO ELOG(ERROR) from here till newroot op is logged */
1295 START_CRIT_SECTION();
1297 /* set btree special data */
1298 rootopaque = (BTPageOpaque) PageGetSpecialPointer(rootpage);
1299 rootopaque->btpo_prev = rootopaque->btpo_next = P_NONE;
1300 rootopaque->btpo_flags |= BTP_ROOT;
/* a true root's parent link points at the metapage, not a real page */
1301 rootopaque->btpo_parent = BTREE_METAPAGE;
1303 lbkno = BufferGetBlockNumber(lbuf);
1304 rbkno = BufferGetBlockNumber(rbuf);
1305 lpage = BufferGetPage(lbuf);
1306 rpage = BufferGetPage(rbuf);
1309 * Make sure pages in old root level have valid parent links --- we
1310 * will need this in _bt_insertonpg() if a concurrent root split
1311 * happens (see README).
1313 ((BTPageOpaque) PageGetSpecialPointer(lpage))->btpo_parent =
1314 ((BTPageOpaque) PageGetSpecialPointer(rpage))->btpo_parent =
1318 * Create downlink item for left page (old root). Since this will be
1319 * the first item in a non-leaf page, it implicitly has minus-infinity
1320 * key value, so we need not store any actual key in it.
1322 itemsz = sizeof(BTItemData);
1323 new_item = (BTItem) palloc(itemsz);
1324 new_item->bti_itup.t_info = itemsz;
/* the TID carries the downlink: child block number, offset P_HIKEY */
1325 ItemPointerSet(&(new_item->bti_itup.t_tid), lbkno, P_HIKEY);
1328 * Insert the left page pointer into the new root page. The root page
1329 * is the rightmost page on its level so there is no "high key" in it;
1330 * the two items will go into positions P_HIKEY and P_FIRSTKEY.
/* PANIC (not ERROR): we are inside the critical section */
1332 if (PageAddItem(rootpage, (Item) new_item, itemsz, P_HIKEY, LP_USED) == InvalidOffsetNumber)
1333 elog(PANIC, "btree: failed to add leftkey to new root page");
1337 * Create downlink item for right page. The key for it is obtained
1338 * from the "high key" position in the left page.
1340 itemid = PageGetItemId(lpage, P_HIKEY);
1341 itemsz = ItemIdGetLength(itemid);
1342 item = (BTItem) PageGetItem(lpage, itemid);
1343 new_item = _bt_formitem(&(item->bti_itup));
1344 ItemPointerSet(&(new_item->bti_itup.t_tid), rbkno, P_HIKEY);
1347 * insert the right page pointer into the new root page.
1349 if (PageAddItem(rootpage, (Item) new_item, itemsz, P_FIRSTKEY, LP_USED) == InvalidOffsetNumber)
1350 elog(PANIC, "btree: failed to add rightkey to new root page");
/* point the metapage at the new root and bump the tree height */
1353 metad->btm_root = rootblknum;
1354 (metad->btm_level)++;
/* XLOG the newroot operation: header record plus full root-page image */
1358 xl_btree_newroot xlrec;
1360 XLogRecData rdata[2];
1362 xlrec.node = rel->rd_node;
1363 xlrec.level = metad->btm_level;
1364 BlockIdSet(&(xlrec.rootblk), rootblknum);
1365 rdata[0].buffer = InvalidBuffer;
1366 rdata[0].data = (char *) &xlrec;
1367 rdata[0].len = SizeOfBtreeNewroot;
1368 rdata[0].next = &(rdata[1]);
1371 * Dirrect access to page is not good but faster - we should
1372 * implement some new func in page API.
/* log the tuple area (pd_upper..pd_special) of the new root directly */
1374 rdata[1].buffer = InvalidBuffer;
1375 rdata[1].data = (char *) rootpage + ((PageHeader) rootpage)->pd_upper;
1376 rdata[1].len = ((PageHeader) rootpage)->pd_special -
1377 ((PageHeader) rootpage)->pd_upper;
1378 rdata[1].next = NULL;
1380 recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_NEWROOT, rdata);
/* stamp every modified page so it cannot hit disk before the WAL record */
1382 PageSetLSN(rootpage, recptr);
1383 PageSetSUI(rootpage, ThisStartUpID);
1384 PageSetLSN(metapg, recptr);
1385 PageSetSUI(metapg, ThisStartUpID);
1387 /* we changed their btpo_parent */
1388 PageSetLSN(lpage, recptr);
1389 PageSetSUI(lpage, ThisStartUpID);
1390 PageSetLSN(rpage, recptr);
1391 PageSetSUI(rpage, ThisStartUpID);
1395 /* write and let go of metapage buffer */
1396 _bt_wrtbuf(rel, metabuf);
1402 * In the event old root page was splitted but no new one was created we
1403 * build required parent levels keeping write lock on old root page.
1404 * Note: it's assumed that old root page' btpo_parent points to meta page,
1405 * ie not to parent page. On exit, new root page buffer is write locked.
1406 * If "release" is TRUE then oldrootbuf will be released immediately
1407 * after upper level is builded.
/*
 * Recovery path for an interrupted root split: gathers every page on the
 * old root's level under a freshly created root, recursing upward until
 * the new level no longer needs splitting.
 */
1410 _bt_fixroot(Relation rel, Buffer oldrootbuf, bool release)
1413 BlockNumber rootblk;
1416 Page oldrootpage = BufferGetPage(oldrootbuf);
1417 BTPageOpaque oldrootopaque = (BTPageOpaque)
1418 PageGetSpecialPointer(oldrootpage);
1425 BTPageOpaque opaque,
1428 OffsetNumber newitemoff;
/* sanity: a split old root must be leftmost and must have a right sibling */
1433 if (!P_LEFTMOST(oldrootopaque) || P_RIGHTMOST(oldrootopaque))
1434 elog(ERROR, "bt_fixroot: not valid old root page");
1436 /* Read right neighbor and create new root page */
1437 leftbuf = _bt_getbuf(rel, oldrootopaque->btpo_next, BT_WRITE);
1438 leftpage = BufferGetPage(leftbuf);
1439 leftopaque = (BTPageOpaque) PageGetSpecialPointer(leftpage);
/* new root initially has downlinks for just the old root and its sibling */
1440 rootbuf = _bt_newroot(rel, oldrootbuf, leftbuf);
1441 rootpage = BufferGetPage(rootbuf);
1442 rootLSN = PageGetLSN(rootpage);
1443 rootblk = BufferGetBlockNumber(rootbuf);
1445 /* parent page where to insert pointers */
1447 page = BufferGetPage(buf);
1448 opaque = (BTPageOpaque) PageGetSpecialPointer(page);
1451 * Now read other pages (if any) on level and add them to new root.
1452 * Here we break one of our locking rules - never hold lock on parent
1453 * page when acquiring lock on its child, - but we free from deadlock:
1455 * If concurrent process will split one of pages on this level then it
1456 * will see either btpo_parent == metablock or btpo_parent == rootblk.
1457 * In first case it will give up its locks and walk to the leftmost
1458 * page (oldrootbuf) in _bt_fixup() - ie it will wait for us and let
1459 * us continue. In second case it will try to lock rootbuf keeping its
1460 * locks on buffers we already passed, also waiting for us. If we'll
1461 * have to unlock rootbuf (split it) and that process will have to
1462 * split page of new level we created (level of rootbuf) then it will
1463 * wait while we create upper level. Etc.
/* walk right across the whole level, adding a downlink for each page */
1465 while (!P_RIGHTMOST(leftopaque))
1467 rightbuf = _bt_getbuf(rel, leftopaque->btpo_next, BT_WRITE);
1468 rightpage = BufferGetPage(rightbuf);
1469 rightopaque = (BTPageOpaque) PageGetSpecialPointer(rightpage);
1472 * Update LSN & StartUpID of child page buffer to ensure that it
1473 * will be written on disk after flushing log record for new root
1474 * creation. Unfortunately, for the moment (?) we do not log this
1475 * operation and so possibly break our rule to log entire page
1476 * content on first after checkpoint modification.
1479 rightopaque->btpo_parent = rootblk;
1480 if (XLByteLT(PageGetLSN(rightpage), rootLSN))
1481 PageSetLSN(rightpage, rootLSN);
1482 PageSetSUI(rightpage, ThisStartUpID);
1483 RESUME_INTERRUPTS();
/* build the downlink item from the left page's high key */
1485 ritem = (BTItem) PageGetItem(leftpage, PageGetItemId(leftpage, P_HIKEY));
1486 btitem = _bt_formitem(&(ritem->bti_itup));
1487 ItemPointerSet(&(btitem->bti_itup.t_tid), leftopaque->btpo_next, P_HIKEY);
1488 itemsz = IndexTupleDSize(btitem->bti_itup)
1489 + (sizeof(BTItemData) - sizeof(IndexTupleData));
1490 itemsz = MAXALIGN(itemsz);
/* downlinks are appended in level order, so always insert at the end */
1492 newitemoff = OffsetNumberNext(PageGetMaxOffsetNumber(page));
1494 if (PageGetFreeSpace(page) < itemsz)
1497 OffsetNumber firstright;
1498 OffsetNumber itup_off;
1499 BlockNumber itup_blkno;
/* parent page full: split it and continue inserting into the half
 * that received the new item */
1502 firstright = _bt_findsplitloc(rel, page,
1503 newitemoff, itemsz, &newitemonleft);
1504 newbuf = _bt_split(rel, buf, firstright,
1505 newitemoff, itemsz, btitem, newitemonleft,
1506 &itup_off, &itup_blkno);
1507 /* Keep lock on new "root" buffer ! */
1509 _bt_relbuf(rel, buf);
1511 page = BufferGetPage(buf);
1512 opaque = (BTPageOpaque) PageGetSpecialPointer(page);
1515 _bt_insertuple(rel, buf, itemsz, btitem, newitemoff);
1517 /* give up left buffer */
1518 _bt_wrtbuf(rel, leftbuf);
/* slide the window right: right child becomes the new left child */
1521 leftpage = rightpage;
1522 leftopaque = rightopaque;
1525 /* give up rightmost page buffer */
1526 _bt_wrtbuf(rel, leftbuf);
1529 * Here we hold locks on old root buffer, new root buffer we've
1530 * created with _bt_newroot() - rootbuf, - and buf we've used for last
1531 * insert ops - buf. If rootbuf != buf then we have to create at least
1532 * one more level. And if "release" is TRUE then we give up
1536 _bt_wrtbuf(rel, oldrootbuf);
/* the new root level itself split: recurse to build yet another level */
1540 _bt_wrtbuf(rel, buf);
1541 return (_bt_fixroot(rel, rootbuf, true));
1548 * Using blkno of leftmost page on a level inside tree this func
1549 * checks/fixes tree from this level up to the root page.
/*
 * Repeatedly repairs one level (via _bt_fixlevel) and climbs to its
 * parent, until a valid root is confirmed or rebuilt with _bt_fixroot.
 * No pins/locks are held between iterations.
 */
1552 _bt_fixtree(Relation rel, BlockNumber blkno)
1556 BTPageOpaque opaque;
1561 buf = _bt_getbuf(rel, blkno, BT_READ);
1562 page = BufferGetPage(buf);
1563 opaque = (BTPageOpaque) PageGetSpecialPointer(page);
/* must start at the leftmost, non-leaf page of some internal level */
1564 if (!P_LEFTMOST(opaque) || P_ISLEAF(opaque))
1565 elog(ERROR, "bt_fixtree[%s]: invalid start page (need to recreate index)", RelationGetRelationName(rel));
/* remember the parent link before _bt_fixlevel drops our lock */
1566 pblkno = opaque->btpo_parent;
1568 /* check/fix entire level */
1569 _bt_fixlevel(rel, buf, InvalidBlockNumber);
1572 * No pins/locks are held here. Re-read start page if its
1573 * btpo_parent pointed to meta page else go up one level.
1575 * XXX have to catch InvalidBlockNumber at the moment -:(
1577 if (pblkno == BTREE_METAPAGE || pblkno == InvalidBlockNumber)
/* parent link says we may be at the top: re-check under write lock */
1579 buf = _bt_getbuf(rel, blkno, BT_WRITE);
1580 page = BufferGetPage(buf);
1581 opaque = (BTPageOpaque) PageGetSpecialPointer(page);
1582 if (P_ISROOT(opaque))
1584 /* Tree is Ok now */
1585 _bt_relbuf(rel, buf);
1588 /* Call _bt_fixroot() if there is no upper level */
1589 if (BTreeInvalidParent(opaque))
1591 elog(WARNING, "bt_fixtree[%s]: fixing root page", RelationGetRelationName(rel));
1592 buf = _bt_fixroot(rel, buf, true);
1593 _bt_relbuf(rel, buf);
1596 /* Have to go up one level */
1597 pblkno = opaque->btpo_parent;
1598 _bt_relbuf(rel, buf);
1606 * Check/fix level starting from page in buffer buf up to block
1607 * limit on *child* level (or till rightmost child page if limit
1608 * is InvalidBlockNumber). Start buffer must be read locked.
1609 * No pins/locks are held on exit.
/*
 * Walks the child level in a sliding window of up to three pages
 * (cblkno/coff/cbuf/cpage/copaque arrays), verifying that the parent
 * level has a downlink for each child and inserting any missing ones.
 * The parent page is examined from a private copy in tbuf so its lock
 * can be dropped while children are being read.
 */
1612 _bt_fixlevel(Relation rel, Buffer buf, BlockNumber limit)
1614 BlockNumber blkno = BufferGetBlockNumber(buf);
1616 BTPageOpaque opaque;
1617 BlockNumber cblkno[3];
1618 OffsetNumber coff[3];
1621 BTPageOpaque copaque[3];
1625 bool goodbye = false;
1628 page = BufferGetPage(buf);
1629 /* copy page to temp storage */
1630 memmove(tbuf, page, PageGetPageSize(page));
/* lock released here; all parent inspection below uses the tbuf copy */
1631 _bt_relbuf(rel, buf);
1634 opaque = (BTPageOpaque) PageGetSpecialPointer(page);
1636 /* Initialize first child data */
1637 coff[0] = P_FIRSTDATAKEY(opaque);
1638 if (coff[0] > PageGetMaxOffsetNumber(page))
1639 elog(ERROR, "bt_fixlevel[%s]: invalid maxoff on start page (need to recreate index)", RelationGetRelationName(rel));
/* first child is whatever the parent's first downlink points at */
1640 btitem = (BTItem) PageGetItem(page, PageGetItemId(page, coff[0]));
1641 cblkno[0] = ItemPointerGetBlockNumber(&(btitem->bti_itup.t_tid));
1642 cbuf[0] = _bt_getbuf(rel, cblkno[0], BT_READ);
1643 cpage[0] = BufferGetPage(cbuf[0]);
1644 copaque[0] = (BTPageOpaque) PageGetSpecialPointer(cpage[0]);
1645 if (P_LEFTMOST(opaque) && !P_LEFTMOST(copaque[0]))
1646 elog(ERROR, "bt_fixtlevel[%s]: non-leftmost child page of leftmost parent (need to recreate index)", RelationGetRelationName(rel));
1647 /* caller should take care and avoid this */
1648 if (P_RIGHTMOST(copaque[0]))
1649 elog(ERROR, "bt_fixtlevel[%s]: invalid start child (need to recreate index)", RelationGetRelationName(rel));
1654 * Read up to 2 more child pages and look for pointers to them in
1655 * *saved* parent page
1657 coff[1] = coff[2] = InvalidOffsetNumber;
1658 for (cidx = 0; cidx < 2;)
/* follow the child level's right-link chain */
1661 cblkno[cidx] = (copaque[cidx - 1])->btpo_next;
1662 cbuf[cidx] = _bt_getbuf(rel, cblkno[cidx], BT_READ);
1663 cpage[cidx] = BufferGetPage(cbuf[cidx]);
1664 copaque[cidx] = (BTPageOpaque) PageGetSpecialPointer(cpage[cidx]);
1665 coff[cidx] = _bt_getoff(page, cblkno[cidx]);
1668 if (coff[cidx] != InvalidOffsetNumber)
/* downlink found: it must sit immediately after its left neighbor's */
1670 for (i = cidx - 1; i >= 0; i--)
1672 if (coff[i] == InvalidOffsetNumber)
1674 if (coff[cidx] != coff[i] + 1)
1675 elog(ERROR, "bt_fixlevel[%s]: invalid item order(1) (need to recreate index)", RelationGetRelationName(rel));
1680 if (P_RIGHTMOST(copaque[cidx]))
1685 * Read parent page and insert missed pointers.
1687 if (coff[1] == InvalidOffsetNumber ||
1688 (cidx == 2 && coff[2] == InvalidOffsetNumber))
1692 BTPageOpaque newopaque;
1695 OffsetNumber newitemoff;
1696 BlockNumber parblk[3];
/* re-find the parent page holding cblkno[0]'s downlink, write-locked */
1699 stack.bts_parent = NULL;
1700 stack.bts_blkno = blkno;
1701 stack.bts_offset = InvalidOffsetNumber;
1702 ItemPointerSet(&(stack.bts_btitem.bti_itup.t_tid),
1703 cblkno[0], P_HIKEY);
1705 buf = _bt_getstackbuf(rel, &stack, BT_WRITE);
1706 if (buf == InvalidBuffer)
1707 elog(ERROR, "bt_fixlevel[%s]: pointer disappeared (need to recreate index)", RelationGetRelationName(rel));
1709 page = BufferGetPage(buf);
1710 opaque = (BTPageOpaque) PageGetSpecialPointer(page);
1711 coff[0] = stack.bts_offset;
1712 blkno = BufferGetBlockNumber(buf);
1715 /* Check/insert missed pointers */
1716 for (i = 1; i <= cidx; i++)
1718 coff[i] = _bt_getoff(page, cblkno[i]);
1721 parblk[i] = BufferGetBlockNumber(buf);
1722 if (coff[i] != InvalidOffsetNumber)
/* adjacency must hold only when both downlinks share a parent page */
1724 if (parblk[i] == parblk[i - 1] &&
1725 coff[i] != coff[i - 1] + 1)
1726 elog(ERROR, "bt_fixlevel[%s]: invalid item order(2) (need to recreate index)", RelationGetRelationName(rel));
1729 /* Have to check next page ? */
1730 if ((!P_RIGHTMOST(opaque)) &&
1731 coff[i - 1] == PageGetMaxOffsetNumber(page)) /* yes */
1733 newbuf = _bt_getbuf(rel, opaque->btpo_next, BT_WRITE);
1734 newpage = BufferGetPage(newbuf);
1735 newopaque = (BTPageOpaque) PageGetSpecialPointer(newpage);
1736 coff[i] = _bt_getoff(newpage, cblkno[i]);
1737 if (coff[i] != InvalidOffsetNumber) /* found ! */
1739 if (coff[i] != P_FIRSTDATAKEY(newopaque))
1740 elog(ERROR, "bt_fixlevel[%s]: invalid item order(3) (need to recreate index)", RelationGetRelationName(rel));
/* downlink lives on the right sibling: move there */
1741 _bt_relbuf(rel, buf);
1745 blkno = BufferGetBlockNumber(buf);
1749 /* unfound - need to insert on current page */
1750 _bt_relbuf(rel, newbuf);
1752 /* insert pointer */
/* build the missing downlink from the left child's high key */
1753 ritem = (BTItem) PageGetItem(cpage[i - 1],
1754 PageGetItemId(cpage[i - 1], P_HIKEY));
1755 btitem = _bt_formitem(&(ritem->bti_itup));
1756 ItemPointerSet(&(btitem->bti_itup.t_tid), cblkno[i], P_HIKEY);
1757 itemsz = IndexTupleDSize(btitem->bti_itup)
1758 + (sizeof(BTItemData) - sizeof(IndexTupleData));
1759 itemsz = MAXALIGN(itemsz);
/* new downlink goes directly after its left neighbor's */
1761 newitemoff = coff[i - 1] + 1;
1763 if (PageGetFreeSpace(page) < itemsz)
1765 OffsetNumber firstright;
1766 OffsetNumber itup_off;
1767 BlockNumber itup_blkno;
/* parent page full: split, then keep the half that got the item */
1770 firstright = _bt_findsplitloc(rel, page,
1771 newitemoff, itemsz, &newitemonleft);
1772 newbuf = _bt_split(rel, buf, firstright,
1773 newitemoff, itemsz, btitem, newitemonleft,
1774 &itup_off, &itup_blkno);
1775 /* what buffer we need in ? */
1777 _bt_relbuf(rel, newbuf);
1780 _bt_relbuf(rel, buf);
1782 page = BufferGetPage(buf);
1783 opaque = (BTPageOpaque) PageGetSpecialPointer(page);
1785 blkno = BufferGetBlockNumber(buf);
1790 _bt_insertuple(rel, buf, itemsz, btitem, newitemoff);
1791 coff[i] = newitemoff;
1798 /* copy page with pointer to cblkno[cidx] to temp storage */
1799 memmove(tbuf, page, PageGetPageSize(page));
1800 _bt_relbuf(rel, buf);
1802 opaque = (BTPageOpaque) PageGetSpecialPointer(page);
1805 /* Continue if current check/fix level page is rightmost */
1806 if (P_RIGHTMOST(opaque))
1809 /* Pointers to child pages are Ok - right end of child level ? */
1810 _bt_relbuf(rel, cbuf[0]);
1811 _bt_relbuf(rel, cbuf[1]);
1813 (cidx == 2 && (P_RIGHTMOST(copaque[2]) || goodbye)))
1816 _bt_relbuf(rel, cbuf[2]);
/* stop once the requested child-level limit block has been covered */
1819 if (cblkno[0] == limit || cblkno[1] == limit)
/* shift window: last child becomes the new first child, loop again */
1821 cblkno[0] = cblkno[2];
1823 cpage[0] = cpage[2];
1824 copaque[0] = copaque[2];
1830 * Check/fix part of tree - branch - up from parent of level with blocks
1831 * lblkno and rblknum. We first ensure that parent level has pointers
1832 * to both lblkno & rblknum and if those pointers are on different
1833 * parent pages then do the same for parent level, etc. No locks must
1834 * be held on target level and upper on entry. No locks will be held
1835 * on exit. Stack created when traversing tree down should be provided and
1836 * it must points to parent level. rblkno must be on the right from lblkno.
1837 * (This function is special edition of more expensive _bt_fixtree(),
1838 * but it doesn't guarantee full consistency of tree.)
1841 _bt_fixbranch(Relation rel, BlockNumber lblkno,
1842 BlockNumber rblkno, BTStack true_stack)
1844 BlockNumber blkno = true_stack->bts_blkno;
1846 BTPageOpaque opaque;
1850 OffsetNumber offnum;
/* consume the first stack entry; the loop climbs the rest */
1852 true_stack = true_stack->bts_parent;
1855 buf = _bt_getbuf(rel, blkno, BT_READ);
1857 /* Check/fix parent level pointed by blkno */
1858 _bt_fixlevel(rel, buf, rblkno);
1861 * Here parent level should have pointers for both lblkno and
1862 * rblkno and we have to find them.
/* locate lblkno's downlink by searching for its bit image */
1864 stack.bts_parent = NULL;
1865 stack.bts_blkno = blkno;
1866 stack.bts_offset = InvalidOffsetNumber;
1867 ItemPointerSet(&(stack.bts_btitem.bti_itup.t_tid), lblkno, P_HIKEY);
1868 buf = _bt_getstackbuf(rel, &stack, BT_READ);
1869 if (buf == InvalidBuffer)
1870 elog(ERROR, "bt_fixbranch[%s]: left pointer unfound (need to recreate index)", RelationGetRelationName(rel));
1871 page = BufferGetPage(buf);
1872 offnum = _bt_getoff(page, rblkno);
1874 if (offnum != InvalidOffsetNumber) /* right pointer found */
/* both downlinks on one parent page: right must follow left */
1876 if (offnum <= stack.bts_offset)
1877 elog(ERROR, "bt_fixbranch[%s]: invalid item order (need to recreate index)", RelationGetRelationName(rel));
1878 _bt_relbuf(rel, buf);
1882 /* Pointers are on different parent pages - find right one */
1883 lblkno = BufferGetBlockNumber(buf);
1884 opaque = (BTPageOpaque) PageGetSpecialPointer(page);
1885 if (P_RIGHTMOST(opaque))
1886 elog(ERROR, "bt_fixbranch[%s]: right pointer unfound(1) (need to recreate index)", RelationGetRelationName(rel));
/* rblkno's downlink must be on (or right of) the next parent page */
1888 stack.bts_parent = NULL;
1889 stack.bts_blkno = opaque->btpo_next;
1890 stack.bts_offset = InvalidOffsetNumber;
1891 ItemPointerSet(&(stack.bts_btitem.bti_itup.t_tid), rblkno, P_HIKEY);
1892 rbuf = _bt_getstackbuf(rel, &stack, BT_READ);
1893 if (rbuf == InvalidBuffer)
1894 elog(ERROR, "bt_fixbranch[%s]: right pointer unfound(2) (need to recreate index)", RelationGetRelationName(rel));
/* the split now spans two parent pages: repeat one level up */
1895 rblkno = BufferGetBlockNumber(rbuf);
1896 _bt_relbuf(rel, rbuf);
1899 * If we have parent item in true_stack then go up one level and
1900 * ensure that it has pointers to new lblkno & rblkno.
1904 _bt_relbuf(rel, buf);
1905 blkno = true_stack->bts_blkno;
1906 true_stack = true_stack->bts_parent;
1911 * Well, we are on the level that was root or unexistent when we
1912 * started traversing tree down. If btpo_parent is updated then
1913 * we'll use it to continue, else we'll fix/restore upper levels
1916 if (!BTreeInvalidParent(opaque))
1918 blkno = opaque->btpo_parent;
1919 _bt_relbuf(rel, buf);
1923 /* Have to switch to excl buf lock and re-check btpo_parent */
/* cannot upgrade a lock in place: release and reacquire in write mode */
1924 _bt_relbuf(rel, buf);
1925 buf = _bt_getbuf(rel, blkno, BT_WRITE);
1926 page = BufferGetPage(buf);
1927 opaque = (BTPageOpaque) PageGetSpecialPointer(page);
/* re-check after relock: a concurrent fixer may have set the parent */
1928 if (!BTreeInvalidParent(opaque))
1930 blkno = opaque->btpo_parent;
1931 _bt_relbuf(rel, buf);
1936 * We hold excl lock on some internal page with unupdated
1937 * btpo_parent - time for _bt_fixup.
1942 elog(WARNING, "bt_fixbranch[%s]: fixing upper levels", RelationGetRelationName(rel));
1943 _bt_fixup(rel, buf);
1949 * Having buf excl locked this routine walks to the left on level and
1950 * uses either _bt_fixtree() or _bt_fixroot() to create/check&fix upper
1951 * levels. No buffer pins/locks will be held on exit.
1954 _bt_fixup(Relation rel, Buffer buf)
1957 BTPageOpaque opaque;
1962 page = BufferGetPage(buf);
1963 opaque = (BTPageOpaque) PageGetSpecialPointer(page);
1966 * If someone else already created parent pages then it's time for
1967 * _bt_fixtree() to check upper levels and fix them, if required.
1969 if (!BTreeInvalidParent(opaque))
1971 blkno = opaque->btpo_parent;
1972 _bt_relbuf(rel, buf);
1973 elog(WARNING, "bt_fixup[%s]: checking/fixing upper levels", RelationGetRelationName(rel));
1974 _bt_fixtree(rel, blkno);
/* otherwise keep walking left via btpo_prev toward the leftmost page */
1977 if (P_LEFTMOST(opaque))
1979 blkno = opaque->btpo_prev;
1980 _bt_relbuf(rel, buf);
1981 buf = _bt_getbuf(rel, blkno, BT_WRITE);
1985 * Ok, we are on the leftmost page, it's write locked by us and its
1986 * btpo_parent points to meta page - time for _bt_fixroot().
1988 elog(WARNING, "bt_fixup[%s]: fixing root page", RelationGetRelationName(rel));
1989 buf = _bt_fixroot(rel, buf, true);
1990 _bt_relbuf(rel, buf);
/*
 * _bt_getoff() -- Linearly scan a parent page's data items for the
 * downlink whose TID block number equals 'blkno'.  Returns its offset
 * number, or InvalidOffsetNumber if no such downlink is on the page.
 */
1994 _bt_getoff(Page page, BlockNumber blkno)
1996 BTPageOpaque opaque = (BTPageOpaque) PageGetSpecialPointer(page);
1997 OffsetNumber maxoff = PageGetMaxOffsetNumber(page);
1998 OffsetNumber offnum = P_FIRSTDATAKEY(opaque);
1999 BlockNumber curblkno;
/* scan starts past the high key (P_FIRSTDATAKEY accounts for it) */
2003 for (; offnum <= maxoff; offnum++)
2005 itemid = PageGetItemId(page, offnum);
2006 item = (BTItem) PageGetItem(page, itemid);
/* the downlink's child block is stored in the item's TID */
2007 curblkno = ItemPointerGetBlockNumber(&(item->bti_itup.t_tid));
2008 if (curblkno == blkno)
2012 return (InvalidOffsetNumber);
2016 * _bt_pgaddtup() -- add a tuple to a particular page in the index.
2018 * This routine adds the tuple to the page as requested. It does
2019 * not affect pin/lock status, but you'd better have a write lock
2020 * and pin on the target buffer! Don't forget to write and release
2021 * the buffer afterwards, either.
2023 * The main difference between this routine and a bare PageAddItem call
2024 * is that this code knows that the leftmost data item on a non-leaf
2025 * btree page doesn't need to have a key. Therefore, it strips such
2026 * items down to just the item header. CAUTION: this works ONLY if
2027 * we insert the items in order, so that the given itup_off does
2028 * represent the final position of the item!
2031 _bt_pgaddtup(Relation rel,
2035 OffsetNumber itup_off,
2038 BTPageOpaque opaque = (BTPageOpaque) PageGetSpecialPointer(page);
2039 BTItemData truncitem;
/* first data item on a non-leaf page carries an implicit -infinity key */
2041 if (!P_ISLEAF(opaque) && itup_off == P_FIRSTDATAKEY(opaque))
/* copy only the header into a local, then insert the truncated item */
2043 memcpy(&truncitem, btitem, sizeof(BTItemData));
2044 truncitem.bti_itup.t_info = sizeof(BTItemData);
2045 btitem = &truncitem;
2046 itemsize = sizeof(BTItemData);
/* PANIC because caller may already be inside a critical section */
2049 if (PageAddItem(page, (Item) btitem, itemsize, itup_off,
2050 LP_USED) == InvalidOffsetNumber)
2051 elog(PANIC, "btree: failed to add item to the %s for %s",
2052 where, RelationGetRelationName(rel));
2056 * _bt_isequal - used in _bt_doinsert in check for duplicates.
2058 * This is very similar to _bt_compare, except for NULL handling.
2059 * Rule is simple: NOT_NULL not equal NULL, NULL not_equal NULL too.
2062 _bt_isequal(TupleDesc itupdesc, Page page, OffsetNumber offnum,
2063 int keysz, ScanKey scankey)
2069 /* Better be comparing to a leaf item */
2070 Assert(P_ISLEAF((BTPageOpaque) PageGetSpecialPointer(page)));
2072 btitem = (BTItem) PageGetItem(page, PageGetItemId(page, offnum));
2073 itup = &(btitem->bti_itup);
2075 for (i = 1; i <= keysz; i++)
2077 ScanKey entry = &scankey[i - 1];
2083 attno = entry->sk_attno;
2085 datum = index_getattr(itup, attno, itupdesc, &isNull);
2087 /* NULLs are never equal to anything */
2088 if (entry->sk_flags & SK_ISNULL || isNull)
2091 result = DatumGetInt32(FunctionCall2(&entry->sk_func,
2099 /* if we get here, the keys are equal */