1 /*-------------------------------------------------------------------------
4 * Item insertion in Lehman and Yao btrees for Postgres.
6 * Portions Copyright (c) 1996-2000, PostgreSQL, Inc
7 * Portions Copyright (c) 1994, Regents of the University of California
11 * $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.67 2000/10/21 15:43:18 vadim Exp $
13 *-------------------------------------------------------------------------
18 #include "access/heapam.h"
19 #include "access/nbtree.h"
24 /* context data for _bt_checksplitloc */
/*
 * NOTE(review): these fields appear to belong to the FindSplitData struct
 * referenced by the _bt_checksplitloc prototype below; it carries the
 * running "best split so far" state between _bt_findsplitloc and
 * _bt_checksplitloc.
 */
25 Size newitemsz; /* size of new item to be inserted */
26 bool non_leaf; /* T if splitting an internal node */
28 bool have_split; /* found a valid split? */
30 /* these fields valid only if have_split is true */
31 bool newitemonleft; /* new item on left or right of best split */
32 OffsetNumber firstright; /* best split point */
33 int best_delta; /* best size delta so far */
36 void _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf);
/* Forward declarations for routines private to this file. */
38 static TransactionId _bt_check_unique(Relation rel, BTItem btitem,
39 Relation heapRel, Buffer buf,
40 ScanKey itup_scankey);
41 static InsertIndexResult _bt_insertonpg(Relation rel, Buffer buf,
43 int keysz, ScanKey scankey,
45 OffsetNumber afteritem);
46 static Buffer _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
47 OffsetNumber newitemoff, Size newitemsz,
48 BTItem newitem, bool newitemonleft,
49 OffsetNumber *itup_off, BlockNumber *itup_blkno);
50 static OffsetNumber _bt_findsplitloc(Relation rel, Page page,
51 OffsetNumber newitemoff,
54 static void _bt_checksplitloc(FindSplitData *state, OffsetNumber firstright,
55 int leftfree, int rightfree,
56 bool newitemonleft, Size firstrightitemsz);
57 static Buffer _bt_getstackbuf(Relation rel, BTStack stack);
58 static void _bt_pgaddtup(Relation rel, Page page,
59 Size itemsize, BTItem btitem,
60 OffsetNumber itup_off, const char *where);
61 static bool _bt_isequal(TupleDesc itupdesc, Page page, OffsetNumber offnum,
62 int keysz, ScanKey scankey);
/*
 * Heap relation of the tuple currently being inserted.  _bt_doinsert stashes
 * it here so the WAL-logging code in _bt_insertonpg/_bt_split can copy its
 * RelFileNode into leaf-level xlog records without threading it through
 * every call.  Not reentrant -- hence "temporary hack".
 */
65 static Relation _xlheapRel; /* temporary hack */
69 * _bt_doinsert() -- Handle insertion of a single btitem in the tree.
71 * This routine is called by the public interface routines, btbuild
72 * and btinsert. By here, btitem is filled in, including the TID.
75 _bt_doinsert(Relation rel, BTItem btitem,
76 bool index_is_unique, Relation heapRel)
78 IndexTuple itup = &(btitem->bti_itup);
79 int natts = rel->rd_rel->relnatts;
83 InsertIndexResult res;
85 /* we need a scan key to do our search, so build one */
86 itup_scankey = _bt_mkscankey(rel, itup);
89 /* find the page containing this key */
90 stack = _bt_search(rel, natts, itup_scankey, &buf, BT_WRITE);
92 /* trade in our read lock for a write lock */
/*
 * NOTE(review): _bt_search was invoked with BT_WRITE above, yet we
 * unlock and relock here -- confirm against _bt_search's locking
 * contract before changing either call.
 */
93 LockBuffer(buf, BUFFER_LOCK_UNLOCK);
94 LockBuffer(buf, BT_WRITE);
97 * If the page was split between the time that we surrendered our read
98 * lock and acquired our write lock, then this page may no longer be
99 * the right place for the key we want to insert. In this case, we
100 * need to move right in the tree. See Lehman and Yao for an
101 * excruciatingly precise description.
103 buf = _bt_moveright(rel, buf, natts, itup_scankey, BT_WRITE);
106 * If we're not allowing duplicates, make sure the key isn't
107 * already in the index. XXX this belongs somewhere else, likely
113 xwait = _bt_check_unique(rel, btitem, heapRel, buf, itup_scankey);
115 if (TransactionIdIsValid(xwait))
117 /* Have to wait for the other guy ... */
118 _bt_relbuf(rel, buf, BT_WRITE);
119 XactLockTableWait(xwait);
/* once the conflicting transaction ends, discard state and retry */
121 _bt_freestack(stack);
/* record heap relation for leaf-level WAL records (see _xlheapRel) */
127 _xlheapRel = heapRel; /* temporary hack */
130 /* do the insertion */
131 res = _bt_insertonpg(rel, buf, stack, natts, itup_scankey, btitem, 0);
/* clean up the search stack and scan key before returning */
134 _bt_freestack(stack);
135 _bt_freeskey(itup_scankey);
141 * _bt_check_unique() -- Check for violation of unique index constraint
143 * Returns NullTransactionId if there is no conflict, else an xact ID we
144 * must wait for to see if it commits a conflicting tuple. If an actual
145 * conflict is detected, no return --- just elog().
148 _bt_check_unique(Relation rel, BTItem btitem, Relation heapRel,
149 Buffer buf, ScanKey itup_scankey)
151 TupleDesc itupdesc = RelationGetDescr(rel);
152 int natts = rel->rd_rel->relnatts;
/* nbuf holds a read lock on any right-sibling page we step onto */
157 Buffer nbuf = InvalidBuffer;
160 page = BufferGetPage(buf);
161 opaque = (BTPageOpaque) PageGetSpecialPointer(page);
162 maxoff = PageGetMaxOffsetNumber(page);
165 * Find first item >= proposed new item. Note we could also get
166 * a pointer to end-of-page here.
168 offset = _bt_binsrch(rel, buf, natts, itup_scankey);
171 * Scan over all equal tuples, looking for live conflicts.
181 * _bt_compare returns 0 for (1,NULL) and (1,NULL) - this is
182 * how we handle NULLs - so we must not use _bt_compare
183 * for real equality checks, but only for ordering/finding items on
184 * pages. - vadim 03/24/97
186 * make sure the offset points to an actual key
187 * before trying to compare it...
189 if (offset <= maxoff)
191 if (! _bt_isequal(itupdesc, page, offset, natts, itup_scankey))
192 break; /* we're past all the equal tuples */
195 * Check whether the heap tuple being inserted is itself a deleted
196 * one (i.e. just moved to another place by vacuum)! We only need
197 * to do this once, but don't want to do it at all unless we see
198 * equal tuples, so as not to slow down the unequal case.
202 htup.t_self = btitem->bti_itup.t_tid;
203 heap_fetch(heapRel, SnapshotDirty, &htup, &buffer);
204 if (htup.t_data == NULL) /* YES! */
206 /* Live tuple is being inserted, so continue checking */
207 ReleaseBuffer(buffer);
/* fetch the heap tuple behind the equal index entry we are examining */
211 cbti = (BTItem) PageGetItem(page, PageGetItemId(page, offset));
212 htup.t_self = cbti->bti_itup.t_tid;
213 heap_fetch(heapRel, SnapshotDirty, &htup, &buffer);
214 if (htup.t_data != NULL) /* it is a duplicate */
216 TransactionId xwait =
217 (TransactionIdIsValid(SnapshotDirty->xmin)) ?
218 SnapshotDirty->xmin : SnapshotDirty->xmax;
221 * If this tuple is being updated by other transaction
222 * then we have to wait for its commit/abort.
224 ReleaseBuffer(buffer);
225 if (TransactionIdIsValid(xwait))
227 if (nbuf != InvalidBuffer)
228 _bt_relbuf(rel, nbuf, BT_READ);
229 /* Tell _bt_doinsert to wait... */
233 * Otherwise we have a definite conflict.
235 elog(ERROR, "Cannot insert a duplicate key into unique index %s",
236 RelationGetRelationName(rel));
238 /* htup null so no buffer to release */
242 * Advance to next tuple to continue checking.
245 offset = OffsetNumberNext(offset);
248 /* If scankey == hikey we gotta check the next page too */
249 if (P_RIGHTMOST(opaque))
251 if (!_bt_isequal(itupdesc, page, P_HIKEY,
252 natts, itup_scankey))
/* step right to the next page; resume scanning at its first data key */
254 nblkno = opaque->btpo_next;
255 if (nbuf != InvalidBuffer)
256 _bt_relbuf(rel, nbuf, BT_READ);
257 nbuf = _bt_getbuf(rel, nblkno, BT_READ);
258 page = BufferGetPage(nbuf);
259 opaque = (BTPageOpaque) PageGetSpecialPointer(page);
260 maxoff = PageGetMaxOffsetNumber(page);
261 offset = P_FIRSTDATAKEY(opaque);
/* release any right-sibling buffer we still hold */
265 if (nbuf != InvalidBuffer)
266 _bt_relbuf(rel, nbuf, BT_READ);
/* no live conflict found */
268 return NullTransactionId;
272 * _bt_insertonpg() -- Insert a tuple on a particular page in the index.
274 * This recursive procedure does the following things:
276 * + finds the right place to insert the tuple.
277 * + if necessary, splits the target page (making sure that the
278 * split is equitable as far as post-insert free space goes).
279 * + inserts the tuple.
280 * + if the page was split, pops the parent stack, and finds the
281 * right place to insert the new child pointer (by walking
282 * right using information stored in the parent stack).
283 * + invokes itself with the appropriate tuple for the right
284 * child page on the parent.
286 * On entry, we must have the right buffer on which to do the
287 * insertion, and the buffer must be pinned and locked. On return,
288 * we will have dropped both the pin and the write lock on the buffer.
290 * If 'afteritem' is >0 then the new tuple must be inserted after the
291 * existing item of that number, noplace else. If 'afteritem' is 0
292 * then the procedure finds the exact spot to insert it by searching.
293 * (keysz and scankey parameters are used ONLY if afteritem == 0.)
295 * NOTE: if the new key is equal to one or more existing keys, we can
296 * legitimately place it anywhere in the series of equal keys --- in fact,
297 * if the new key is equal to the page's "high key" we can place it on
298 * the next page. If it is equal to the high key, and there's not room
299 * to insert the new tuple on the current page without splitting, then
300 * we can move right hoping to find more free space and avoid a split.
301 * (We should not move right indefinitely, however, since that leads to
302 * O(N^2) insertion behavior in the presence of many equal keys.)
303 * Once we have chosen the page to put the key on, we'll insert it before
304 * any existing equal keys because of the way _bt_binsrch() works.
306 * The locking interactions in this code are critical. You should
307 * grok Lehman and Yao's paper before making any changes. In addition,
308 * you need to understand how we disambiguate duplicate keys in this
309 * implementation, in order to be able to find our location using
310 * L&Y "move right" operations. Since we may insert duplicate user
311 * keys, and since these dups may propagate up the tree, we use the
312 * 'afteritem' parameter to position ourselves correctly for the
313 * insertion on internal pages.
316 static InsertIndexResult
317 _bt_insertonpg(Relation rel,
323 OffsetNumber afteritem)
325 InsertIndexResult res;
327 BTPageOpaque lpageop;
328 OffsetNumber itup_off;
329 BlockNumber itup_blkno;
330 OffsetNumber newitemoff;
331 OffsetNumber firstright = InvalidOffsetNumber;
334 page = BufferGetPage(buf);
335 lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
/* on-page size of the new item: index tuple plus BTItem header overhead */
337 itemsz = IndexTupleDSize(btitem->bti_itup)
338 + (sizeof(BTItemData) - sizeof(IndexTupleData));
340 itemsz = MAXALIGN(itemsz); /* be safe, PageAddItem will do this but
341 * we need to be consistent */
344 * Check whether the item can fit on a btree page at all. (Eventually,
345 * we ought to try to apply TOAST methods if not.) We actually need to
346 * be able to fit three items on every page, so restrict any one item
347 * to 1/3 the per-page available space. Note that at this point,
348 * itemsz doesn't include the ItemId.
350 if (itemsz > (PageGetPageSize(page) - sizeof(PageHeaderData) - MAXALIGN(sizeof(BTPageOpaqueData))) / 3 - sizeof(ItemIdData))
351 elog(ERROR, "btree: index item size %u exceeds maximum %lu",
353 (PageGetPageSize(page) - sizeof(PageHeaderData) - MAXALIGN(sizeof(BTPageOpaqueData))) /3 - sizeof(ItemIdData));
356 * Determine exactly where new item will go.
360 newitemoff = afteritem + 1;
365 * If we will need to split the page to put the item here,
366 * check whether we can put the tuple somewhere to the right,
367 * instead. Keep scanning right until we
368 * (a) find a page with enough free space,
369 * (b) reach the last page where the tuple can legally go, or
370 * (c) get tired of searching.
371 * (c) is not flippant; it is important because if there are many
372 * pages' worth of equal keys, it's better to split one of the early
373 * pages than to scan all the way to the end of the run of equal keys
374 * on every insert. We implement "get tired" as a random choice,
375 * since stopping after scanning a fixed number of pages wouldn't work
376 * well (we'd never reach the right-hand side of previously split
377 * pages). Currently the probability of moving right is set at 0.99,
378 * which may seem too high to change the behavior much, but it does an
379 * excellent job of preventing O(N^2) behavior with many equal keys.
382 bool movedright = false;
384 while (PageGetFreeSpace(page) < itemsz &&
385 !P_RIGHTMOST(lpageop) &&
386 _bt_compare(rel, keysz, scankey, page, P_HIKEY) == 0 &&
387 random() > (MAX_RANDOM_VALUE / 100))
389 /* step right one page */
390 BlockNumber rblkno = lpageop->btpo_next;
392 _bt_relbuf(rel, buf, BT_WRITE);
393 buf = _bt_getbuf(rel, rblkno, BT_WRITE);
394 page = BufferGetPage(buf);
395 lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
399 * Now we are on the right page, so find the insert position.
400 * If we moved right at all, we know we should insert at the
401 * start of the page, else must find the position by searching.
404 newitemoff = P_FIRSTDATAKEY(lpageop);
406 newitemoff = _bt_binsrch(rel, buf, keysz, scankey);
410 * Do we need to split the page to fit the item on it?
412 * Note: PageGetFreeSpace() subtracts sizeof(ItemIdData) from its
413 * result, so this comparison is correct even though we appear to
414 * be accounting only for the item and not for its line pointer.
416 if (PageGetFreeSpace(page) < itemsz)
419 BlockNumber bknum = BufferGetBlockNumber(buf);
421 bool is_root = P_ISROOT(lpageop);
424 /* Choose the split point */
425 firstright = _bt_findsplitloc(rel, page,
429 /* split the buffer into left and right halves */
430 rbuf = _bt_split(rel, buf, firstright,
431 newitemoff, itemsz, btitem, newitemonleft,
432 &itup_off, &itup_blkno);
437 * + our target page has been split;
438 * + the original tuple has been inserted;
439 * + we have write locks on both the old (left half)
440 * and new (right half) buffers, after the split; and
441 * + we know the key we want to insert into the parent
442 * (it's the "high key" on the left child page).
444 * We're ready to do the parent insertion. We need to hold onto the
445 * locks for the child pages until we locate the parent, but we can
446 * release them before doing the actual insertion (see Lehman and Yao
447 * for the reasoning).
449 * Here we have to do something Lehman and Yao don't talk about:
450 * deal with a root split and construction of a new root. If our
451 * stack is empty then we have just split a node on what had been
452 * the root level when we descended the tree. If it is still the
453 * root then we perform a new-root construction. If it *wasn't*
454 * the root anymore, use the parent pointer to get up to the root
455 * level that someone constructed meanwhile, and find the right
456 * place to insert as for the normal case.
462 Assert(stack == (BTStack) NULL);
463 /* create a new root node and release the split buffers */
464 _bt_newroot(rel, buf, rbuf);
468 InsertIndexResult newres;
470 BTStackData fakestack;
474 /* Set up a phony stack entry if we haven't got a real one */
475 if (stack == (BTStack) NULL)
477 elog(DEBUG, "btree: concurrent ROOT page split");
479 stack->bts_blkno = lpageop->btpo_parent;
480 stack->bts_offset = InvalidOffsetNumber;
481 /* bts_btitem will be initialized below */
482 stack->bts_parent = NULL;
485 /* get high key from left page == lowest key on new right page */
486 ritem = (BTItem) PageGetItem(page,
487 PageGetItemId(page, P_HIKEY));
489 /* form an index tuple that points at the new right page */
490 new_item = _bt_formitem(&(ritem->bti_itup));
491 rbknum = BufferGetBlockNumber(rbuf);
492 ItemPointerSet(&(new_item->bti_itup.t_tid), rbknum, P_HIKEY);
495 * Find the parent buffer and get the parent page.
497 * Oops - if we were moved right then we need to change stack
498 * item! We want to find parent pointing to where we are,
499 * right ? - vadim 05/27/97
501 * Interestingly, this means we didn't *really* need to stack
502 * the parent key at all; all we really care about is the
503 * saved block and offset as a starting point for our search...
505 ItemPointerSet(&(stack->bts_btitem.bti_itup.t_tid),
508 pbuf = _bt_getstackbuf(rel, stack);
510 /* Now we can write and unlock the children */
511 _bt_wrtbuf(rel, rbuf);
512 _bt_wrtbuf(rel, buf);
514 /* Recursively update the parent */
515 newres = _bt_insertonpg(rel, pbuf, stack->bts_parent,
516 0, NULL, new_item, stack->bts_offset);
/*
 * WAL-log the insertion.  xlbuf holds the fixed xl_btree_insert header
 * plus optional leaf-only payload (CommandId and heap RelFileNode).
 */
528 char xlbuf[sizeof(xl_btree_insert) +
529 sizeof(CommandId) + sizeof(RelFileNode)];
530 xl_btree_insert *xlrec = (xl_btree_insert*)xlbuf;
531 int hsize = SizeOfBtreeInsert;
532 BTItemData truncitem;
533 BTItem xlitem = btitem;
534 Size xlsize = IndexTupleDSize(btitem->bti_itup) +
535 (sizeof(BTItemData) - sizeof(IndexTupleData));
538 xlrec->target.node = rel->rd_node;
539 ItemPointerSet(&(xlrec->target.tid), BufferGetBlockNumber(buf), newitemoff);
540 if (P_ISLEAF(lpageop))
/* leaf insert: append CommandId and heap relation node to the record */
542 CommandId cid = GetCurrentCommandId();
543 memcpy(xlbuf + hsize, &cid, sizeof(CommandId));
544 hsize += sizeof(CommandId);
545 memcpy(xlbuf + hsize, &(_xlheapRel->rd_node), sizeof(RelFileNode));
546 hsize += sizeof(RelFileNode);
549 * Read comments in _bt_pgaddtup
/* non-leaf first data key: log only the BTItem header, no key data */
551 else if (newitemoff == P_FIRSTDATAKEY(lpageop))
554 truncitem.bti_itup.t_info = sizeof(BTItemData);
556 xlsize = sizeof(BTItemData);
559 recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_INSERT,
560 xlbuf, hsize, (char*) xlitem, xlsize);
562 PageSetLSN(page, recptr);
563 PageSetSUI(page, ThisStartUpID);
/* no split needed: add the tuple in place and record its location */
566 _bt_pgaddtup(rel, page, itemsz, btitem, newitemoff, "page");
567 itup_off = newitemoff;
568 itup_blkno = BufferGetBlockNumber(buf);
569 /* Write out the updated page and release pin/lock */
570 _bt_wrtbuf(rel, buf);
573 /* by here, the new tuple is inserted at itup_blkno/itup_off */
574 res = (InsertIndexResult) palloc(sizeof(InsertIndexResultData));
575 ItemPointerSet(&(res->pointerData), itup_blkno, itup_off);
581 * _bt_split() -- split a page in the btree.
583 * On entry, buf is the page to split, and is write-locked and pinned.
584 * firstright is the item index of the first item to be moved to the
585 * new right page. newitemoff etc. tell us about the new item that
586 * must be inserted along with the data from the old page.
588 * Returns the new right sibling of buf, pinned and write-locked.
589 * The pin and lock on buf are maintained. *itup_off and *itup_blkno
590 * are set to the exact location where newitem was inserted.
593 _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
594 OffsetNumber newitemoff, Size newitemsz, BTItem newitem,
596 OffsetNumber *itup_off, BlockNumber *itup_blkno)
602 BTPageOpaque ropaque,
607 BTPageOpaque sopaque;
611 OffsetNumber leftoff,
/*
 * Allocate the new right page, and build the replacement left page in a
 * temp copy so the original stays intact until the very end.
 */
620 rbuf = _bt_getbuf(rel, P_NEW, BT_WRITE);
621 origpage = BufferGetPage(buf);
622 leftpage = PageGetTempPage(origpage, sizeof(BTPageOpaqueData));
623 rightpage = BufferGetPage(rbuf);
625 _bt_pageinit(leftpage, BufferGetPageSize(buf));
626 _bt_pageinit(rightpage, BufferGetPageSize(rbuf));
628 /* init btree private data */
629 oopaque = (BTPageOpaque) PageGetSpecialPointer(origpage);
630 lopaque = (BTPageOpaque) PageGetSpecialPointer(leftpage);
631 ropaque = (BTPageOpaque) PageGetSpecialPointer(rightpage);
633 /* if we're splitting this page, it won't be the root when we're done */
634 lopaque->btpo_flags = oopaque->btpo_flags;
635 lopaque->btpo_flags &= ~BTP_ROOT;
636 ropaque->btpo_flags = lopaque->btpo_flags;
637 lopaque->btpo_prev = oopaque->btpo_prev;
638 lopaque->btpo_next = BufferGetBlockNumber(rbuf);
639 ropaque->btpo_prev = BufferGetBlockNumber(buf);
640 ropaque->btpo_next = oopaque->btpo_next;
643 * Must copy the original parent link into both new pages, even though
644 * it might be quite obsolete by now. We might need it if this level
645 * is or recently was the root (see README).
647 lopaque->btpo_parent = ropaque->btpo_parent = oopaque->btpo_parent;
650 * If the page we're splitting is not the rightmost page at its level
651 * in the tree, then the first entry on the page is the high key
652 * for the page. We need to copy that to the right half. Otherwise
653 * (meaning the rightmost page case), all the items on the right half
658 if (!P_RIGHTMOST(oopaque))
660 itemid = PageGetItemId(origpage, P_HIKEY);
661 itemsz = ItemIdGetLength(itemid);
662 item = (BTItem) PageGetItem(origpage, itemid);
663 if (PageAddItem(rightpage, (Item) item, itemsz, rightoff,
664 LP_USED) == InvalidOffsetNumber)
665 elog(STOP, "btree: failed to add hikey to the right sibling");
666 rightoff = OffsetNumberNext(rightoff);
670 * The "high key" for the new left page will be the first key that's
671 * going to go into the new right page. This might be either the
672 * existing data item at position firstright, or the incoming tuple.
675 if (!newitemonleft && newitemoff == firstright)
677 /* incoming tuple will become first on right page */
683 /* existing item at firstright will become first on right page */
684 itemid = PageGetItemId(origpage, firstright);
685 itemsz = ItemIdGetLength(itemid);
686 item = (BTItem) PageGetItem(origpage, itemid);
691 if (PageAddItem(leftpage, (Item) item, itemsz, leftoff,
692 LP_USED) == InvalidOffsetNumber)
693 elog(STOP, "btree: failed to add hikey to the left sibling");
694 leftoff = OffsetNumberNext(leftoff);
697 * Now transfer all the data items to the appropriate page
699 maxoff = PageGetMaxOffsetNumber(origpage);
701 for (i = P_FIRSTDATAKEY(oopaque); i <= maxoff; i = OffsetNumberNext(i))
703 itemid = PageGetItemId(origpage, i);
704 itemsz = ItemIdGetLength(itemid);
705 item = (BTItem) PageGetItem(origpage, itemid);
707 /* does new item belong before this one? */
/* insert the new item at its position, recording where it went */
712 _bt_pgaddtup(rel, leftpage, newitemsz, newitem, leftoff,
715 *itup_blkno = BufferGetBlockNumber(buf);
716 leftoff = OffsetNumberNext(leftoff);
720 _bt_pgaddtup(rel, rightpage, newitemsz, newitem, rightoff,
722 *itup_off = rightoff;
723 *itup_blkno = BufferGetBlockNumber(rbuf);
724 rightoff = OffsetNumberNext(rightoff);
728 /* decide which page to put it on */
731 _bt_pgaddtup(rel, leftpage, itemsz, item, leftoff,
733 leftoff = OffsetNumberNext(leftoff);
737 _bt_pgaddtup(rel, rightpage, itemsz, item, rightoff,
739 rightoff = OffsetNumberNext(rightoff);
743 /* cope with possibility that newitem goes at the end */
748 _bt_pgaddtup(rel, leftpage, newitemsz, newitem, leftoff,
751 *itup_blkno = BufferGetBlockNumber(buf);
752 leftoff = OffsetNumberNext(leftoff);
756 _bt_pgaddtup(rel, rightpage, newitemsz, newitem, rightoff,
758 *itup_off = rightoff;
759 *itup_blkno = BufferGetBlockNumber(rbuf);
760 rightoff = OffsetNumberNext(rightoff);
765 * We have to grab the right sibling (if any) and fix the prev
766 * pointer there. We are guaranteed that this is deadlock-free
767 * since no other writer will be holding a lock on that page
768 * and trying to move left, and all readers release locks on a page
769 * before trying to fetch its neighbors.
772 if (!P_RIGHTMOST(ropaque))
774 sbuf = _bt_getbuf(rel, ropaque->btpo_next, BT_WRITE);
775 spage = BufferGetPage(sbuf);
780 * Right sibling is locked, new siblings are prepared, but original
781 * page is not updated yet. Log changes before continuing.
783 * NO ELOG(ERROR) till right sibling is updated.
787 char xlbuf[sizeof(xl_btree_split) +
788 sizeof(CommandId) + sizeof(RelFileNode) + BLCKSZ];
789 xl_btree_split *xlrec = (xl_btree_split*) xlbuf;
790 int hsize = SizeOfBtreeSplit;
/* record type distinguishes whether newitem landed on the left page */
791 int flag = (newitemonleft) ?
792 XLOG_BTREE_SPLEFT : XLOG_BTREE_SPLIT;
796 xlrec->target.node = rel->rd_node;
797 ItemPointerSet(&(xlrec->target.tid), *itup_blkno, *itup_off);
798 if (P_ISLEAF(lopaque))
/* leaf split: append CommandId and heap relation node to the record */
800 CommandId cid = GetCurrentCommandId();
801 memcpy(xlbuf + hsize, &cid, sizeof(CommandId));
802 hsize += sizeof(CommandId);
803 memcpy(xlbuf + hsize, &(_xlheapRel->rd_node), sizeof(RelFileNode));
804 hsize += sizeof(RelFileNode);
808 Size itemsz = IndexTupleDSize(lhikey->bti_itup) +
809 (sizeof(BTItemData) - sizeof(IndexTupleData));
810 memcpy(xlbuf + hsize, (char*) lhikey, itemsz);
816 * Read comments in _bt_pgaddtup.
817 * Actually, seems that in non-leaf splits newitem shouldn't
818 * go to first data key position on left page.
820 if (! P_ISLEAF(lopaque) && *itup_off == P_FIRSTDATAKEY(lopaque))
/* log only a truncated BTItem header (no key data) in this case */
822 BTItemData truncitem = *newitem;
823 truncitem.bti_itup.t_info = sizeof(BTItemData);
824 memcpy(xlbuf + hsize, &truncitem, sizeof(BTItemData));
825 hsize += sizeof(BTItemData);
829 Size itemsz = IndexTupleDSize(newitem->bti_itup) +
830 (sizeof(BTItemData) - sizeof(IndexTupleData));
831 memcpy(xlbuf + hsize, (char*) newitem, itemsz);
834 blkno = BufferGetBlockNumber(rbuf);
835 BlockIdSet(&(xlrec->otherblk), blkno);
839 blkno = BufferGetBlockNumber(buf);
840 BlockIdSet(&(xlrec->otherblk), blkno);
843 BlockIdSet(&(xlrec->rightblk), ropaque->btpo_next);
846 * Direct access to page is not good but faster - we should
847 * implement some new func in page API.
849 recptr = XLogInsert(RM_BTREE_ID, flag, xlbuf,
850 hsize, (char*)rightpage + ((PageHeader) rightpage)->pd_upper,
851 ((PageHeader) rightpage)->pd_special - ((PageHeader) rightpage)->pd_upper);
853 PageSetLSN(leftpage, recptr);
854 PageSetSUI(leftpage, ThisStartUpID);
855 PageSetLSN(rightpage, recptr);
856 PageSetSUI(rightpage, ThisStartUpID);
857 if (!P_RIGHTMOST(ropaque))
859 PageSetLSN(spage, recptr);
860 PageSetSUI(spage, ThisStartUpID);
866 * By here, the original data page has been split into two new halves,
867 * and these are correct. The algorithm requires that the left page
868 * never move during a split, so we copy the new left page back on top
869 * of the original. Note that this is not a waste of time, since we
870 * also require (in the page management code) that the center of a
871 * page always be clean, and the most efficient way to guarantee this
872 * is just to compact the data by reinserting it into a new left page.
875 PageRestoreTempPage(leftpage, origpage);
/* fix the old right sibling's prev pointer to point at the new page */
877 if (!P_RIGHTMOST(ropaque))
879 sopaque = (BTPageOpaque) PageGetSpecialPointer(spage);
880 sopaque->btpo_prev = BufferGetBlockNumber(rbuf);
882 /* write and release the old right sibling */
883 _bt_wrtbuf(rel, sbuf);
891 * _bt_findsplitloc() -- find an appropriate place to split a page.
893 * The idea here is to equalize the free space that will be on each split
894 * page, *after accounting for the inserted tuple*. (If we fail to account
895 * for it, we might find ourselves with too little room on the page that
896 * it needs to go into!)
898 * We are passed the intended insert position of the new tuple, expressed as
899 * the offsetnumber of the tuple it must go in front of. (This could be
900 * maxoff+1 if the tuple is to go at the end.)
902 * We return the index of the first existing tuple that should go on the
903 * righthand page, plus a boolean indicating whether the new tuple goes on
904 * the left or right page. The bool is necessary to disambiguate the case
905 * where firstright == newitemoff.
908 _bt_findsplitloc(Relation rel,
910 OffsetNumber newitemoff,
925 opaque = (BTPageOpaque) PageGetSpecialPointer(page);
927 /* Passed-in newitemsz is MAXALIGNED but does not include line pointer */
928 newitemsz += sizeof(ItemIdData);
929 state.newitemsz = newitemsz;
930 state.non_leaf = ! P_ISLEAF(opaque);
931 state.have_split = false;
933 /* Total free space available on a btree page, after fixed overhead */
934 leftspace = rightspace =
935 PageGetPageSize(page) - sizeof(PageHeaderData) -
936 MAXALIGN(sizeof(BTPageOpaqueData))
/*
 * NOTE(review): the extra sizeof(ItemIdData) credit below presumably
 * offsets per-item line-pointer accounting done elsewhere in this
 * function -- confirm against the per-item itemsz computation.
 */
937 + sizeof(ItemIdData);
940 * Finding the best possible split would require checking all the possible
941 * split points, because of the high-key and left-key special cases.
942 * That's probably more work than it's worth; instead, stop as soon as
943 * we find a "good-enough" split, where good-enough is defined as an
944 * imbalance in free space of no more than pagesize/16 (arbitrary...)
945 * This should let us stop near the middle on most pages, instead of
946 * plowing to the end.
948 goodenough = leftspace / 16;
950 /* The right page will have the same high key as the old page */
951 if (!P_RIGHTMOST(opaque))
953 itemid = PageGetItemId(page, P_HIKEY);
954 rightspace -= (int) (MAXALIGN(ItemIdGetLength(itemid)) +
958 /* Count up total space in data items without actually scanning 'em */
959 dataitemtotal = rightspace - (int) PageGetFreeSpace(page);
962 * Scan through the data items and calculate space usage for a split
963 * at each possible position.
966 maxoff = PageGetMaxOffsetNumber(page);
968 for (offnum = P_FIRSTDATAKEY(opaque);
970 offnum = OffsetNumberNext(offnum))
976 itemid = PageGetItemId(page, offnum);
977 itemsz = MAXALIGN(ItemIdGetLength(itemid)) + sizeof(ItemIdData);
980 * We have to allow for the current item becoming the high key of
981 * the left page; therefore it counts against left space as well
984 leftfree = leftspace - dataitemstoleft - (int) itemsz;
985 rightfree = rightspace - (dataitemtotal - dataitemstoleft);
987 * Will the new item go to left or right of split?
989 if (offnum > newitemoff)
990 _bt_checksplitloc(&state, offnum, leftfree, rightfree,
992 else if (offnum < newitemoff)
993 _bt_checksplitloc(&state, offnum, leftfree, rightfree,
997 /* need to try it both ways! */
998 _bt_checksplitloc(&state, offnum, leftfree, rightfree,
1000 /* here we are contemplating newitem as first on right */
1001 _bt_checksplitloc(&state, offnum, leftfree, rightfree,
1005 /* Abort scan once we find a good-enough choice */
1006 if (state.have_split && state.best_delta <= goodenough)
1009 dataitemstoleft += itemsz;
1013 * I believe it is not possible to fail to find a feasible split,
1014 * but just in case ...
1016 if (! state.have_split)
1017 elog(FATAL, "_bt_findsplitloc: can't find a feasible split point for %s",
1018 RelationGetRelationName(rel));
1020 *newitemonleft = state.newitemonleft;
1021 return state.firstright;
1025 * Subroutine to analyze a particular possible split choice (ie, firstright
1026 * and newitemonleft settings), and record the best split so far in *state.
1029 _bt_checksplitloc(FindSplitData *state, OffsetNumber firstright,
1030 int leftfree, int rightfree,
1031 bool newitemonleft, Size firstrightitemsz)
1034 * Account for the new item on whichever side it is to be put.
1037 leftfree -= (int) state->newitemsz;
1039 rightfree -= (int) state->newitemsz;
1041 * If we are not on the leaf level, we will be able to discard the
1042 * key data from the first item that winds up on the right page.
1044 if (state->non_leaf)
1045 rightfree += (int) firstrightitemsz -
1046 (int) (MAXALIGN(sizeof(BTItemData)) + sizeof(ItemIdData));
1048 * If feasible split point, remember best delta.
/* feasible means neither side would end up with negative free space */
1050 if (leftfree >= 0 && rightfree >= 0)
1052 int delta = leftfree - rightfree;
/* a smaller delta means a more evenly balanced split */
1056 if (!state->have_split || delta < state->best_delta)
1058 state->have_split = true;
1059 state->newitemonleft = newitemonleft;
1060 state->firstright = firstright;
1061 state->best_delta = delta;
1067 * _bt_getstackbuf() -- Walk back up the tree one step, and find the item
1068 * we last looked at in the parent.
1070 * This is possible because we save a bit image of the last item
1071 * we looked at in the parent, and the update algorithm guarantees
1072 * that if items above us in the tree move, they only move right.
1074 * Also, re-set bts_blkno & bts_offset if changed.
1077 _bt_getstackbuf(Relation rel, BTStack stack)
1087 BTPageOpaque opaque;
1089 blkno = stack->bts_blkno;
1090 buf = _bt_getbuf(rel, blkno, BT_WRITE);
1091 page = BufferGetPage(buf);
1092 opaque = (BTPageOpaque) PageGetSpecialPointer(page);
1093 maxoff = PageGetMaxOffsetNumber(page);
1095 start = stack->bts_offset;
1097 * _bt_insertonpg set bts_offset to InvalidOffsetNumber in the
1098 * case of concurrent ROOT page split. Also, watch out for
1099 * possibility that page has a high key now when it didn't before.
1101 if (start < P_FIRSTDATAKEY(opaque))
1102 start = P_FIRSTDATAKEY(opaque);
1106 /* see if it's on this page */
1107 for (offnum = start;
1109 offnum = OffsetNumberNext(offnum))
1111 itemid = PageGetItemId(page, offnum);
1112 item = (BTItem) PageGetItem(page, itemid);
/* compare against the saved bit image of the parent item */
1113 if (BTItemSame(item, &stack->bts_btitem))
1115 /* Return accurate pointer to where link is now */
1116 stack->bts_blkno = blkno;
1117 stack->bts_offset = offnum;
1121 /* by here, the item we're looking for moved right at least one page */
1122 if (P_RIGHTMOST(opaque))
1123 elog(FATAL, "_bt_getstackbuf: my bits moved right off the end of the world!"
1124 "\n\tRecreate index %s.", RelationGetRelationName(rel));
/* follow the right-link and restart the scan on the next page */
1126 blkno = opaque->btpo_next;
1127 _bt_relbuf(rel, buf, BT_WRITE);
1128 buf = _bt_getbuf(rel, blkno, BT_WRITE);
1129 page = BufferGetPage(buf);
1130 opaque = (BTPageOpaque) PageGetSpecialPointer(page);
1131 maxoff = PageGetMaxOffsetNumber(page);
1132 start = P_FIRSTDATAKEY(opaque);
/*
 * _bt_newroot() -- Create a new root page for the index.
 *
 *		We've just split the old root page and need to create a new one.
 *		In order to do this, we add a new root page to the file, then lock
 *		the metadata page and update it.  This is guaranteed to be deadlock-
 *		free, because all readers release their locks on the metadata page
 *		before trying to lock the root, and all writers lock the root before
 *		trying to lock the metadata page.  We have a write lock on the old
 *		root page, so we have not introduced any cycles into the waits-for
 *		graph.
 *
 *		On entry, lbuf (the old root) and rbuf (its new peer) are write-
 *		locked.  On exit, a new root page exists with entries for the
 *		two new children.  The new root page is neither pinned nor locked, and
 *		we have also written out lbuf and rbuf and dropped their pins/locks.
 */
_bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf)
	BlockNumber rootblknum;
	BTPageOpaque rootopaque;

	/* get a new root page */
	rootbuf = _bt_getbuf(rel, P_NEW, BT_WRITE);
	rootpage = BufferGetPage(rootbuf);
	rootblknum = BufferGetBlockNumber(rootbuf);

	/* lock the metapage; deadlock-free per the lock-ordering rule above */
	metabuf = _bt_getbuf(rel, BTREE_METAPAGE,BT_WRITE);

	/* NO ELOG(ERROR) from here till newroot op is logged */

	/* set btree special data */
	rootopaque = (BTPageOpaque) PageGetSpecialPointer(rootpage);
	rootopaque->btpo_prev = rootopaque->btpo_next = P_NONE;
	rootopaque->btpo_flags |= BTP_ROOT;
	rootopaque->btpo_parent = BTREE_METAPAGE;

	lbkno = BufferGetBlockNumber(lbuf);
	rbkno = BufferGetBlockNumber(rbuf);
	lpage = BufferGetPage(lbuf);
	rpage = BufferGetPage(rbuf);

	/*
	 * Make sure pages in old root level have valid parent links --- we will
	 * need this in _bt_insertonpg() if a concurrent root split happens (see
	 * the nbtree README for the concurrent-split protocol).
	 */
	((BTPageOpaque) PageGetSpecialPointer(lpage))->btpo_parent =
		((BTPageOpaque) PageGetSpecialPointer(rpage))->btpo_parent =

	/*
	 * Create downlink item for left page (old root).  Since this will be
	 * the first item in a non-leaf page, it implicitly has minus-infinity
	 * key value, so we need not store any actual key in it.
	 */
	itemsz = sizeof(BTItemData);
	new_item = (BTItem) palloc(itemsz);
	new_item->bti_itup.t_info = itemsz;
	ItemPointerSet(&(new_item->bti_itup.t_tid), lbkno, P_HIKEY);

	/*
	 * Insert the left page pointer into the new root page.  The root page
	 * is the rightmost page on its level so there is no "high key" in it;
	 * the two items will go into positions P_HIKEY and P_FIRSTKEY.
	 */
	if (PageAddItem(rootpage, (Item) new_item, itemsz, P_HIKEY, LP_USED) == InvalidOffsetNumber)
		elog(STOP, "btree: failed to add leftkey to new root page");

	/*
	 * Create downlink item for right page.  The key for it is obtained from
	 * the "high key" position in the left page.
	 */
	itemid = PageGetItemId(lpage, P_HIKEY);
	itemsz = ItemIdGetLength(itemid);
	item = (BTItem) PageGetItem(lpage, itemid);
	new_item = _bt_formitem(&(item->bti_itup));
	ItemPointerSet(&(new_item->bti_itup.t_tid), rbkno, P_HIKEY);

	/*
	 * insert the right page pointer into the new root page.
	 */
	if (PageAddItem(rootpage, (Item) new_item, itemsz, P_FIRSTKEY, LP_USED) == InvalidOffsetNumber)
		elog(STOP, "btree: failed to add rightkey to new root page");

	/* WAL path: log the newroot operation, then update the metapage */
		xl_btree_newroot xlrec;
		Page metapg = BufferGetPage(metabuf);
		BTMetaPageData *metad = BTPageGetMeta(metapg);

		xlrec.node = rel->rd_node;
		BlockIdSet(&(xlrec.rootblk), rootblknum);

		/*
		 * Direct access to page is not good but faster - we should
		 * implement some new func in page API.
		 */
		recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_NEWROOT,
							(char*)&xlrec, SizeOfBtreeNewroot,
							(char*)rootpage + ((PageHeader) rootpage)->pd_upper,
							((PageHeader) rootpage)->pd_special - ((PageHeader) rootpage)->pd_upper);

		/* point the metapage at the new root; tree is now one level taller */
		metad->btm_root = rootblknum;
		(metad->btm_level)++;

		/* stamp modified pages with the WAL record's LSN and startup id */
		PageSetLSN(rootpage, recptr);
		PageSetSUI(rootpage, ThisStartUpID);
		PageSetLSN(metapg, recptr);
		PageSetSUI(metapg, ThisStartUpID);

		/* write and release the metapage */
		_bt_wrtbuf(rel, metabuf);

	/* write and let go of the new root buffer */
	_bt_wrtbuf(rel, rootbuf);

	/*
	 * update metadata page with new root block number
	 *
	 * NOTE(review): this non-WAL update appears to belong to the alternate
	 * (non-XLOG) compilation branch of the block above; the #ifdef lines are
	 * not visible in this view -- confirm against the full file.
	 */
	_bt_metaproot(rel, rootblknum, 0);

	/* update and release new sibling, and finally the old root */
	_bt_wrtbuf(rel, rbuf);
	_bt_wrtbuf(rel, lbuf);
/*
 * _bt_pgaddtup() -- add a tuple to a particular page in the index.
 *
 *		This routine adds the tuple to the page as requested.  It does
 *		not affect pin/lock status, but you'd better have a write lock
 *		and pin on the target buffer!  Don't forget to write and release
 *		the buffer afterwards, either.
 *
 *		The main difference between this routine and a bare PageAddItem call
 *		is that this code knows that the leftmost data item on a non-leaf
 *		btree page doesn't need to have a key.  Therefore, it strips such
 *		items down to just the item header.  CAUTION: this works ONLY if
 *		we insert the items in order, so that the given itup_off does
 *		represent the final position of the item!
 */
_bt_pgaddtup(Relation rel,
			 OffsetNumber itup_off,
	BTPageOpaque opaque = (BTPageOpaque) PageGetSpecialPointer(page);
	BTItemData truncitem;

	/*
	 * First data item on an internal page carries an implicit minus-infinity
	 * key: keep only the item header, dropping the key data.
	 */
	if (! P_ISLEAF(opaque) && itup_off == P_FIRSTDATAKEY(opaque))
		memcpy(&truncitem, btitem, sizeof(BTItemData));
		truncitem.bti_itup.t_info = sizeof(BTItemData);
		btitem = &truncitem;		/* insert the local truncated copy */
		itemsize = sizeof(BTItemData);

	/* failure here indicates a corrupt or overfull page */
	if (PageAddItem(page, (Item) btitem, itemsize, itup_off,
					LP_USED) == InvalidOffsetNumber)
		elog(STOP, "btree: failed to add item to the %s for %s",
			 where, RelationGetRelationName(rel));
1326 * _bt_isequal - used in _bt_doinsert in check for duplicates.
1328 * This is very similar to _bt_compare, except for NULL handling.
1329 * Rule is simple: NOT_NULL not equal NULL, NULL not_equal NULL too.
1332 _bt_isequal(TupleDesc itupdesc, Page page, OffsetNumber offnum,
1333 int keysz, ScanKey scankey)
1339 /* Better be comparing to a leaf item */
1340 Assert(P_ISLEAF((BTPageOpaque) PageGetSpecialPointer(page)));
1342 btitem = (BTItem) PageGetItem(page, PageGetItemId(page, offnum));
1343 itup = &(btitem->bti_itup);
1345 for (i = 1; i <= keysz; i++)
1347 ScanKey entry = &scankey[i - 1];
1353 attno = entry->sk_attno;
1355 datum = index_getattr(itup, attno, itupdesc, &isNull);
1357 /* NULLs are never equal to anything */
1358 if (entry->sk_flags & SK_ISNULL || isNull)
1361 result = DatumGetInt32(FunctionCall2(&entry->sk_func,
1369 /* if we get here, the keys are equal */