1 /*-------------------------------------------------------------------------
4 * Item insertion in Lehman and Yao btrees for Postgres.
6 * Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
11 * $PostgreSQL: pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.136 2006/04/25 22:46:05 tgl Exp $
13 *-------------------------------------------------------------------------
18 #include "access/heapam.h"
19 #include "access/nbtree.h"
20 #include "miscadmin.h"
21 #include "utils/inval.h"
26 /* context data for _bt_checksplitloc */
27 Size newitemsz; /* size of new item to be inserted */
28 bool is_leaf; /* T if splitting a leaf page */
29 bool is_rightmost; /* T if splitting a rightmost page */
31 bool have_split; /* found a valid split? */
33 /* these fields valid only if have_split is true */
34 bool newitemonleft; /* new item on left or right of best split */
35 OffsetNumber firstright; /* best split point */
36 int best_delta; /* best size delta so far */
40 static Buffer _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf);
42 static TransactionId _bt_check_unique(Relation rel, IndexTuple itup,
43 Relation heapRel, Buffer buf,
44 ScanKey itup_scankey);
45 static void _bt_insertonpg(Relation rel, Buffer buf,
47 int keysz, ScanKey scankey,
49 OffsetNumber afteritem,
50 bool split_only_page);
51 static Buffer _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
52 OffsetNumber newitemoff, Size newitemsz,
53 IndexTuple newitem, bool newitemonleft);
54 static OffsetNumber _bt_findsplitloc(Relation rel, Page page,
55 OffsetNumber newitemoff,
58 static void _bt_checksplitloc(FindSplitData *state, OffsetNumber firstright,
59 int leftfree, int rightfree,
60 bool newitemonleft, Size firstrightitemsz);
61 static void _bt_pgaddtup(Relation rel, Page page,
62 Size itemsize, IndexTuple itup,
63 OffsetNumber itup_off, const char *where);
64 static bool _bt_isequal(TupleDesc itupdesc, Page page, OffsetNumber offnum,
65 int keysz, ScanKey scankey);
69 * _bt_doinsert() -- Handle insertion of a single index tuple in the tree.
71 * This routine is called by the public interface routines, btbuild
72 * and btinsert. By here, itup is filled in, including the TID.
75 _bt_doinsert(Relation rel, IndexTuple itup,
76 bool index_is_unique, Relation heapRel)
78 int natts = rel->rd_rel->relnatts;
83 /* we need an insertion scan key to do our search, so build one */
84 itup_scankey = _bt_mkscankey(rel, itup);
87 /* find the first page containing this key */
88 stack = _bt_search(rel, natts, itup_scankey, false, &buf, BT_WRITE);
90 /* trade in our read lock for a write lock */
91 LockBuffer(buf, BUFFER_LOCK_UNLOCK);
92 LockBuffer(buf, BT_WRITE);
95 * If the page was split between the time that we surrendered our read
96 * lock and acquired our write lock, then this page may no longer be the
97 * right place for the key we want to insert. In this case, we need to
98 * move right in the tree. See Lehman and Yao for an excruciatingly
99 * precise description.
101 buf = _bt_moveright(rel, buf, natts, itup_scankey, false, BT_WRITE);
104 * If we're not allowing duplicates, make sure the key isn't already in
107 * NOTE: obviously, _bt_check_unique can only detect keys that are already
108 * in the index; so it cannot defend against concurrent insertions of the
109 * same key. We protect against that by means of holding a write lock on
110 * the target page. Any other would-be inserter of the same key must
111 * acquire a write lock on the same target page, so only one would-be
112 * inserter can be making the check at one time. Furthermore, once we are
113 * past the check we hold write locks continuously until we have performed
114 * our insertion, so no later inserter can fail to see our insertion.
115 * (This requires some care in _bt_insertonpg.)
117 * If we must wait for another xact, we release the lock while waiting,
118 * and then must start over completely.
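/*
 * Illustrative sketch of the resulting control flow (the "top:" retry
 * label is an assumption for illustration; the calls are the ones that
 * appear in this routine):
 *
 *     top:
 *         stack = _bt_search(rel, natts, itup_scankey, false, &buf, BT_WRITE);
 *         ... trade read lock for write lock, _bt_moveright() ...
 *         if (index_is_unique)
 *         {
 *             xwait = _bt_check_unique(rel, itup, heapRel, buf, itup_scankey);
 *             if (TransactionIdIsValid(xwait))
 *             {
 *                 _bt_relbuf(rel, buf);        -- release lock before waiting
 *                 XactLockTableWait(xwait);    -- wait for the other xact
 *                 _bt_freestack(stack);
 *                 goto top;                    -- start over completely
 *             }
 *         }
 *         _bt_insertonpg(rel, buf, stack, natts, itup_scankey, itup, 0, false);
 */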
124 xwait = _bt_check_unique(rel, itup, heapRel, buf, itup_scankey);
126 if (TransactionIdIsValid(xwait))
128 /* Have to wait for the other guy ... */
129 _bt_relbuf(rel, buf);
130 XactLockTableWait(xwait);
132 _bt_freestack(stack);
137 /* do the insertion */
138 _bt_insertonpg(rel, buf, stack, natts, itup_scankey, itup, 0, false);
141 _bt_freestack(stack);
142 _bt_freeskey(itup_scankey);
146 * _bt_check_unique() -- Check for violation of unique index constraint
148 * Returns InvalidTransactionId if there is no conflict, else an xact ID
149 * we must wait for to see if it commits a conflicting tuple. If an actual
150 * conflict is detected, no return --- just ereport().
153 _bt_check_unique(Relation rel, IndexTuple itup, Relation heapRel,
154 Buffer buf, ScanKey itup_scankey)
156 TupleDesc itupdesc = RelationGetDescr(rel);
157 int natts = rel->rd_rel->relnatts;
162 Buffer nbuf = InvalidBuffer;
164 page = BufferGetPage(buf);
165 opaque = (BTPageOpaque) PageGetSpecialPointer(page);
166 maxoff = PageGetMaxOffsetNumber(page);
169 * Find first item >= proposed new item. Note we could also get a pointer
170 * to end-of-page here.
172 offset = _bt_binsrch(rel, buf, natts, itup_scankey, false);
175 * Scan over all equal tuples, looking for live conflicts.
186 * make sure the offset points to an actual item before trying to examine it.
189 if (offset <= maxoff)
191 curitemid = PageGetItemId(page, offset);
194 * We can skip items that are marked killed.
196 * Formerly, we applied _bt_isequal() before checking the kill
197 * flag, so as to fall out of the item loop as soon as possible.
198 * However, in the presence of heavy update activity an index may
199 * contain many killed items with the same key; running
200 * _bt_isequal() on each killed item gets expensive. Furthermore
201 * it is likely that the non-killed version of each key appears
202 * first, so that we didn't actually get to exit any sooner
203 * anyway. So now we just advance over killed items as quickly as
204 * we can. We only apply _bt_isequal() when we get to a non-killed
205 * item or the end of the page.
207 if (!ItemIdDeleted(curitemid))
210 * _bt_compare returns 0 for (1,NULL) and (1,NULL) - this is
211 * how we handle NULLs - so we must not use _bt_compare for
212 * real equality comparisons, but only for ordering/finding
213 * items on pages. - vadim 03/24/97
215 if (!_bt_isequal(itupdesc, page, offset, natts, itup_scankey))
216 break; /* we're past all the equal tuples */
218 /* okay, we gotta fetch the heap tuple ... */
219 curitup = (IndexTuple) PageGetItem(page, curitemid);
220 htup.t_self = curitup->t_tid;
221 if (heap_fetch(heapRel, SnapshotDirty, &htup, &hbuffer,
224 /* it is a duplicate */
225 TransactionId xwait =
226 (TransactionIdIsValid(SnapshotDirty->xmin)) ?
227 SnapshotDirty->xmin : SnapshotDirty->xmax;
229 ReleaseBuffer(hbuffer);
232 * If this tuple is being updated by another transaction,
233 * then we have to wait for its commit/abort.
235 if (TransactionIdIsValid(xwait))
237 if (nbuf != InvalidBuffer)
238 _bt_relbuf(rel, nbuf);
239 /* Tell _bt_doinsert to wait... */
244 * Otherwise we have a definite conflict.
247 (errcode(ERRCODE_UNIQUE_VIOLATION),
248 errmsg("duplicate key violates unique constraint \"%s\"",
249 RelationGetRelationName(rel))));
251 else if (htup.t_data != NULL)
254 * Hmm, if we can't see the tuple, maybe it can be marked
255 * killed. This logic should match index_getnext and btgettuple.
258 LockBuffer(hbuffer, BUFFER_LOCK_SHARE);
259 if (HeapTupleSatisfiesVacuum(htup.t_data, RecentGlobalXmin,
260 hbuffer) == HEAPTUPLE_DEAD)
262 curitemid->lp_flags |= LP_DELETE;
263 if (nbuf != InvalidBuffer)
264 SetBufferCommitInfoNeedsSave(nbuf);
266 SetBufferCommitInfoNeedsSave(buf);
268 LockBuffer(hbuffer, BUFFER_LOCK_UNLOCK);
270 ReleaseBuffer(hbuffer);
275 * Advance to next tuple to continue checking.
278 offset = OffsetNumberNext(offset);
281 /* If scankey == hikey we gotta check the next page too */
282 if (P_RIGHTMOST(opaque))
284 if (!_bt_isequal(itupdesc, page, P_HIKEY,
285 natts, itup_scankey))
287 /* Advance to next non-dead page --- there must be one */
290 nblkno = opaque->btpo_next;
291 nbuf = _bt_relandgetbuf(rel, nbuf, nblkno, BT_READ);
292 page = BufferGetPage(nbuf);
293 opaque = (BTPageOpaque) PageGetSpecialPointer(page);
294 if (!P_IGNORE(opaque))
296 if (P_RIGHTMOST(opaque))
297 elog(ERROR, "fell off the end of \"%s\"",
298 RelationGetRelationName(rel));
300 maxoff = PageGetMaxOffsetNumber(page);
301 offset = P_FIRSTDATAKEY(opaque);
305 if (nbuf != InvalidBuffer)
306 _bt_relbuf(rel, nbuf);
308 return InvalidTransactionId;
312 * _bt_insertonpg() -- Insert a tuple on a particular page in the index.
314 * This recursive procedure does the following things:
316 * + finds the right place to insert the tuple.
317 * + if necessary, splits the target page (making sure that the
318 * split is equitable as far as post-insert free space goes).
319 * + inserts the tuple.
320 * + if the page was split, pops the parent stack, and finds the
321 * right place to insert the new child pointer (by walking
322 * right using information stored in the parent stack).
323 * + invokes itself with the appropriate tuple for the right
324 * child page on the parent.
325 * + updates the metapage if a true root or fast root is split.
327 * On entry, we must have the right buffer in which to do the
328 * insertion, and the buffer must be pinned and write-locked. On return,
329 * we will have dropped both the pin and the lock on the buffer.
331 * If 'afteritem' is >0 then the new tuple must be inserted after the
332 * existing item of that number, noplace else. If 'afteritem' is 0
333 * then the procedure finds the exact spot to insert it by searching.
334 * (keysz and scankey parameters are used ONLY if afteritem == 0.
335 * The scankey must be an insertion-type scankey.)
337 * NOTE: if the new key is equal to one or more existing keys, we can
338 * legitimately place it anywhere in the series of equal keys --- in fact,
339 * if the new key is equal to the page's "high key" we can place it on
340 * the next page. If it is equal to the high key, and there's not room
341 * to insert the new tuple on the current page without splitting, then
342 * we can move right hoping to find more free space and avoid a split.
343 * (We should not move right indefinitely, however, since that leads to
344 * O(N^2) insertion behavior in the presence of many equal keys.)
345 * Once we have chosen the page to put the key on, we'll insert it before
346 * any existing equal keys because of the way _bt_binsrch() works.
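 *
 * (For illustration, the "maybe move right" decision below boils down to a
 * loop of roughly this form:
 *
 *     while (PageGetFreeSpace(page) < itemsz &&
 *            !P_RIGHTMOST(lpageop) &&
 *            _bt_compare(rel, keysz, scankey, page, P_HIKEY) == 0 &&
 *            random() > (MAX_RANDOM_VALUE / 100))
 *         ... step right to the next non-dead page ...
 *
 * that is, keep walking right only while the current page is too full, the
 * new key equals the page's high key, and a 99%-probability coin flip says
 * to keep going.)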
348 * The locking interactions in this code are critical. You should
349 * grok Lehman and Yao's paper before making any changes. In addition,
350 * you need to understand how we disambiguate duplicate keys in this
351 * implementation, in order to be able to find our location using
352 * L&Y "move right" operations. Since we may insert duplicate user
353 * keys, and since these dups may propagate up the tree, we use the
354 * 'afteritem' parameter to position ourselves correctly for the
355 * insertion on internal pages.
359 _bt_insertonpg(Relation rel,
365 OffsetNumber afteritem,
366 bool split_only_page)
369 BTPageOpaque lpageop;
370 OffsetNumber newitemoff;
371 OffsetNumber firstright = InvalidOffsetNumber;
374 page = BufferGetPage(buf);
375 lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
377 itemsz = IndexTupleDSize(*itup);
378 itemsz = MAXALIGN(itemsz); /* be safe, PageAddItem will do this but we
379 * need to be consistent */
382 * Check whether the item can fit on a btree page at all. (Eventually, we
383 * ought to try to apply TOAST methods if not.) We actually need to be
384 * able to fit three items on every page, so restrict any one item to 1/3
385 * the per-page available space. Note that at this point, itemsz doesn't
386 * include the ItemId.
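/*
 * Rough illustration (approximate figures, not authoritative): with the
 * default 8 kB block size the usable area is a bit over 8100 bytes, so
 *
 *     BTMaxItemSize  ~=  usable / 3  ~=  2.7 kB
 *
 * is the practical ceiling on a single index tuple.  See the
 * BTMaxItemSize() macro in access/nbtree.h for the exact definition.
 */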
388 if (itemsz > BTMaxItemSize(page))
390 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
391 errmsg("index row size %lu exceeds btree maximum, %lu",
392 (unsigned long) itemsz,
393 (unsigned long) BTMaxItemSize(page)),
394 errhint("Values larger than 1/3 of a buffer page cannot be indexed.\n"
395 "Consider a function index of an MD5 hash of the value, "
396 "or use full text indexing.")));
399 * Determine exactly where new item will go.
402 newitemoff = afteritem + 1;
406 * If we will need to split the page to put the item here,
407 * check whether we can put the tuple somewhere to the right,
408 * instead. Keep scanning right until we
409 * (a) find a page with enough free space,
410 * (b) reach the last page where the tuple can legally go, or
411 * (c) get tired of searching.
412 * (c) is not flippant; it is important because if there are many
413 * pages' worth of equal keys, it's better to split one of the early
414 * pages than to scan all the way to the end of the run of equal keys
415 * on every insert. We implement "get tired" as a random choice,
416 * since stopping after scanning a fixed number of pages wouldn't work
417 * well (we'd never reach the right-hand side of previously split
418 * pages). Currently the probability of moving right is set at 0.99,
419 * which may seem too high to change the behavior much, but it does an
420 * excellent job of preventing O(N^2) behavior with many equal keys.
423 bool movedright = false;
425 while (PageGetFreeSpace(page) < itemsz &&
426 !P_RIGHTMOST(lpageop) &&
427 _bt_compare(rel, keysz, scankey, page, P_HIKEY) == 0 &&
428 random() > (MAX_RANDOM_VALUE / 100))
431 * step right to next non-dead page
433 * must write-lock that page before releasing write lock on
434 * current page; else someone else's _bt_check_unique scan could
435 * fail to see our insertion. write locks on intermediate dead
436 * pages won't do because we don't know when they will get
437 * de-linked from the tree.
439 Buffer rbuf = InvalidBuffer;
443 BlockNumber rblkno = lpageop->btpo_next;
445 rbuf = _bt_relandgetbuf(rel, rbuf, rblkno, BT_WRITE);
446 page = BufferGetPage(rbuf);
447 lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
448 if (!P_IGNORE(lpageop))
450 if (P_RIGHTMOST(lpageop))
451 elog(ERROR, "fell off the end of \"%s\"",
452 RelationGetRelationName(rel));
454 _bt_relbuf(rel, buf);
460 * Now we are on the right page, so find the insert position. If we
461 * moved right at all, we know we should insert at the start of the
462 * page, else must find the position by searching.
465 newitemoff = P_FIRSTDATAKEY(lpageop);
467 newitemoff = _bt_binsrch(rel, buf, keysz, scankey, false);
471 * Do we need to split the page to fit the item on it?
473 * Note: PageGetFreeSpace() subtracts sizeof(ItemIdData) from its result,
474 * so this comparison is correct even though we appear to be accounting
475 * only for the item and not for its line pointer.
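/*
 * Worked example of that accounting (illustrative figures): suppose the
 * page has 120 bytes of raw free space and the MAXALIGNed tuple needs
 * itemsz = 104 bytes.  The insertion really consumes 104 + sizeof(ItemIdData)
 * = 108 bytes, but PageGetFreeSpace() already reports 120 - 4 = 116, so the
 * bare test "116 < 104" correctly concludes that the tuple fits without our
 * adding the line-pointer size explicitly.
 */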
477 if (PageGetFreeSpace(page) < itemsz)
479 bool is_root = P_ISROOT(lpageop);
480 bool is_only = P_LEFTMOST(lpageop) && P_RIGHTMOST(lpageop);
484 /* Choose the split point */
485 firstright = _bt_findsplitloc(rel, page,
489 /* split the buffer into left and right halves */
490 rbuf = _bt_split(rel, buf, firstright,
491 newitemoff, itemsz, itup, newitemonleft);
496 * + our target page has been split;
497 * + the original tuple has been inserted;
498 * + we have write locks on both the old (left half)
499 * and new (right half) buffers, after the split; and
500 * + we know the key we want to insert into the parent
501 * (it's the "high key" on the left child page).
503 * We're ready to do the parent insertion. We need to hold onto the
504 * locks for the child pages until we locate the parent, but we can
505 * release them before doing the actual insertion (see Lehman and Yao
506 * for the reasoning).
509 _bt_insert_parent(rel, buf, rbuf, stack, is_root, is_only);
513 Buffer metabuf = InvalidBuffer;
515 BTMetaPageData *metad = NULL;
516 OffsetNumber itup_off;
517 BlockNumber itup_blkno;
519 itup_off = newitemoff;
520 itup_blkno = BufferGetBlockNumber(buf);
523 * If we are doing this insert because we split a page that was the
524 * only one on its tree level, but was not the root, it may have been
525 * the "fast root". We need to ensure that the fast root link points
526 * at or above the current page. We can safely acquire a lock on the
527 * metapage here --- see comments for _bt_newroot().
531 Assert(!P_ISLEAF(lpageop));
533 metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_WRITE);
534 metapg = BufferGetPage(metabuf);
535 metad = BTPageGetMeta(metapg);
537 if (metad->btm_fastlevel >= lpageop->btpo.level)
539 /* no update wanted */
540 _bt_relbuf(rel, metabuf);
541 metabuf = InvalidBuffer;
545 /* Do the update. No ereport(ERROR) until changes are logged */
546 START_CRIT_SECTION();
548 _bt_pgaddtup(rel, page, itemsz, itup, newitemoff, "page");
550 MarkBufferDirty(buf);
552 if (BufferIsValid(metabuf))
554 metad->btm_fastroot = itup_blkno;
555 metad->btm_fastlevel = lpageop->btpo.level;
556 MarkBufferDirty(metabuf);
562 xl_btree_insert xlrec;
563 BlockNumber xldownlink;
564 xl_btree_metadata xlmeta;
567 XLogRecData rdata[4];
568 XLogRecData *nextrdata;
569 IndexTupleData trunctuple;
571 xlrec.target.node = rel->rd_node;
572 ItemPointerSet(&(xlrec.target.tid), itup_blkno, itup_off);
574 rdata[0].data = (char *) &xlrec;
575 rdata[0].len = SizeOfBtreeInsert;
576 rdata[0].buffer = InvalidBuffer;
577 rdata[0].next = nextrdata = &(rdata[1]);
579 if (P_ISLEAF(lpageop))
580 xlinfo = XLOG_BTREE_INSERT_LEAF;
583 xldownlink = ItemPointerGetBlockNumber(&(itup->t_tid));
584 Assert(ItemPointerGetOffsetNumber(&(itup->t_tid)) == P_HIKEY);
586 nextrdata->data = (char *) &xldownlink;
587 nextrdata->len = sizeof(BlockNumber);
588 nextrdata->buffer = InvalidBuffer;
589 nextrdata->next = nextrdata + 1;
592 xlinfo = XLOG_BTREE_INSERT_UPPER;
595 if (BufferIsValid(metabuf))
597 xlmeta.root = metad->btm_root;
598 xlmeta.level = metad->btm_level;
599 xlmeta.fastroot = metad->btm_fastroot;
600 xlmeta.fastlevel = metad->btm_fastlevel;
602 nextrdata->data = (char *) &xlmeta;
603 nextrdata->len = sizeof(xl_btree_metadata);
604 nextrdata->buffer = InvalidBuffer;
605 nextrdata->next = nextrdata + 1;
608 xlinfo = XLOG_BTREE_INSERT_META;
611 /* Read comments in _bt_pgaddtup */
612 if (!P_ISLEAF(lpageop) && newitemoff == P_FIRSTDATAKEY(lpageop))
615 trunctuple.t_info = sizeof(IndexTupleData);
616 nextrdata->data = (char *) &trunctuple;
617 nextrdata->len = sizeof(IndexTupleData);
621 nextrdata->data = (char *) itup;
622 nextrdata->len = IndexTupleDSize(*itup);
624 nextrdata->buffer = buf;
625 nextrdata->buffer_std = true;
626 nextrdata->next = NULL;
628 recptr = XLogInsert(RM_BTREE_ID, xlinfo, rdata);
630 if (BufferIsValid(metabuf))
632 PageSetLSN(metapg, recptr);
633 PageSetTLI(metapg, ThisTimeLineID);
636 PageSetLSN(page, recptr);
637 PageSetTLI(page, ThisTimeLineID);
642 /* release buffers; send out relcache inval if metapage changed */
643 if (BufferIsValid(metabuf))
645 CacheInvalidateRelcache(rel);
646 _bt_relbuf(rel, metabuf);
649 _bt_relbuf(rel, buf);
654 * _bt_split() -- split a page in the btree.
656 * On entry, buf is the page to split, and is pinned and write-locked.
657 * firstright is the item index of the first item to be moved to the
658 * new right page. newitemoff etc. tell us about the new item that
659 * must be inserted along with the data from the old page.
661 * Returns the new right sibling of buf, pinned and write-locked.
662 * The pin and lock on buf are maintained.
665 _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
666 OffsetNumber newitemoff, Size newitemsz, IndexTuple newitem,
673 BTPageOpaque ropaque,
676 Buffer sbuf = InvalidBuffer;
678 BTPageOpaque sopaque = NULL;
679 OffsetNumber itup_off = 0;
680 BlockNumber itup_blkno = 0;
684 OffsetNumber leftoff,
689 rbuf = _bt_getbuf(rel, P_NEW, BT_WRITE);
690 origpage = BufferGetPage(buf);
691 leftpage = PageGetTempPage(origpage, sizeof(BTPageOpaqueData));
692 rightpage = BufferGetPage(rbuf);
694 _bt_pageinit(leftpage, BufferGetPageSize(buf));
695 /* rightpage was already initialized by _bt_getbuf */
697 /* init btree private data */
698 oopaque = (BTPageOpaque) PageGetSpecialPointer(origpage);
699 lopaque = (BTPageOpaque) PageGetSpecialPointer(leftpage);
700 ropaque = (BTPageOpaque) PageGetSpecialPointer(rightpage);
702 /* if we're splitting this page, it won't be the root when we're done */
703 lopaque->btpo_flags = oopaque->btpo_flags;
704 lopaque->btpo_flags &= ~BTP_ROOT;
705 ropaque->btpo_flags = lopaque->btpo_flags;
706 lopaque->btpo_prev = oopaque->btpo_prev;
707 lopaque->btpo_next = BufferGetBlockNumber(rbuf);
708 ropaque->btpo_prev = BufferGetBlockNumber(buf);
709 ropaque->btpo_next = oopaque->btpo_next;
710 lopaque->btpo.level = ropaque->btpo.level = oopaque->btpo.level;
713 * If the page we're splitting is not the rightmost page at its level in
714 * the tree, then the first entry on the page is the high key for the
715 * page. We need to copy that to the right half. Otherwise (meaning the
716 * rightmost page case), all the items on the right half will be user data.
721 if (!P_RIGHTMOST(oopaque))
723 itemid = PageGetItemId(origpage, P_HIKEY);
724 itemsz = ItemIdGetLength(itemid);
725 item = (IndexTuple) PageGetItem(origpage, itemid);
726 if (PageAddItem(rightpage, (Item) item, itemsz, rightoff,
727 LP_USED) == InvalidOffsetNumber)
728 elog(PANIC, "failed to add hikey to the right sibling");
729 rightoff = OffsetNumberNext(rightoff);
733 * The "high key" for the new left page will be the first key that's going
734 * to go into the new right page. This might be either the existing data
735 * item at position firstright, or the incoming tuple.
738 if (!newitemonleft && newitemoff == firstright)
740 /* incoming tuple will become first on right page */
746 /* existing item at firstright will become first on right page */
747 itemid = PageGetItemId(origpage, firstright);
748 itemsz = ItemIdGetLength(itemid);
749 item = (IndexTuple) PageGetItem(origpage, itemid);
751 if (PageAddItem(leftpage, (Item) item, itemsz, leftoff,
752 LP_USED) == InvalidOffsetNumber)
753 elog(PANIC, "failed to add hikey to the left sibling");
754 leftoff = OffsetNumberNext(leftoff);
757 * Now transfer all the data items to the appropriate page
759 maxoff = PageGetMaxOffsetNumber(origpage);
761 for (i = P_FIRSTDATAKEY(oopaque); i <= maxoff; i = OffsetNumberNext(i))
763 itemid = PageGetItemId(origpage, i);
764 itemsz = ItemIdGetLength(itemid);
765 item = (IndexTuple) PageGetItem(origpage, itemid);
767 /* does new item belong before this one? */
772 _bt_pgaddtup(rel, leftpage, newitemsz, newitem, leftoff,
775 itup_blkno = BufferGetBlockNumber(buf);
776 leftoff = OffsetNumberNext(leftoff);
780 _bt_pgaddtup(rel, rightpage, newitemsz, newitem, rightoff,
783 itup_blkno = BufferGetBlockNumber(rbuf);
784 rightoff = OffsetNumberNext(rightoff);
788 /* decide which page to put it on */
791 _bt_pgaddtup(rel, leftpage, itemsz, item, leftoff,
793 leftoff = OffsetNumberNext(leftoff);
797 _bt_pgaddtup(rel, rightpage, itemsz, item, rightoff,
799 rightoff = OffsetNumberNext(rightoff);
803 /* cope with possibility that newitem goes at the end */
808 _bt_pgaddtup(rel, leftpage, newitemsz, newitem, leftoff,
811 itup_blkno = BufferGetBlockNumber(buf);
812 leftoff = OffsetNumberNext(leftoff);
816 _bt_pgaddtup(rel, rightpage, newitemsz, newitem, rightoff,
819 itup_blkno = BufferGetBlockNumber(rbuf);
820 rightoff = OffsetNumberNext(rightoff);
825 * We have to grab the right sibling (if any) and fix the prev pointer
826 * there. We are guaranteed that this is deadlock-free since no other
827 * writer will be holding a lock on that page and trying to move left, and
828 * all readers release locks on a page before trying to fetch its neighbors.
832 if (!P_RIGHTMOST(ropaque))
834 sbuf = _bt_getbuf(rel, ropaque->btpo_next, BT_WRITE);
835 spage = BufferGetPage(sbuf);
836 sopaque = (BTPageOpaque) PageGetSpecialPointer(spage);
837 if (sopaque->btpo_prev != ropaque->btpo_prev)
838 elog(PANIC, "right sibling's left-link doesn't match");
842 * Right sibling is locked, new siblings are prepared, but original page
843 * is not updated yet. Log changes before continuing.
845 * NO EREPORT(ERROR) till right sibling is updated. We can get away with
846 * not starting the critical section till here because we haven't been
847 * scribbling on the original page yet, and we don't care about the
848 * new sibling until it's linked into the btree.
850 START_CRIT_SECTION();
852 MarkBufferDirty(buf);
853 MarkBufferDirty(rbuf);
855 if (!P_RIGHTMOST(ropaque))
857 sopaque->btpo_prev = BufferGetBlockNumber(rbuf);
858 MarkBufferDirty(sbuf);
864 xl_btree_split xlrec;
867 XLogRecData rdata[4];
869 xlrec.target.node = rel->rd_node;
870 ItemPointerSet(&(xlrec.target.tid), itup_blkno, itup_off);
872 xlrec.otherblk = BufferGetBlockNumber(rbuf);
874 xlrec.otherblk = BufferGetBlockNumber(buf);
875 xlrec.leftblk = lopaque->btpo_prev;
876 xlrec.rightblk = ropaque->btpo_next;
877 xlrec.level = lopaque->btpo.level;
880 * Direct access to the page is not ideal, but it is faster - we should
881 * implement a new function in the page API. Note we only store the tuples
882 * themselves, knowing that the item pointers are in the same order
883 * and can be reconstructed by scanning the tuples. See comments
884 * for _bt_restore_page().
886 xlrec.leftlen = ((PageHeader) leftpage)->pd_special -
887 ((PageHeader) leftpage)->pd_upper;
889 rdata[0].data = (char *) &xlrec;
890 rdata[0].len = SizeOfBtreeSplit;
891 rdata[0].buffer = InvalidBuffer;
892 rdata[0].next = &(rdata[1]);
894 rdata[1].data = (char *) leftpage + ((PageHeader) leftpage)->pd_upper;
895 rdata[1].len = xlrec.leftlen;
896 rdata[1].buffer = InvalidBuffer;
897 rdata[1].next = &(rdata[2]);
899 rdata[2].data = (char *) rightpage + ((PageHeader) rightpage)->pd_upper;
900 rdata[2].len = ((PageHeader) rightpage)->pd_special -
901 ((PageHeader) rightpage)->pd_upper;
902 rdata[2].buffer = InvalidBuffer;
903 rdata[2].next = NULL;
905 if (!P_RIGHTMOST(ropaque))
907 rdata[2].next = &(rdata[3]);
908 rdata[3].data = NULL;
910 rdata[3].buffer = sbuf;
911 rdata[3].buffer_std = true;
912 rdata[3].next = NULL;
915 if (P_ISROOT(oopaque))
916 xlinfo = newitemonleft ? XLOG_BTREE_SPLIT_L_ROOT : XLOG_BTREE_SPLIT_R_ROOT;
918 xlinfo = newitemonleft ? XLOG_BTREE_SPLIT_L : XLOG_BTREE_SPLIT_R;
920 recptr = XLogInsert(RM_BTREE_ID, xlinfo, rdata);
922 PageSetLSN(leftpage, recptr);
923 PageSetTLI(leftpage, ThisTimeLineID);
924 PageSetLSN(rightpage, recptr);
925 PageSetTLI(rightpage, ThisTimeLineID);
926 if (!P_RIGHTMOST(ropaque))
928 PageSetLSN(spage, recptr);
929 PageSetTLI(spage, ThisTimeLineID);
934 * By here, the original data page has been split into two new halves, and
935 * these are correct. The algorithm requires that the left page never
936 * move during a split, so we copy the new left page back on top of the
937 * original. Note that this is not a waste of time, since we also require
938 * (in the page management code) that the center of a page always be
939 * clean, and the most efficient way to guarantee this is just to compact
940 * the data by reinserting it into a new left page. (XXX the latter
941 * comment is probably obsolete.)
943 * It's a bit weird that we don't fill in the left page till after writing
944 * the XLOG entry, but not really worth changing. Note that we use the
945 * origpage data (specifically its BTP_ROOT bit) while preparing the XLOG
946 * entry, so simply reshuffling the code won't do.
949 PageRestoreTempPage(leftpage, origpage);
953 /* release the old right sibling */
954 if (!P_RIGHTMOST(ropaque))
955 _bt_relbuf(rel, sbuf);
962 * _bt_findsplitloc() -- find an appropriate place to split a page.
964 * The idea here is to equalize the free space that will be on each split
965 * page, *after accounting for the inserted tuple*. (If we fail to account
966 * for it, we might find ourselves with too little room on the page that
967 * it needs to go into!)
969 * If the page is the rightmost page on its level, we instead try to arrange
970 * for twice as much free space on the right as on the left. In this way,
971 * when we are inserting successively increasing keys (consider sequences,
972 * timestamps, etc) we will end up with a tree whose pages are about 67% full,
973 * instead of the 50% full result that we'd get without this special case.
974 * (We could bias it even further to make the initially-loaded tree more full.
975 * But since the steady-state load for a btree is about 70%, we'd likely just
976 * be making more page-splitting work for ourselves later on, when we start
977 * seeing updates to existing tuples.)
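 *
 * (Illustrative numbers: if a rightmost-page split leaves about 1200 bytes
 * of post-insert free space to distribute, the target is roughly 400 bytes
 * of slack on the left and 800 on the right, so the left page stays about
 * two-thirds full while the right page keeps room for the next batch of
 * ascending keys.)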
979 * We are passed the intended insert position of the new tuple, expressed as
980 * the offsetnumber of the tuple it must go in front of. (This could be
981 * maxoff+1 if the tuple is to go at the end.)
983 * We return the index of the first existing tuple that should go on the
984 * righthand page, plus a boolean indicating whether the new tuple goes on
985 * the left or right page. The bool is necessary to disambiguate the case
986 * where firstright == newitemoff.
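/*
 * Illustration of the ambiguity the boolean resolves: suppose
 * firstright == newitemoff == 5.  With newitemonleft = false the incoming
 * tuple itself becomes the first item on the right page; with
 * newitemonleft = true it becomes the last item on the left page and the
 * existing item 5 leads the right page.  _bt_split() uses this to decide
 * which tuple becomes the new left page's high key.
 */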
989 _bt_findsplitloc(Relation rel,
991 OffsetNumber newitemoff,
1006 opaque = (BTPageOpaque) PageGetSpecialPointer(page);
1008 /* Passed-in newitemsz is MAXALIGNED but does not include line pointer */
1009 newitemsz += sizeof(ItemIdData);
1010 state.newitemsz = newitemsz;
1011 state.is_leaf = P_ISLEAF(opaque);
1012 state.is_rightmost = P_RIGHTMOST(opaque);
1013 state.have_split = false;
1015 /* Total free space available on a btree page, after fixed overhead */
1016 leftspace = rightspace =
1017 PageGetPageSize(page) - SizeOfPageHeaderData -
1018 MAXALIGN(sizeof(BTPageOpaqueData));
1021 * Finding the best possible split would require checking all the possible
1022 * split points, because of the high-key and left-key special cases.
1023 * That's probably more work than it's worth; instead, stop as soon as we
1024 * find a "good-enough" split, where good-enough is defined as an
1025 * imbalance in free space of no more than pagesize/16 (arbitrary...) This
1026 * should let us stop near the middle on most pages, instead of plowing to the end.
1029 goodenough = leftspace / 16;
1031 /* The right page will have the same high key as the old page */
1032 if (!P_RIGHTMOST(opaque))
1034 itemid = PageGetItemId(page, P_HIKEY);
1035 rightspace -= (int) (MAXALIGN(ItemIdGetLength(itemid)) +
1036 sizeof(ItemIdData));
1039 /* Count up total space in data items without actually scanning 'em */
1040 dataitemtotal = rightspace - (int) PageGetFreeSpace(page);
1043 * Scan through the data items and calculate space usage for a split at
1044 * each possible position.
1046 dataitemstoleft = 0;
1047 maxoff = PageGetMaxOffsetNumber(page);
1049 for (offnum = P_FIRSTDATAKEY(opaque);
1051 offnum = OffsetNumberNext(offnum))
1057 itemid = PageGetItemId(page, offnum);
1058 itemsz = MAXALIGN(ItemIdGetLength(itemid)) + sizeof(ItemIdData);
1061 * We have to allow for the current item becoming the high key of the
1062 * left page; therefore it counts against left space as well as right space.
1065 leftfree = leftspace - dataitemstoleft - (int) itemsz;
1066 rightfree = rightspace - (dataitemtotal - dataitemstoleft);
1069 * Will the new item go to left or right of split?
1071 if (offnum > newitemoff)
1072 _bt_checksplitloc(&state, offnum, leftfree, rightfree,
1074 else if (offnum < newitemoff)
1075 _bt_checksplitloc(&state, offnum, leftfree, rightfree,
1079 /* need to try it both ways! */
1080 _bt_checksplitloc(&state, offnum, leftfree, rightfree,
1082 /* here we are contemplating newitem as first on right */
1083 _bt_checksplitloc(&state, offnum, leftfree, rightfree,
1087 /* Abort scan once we find a good-enough choice */
1088 if (state.have_split && state.best_delta <= goodenough)
1091 dataitemstoleft += itemsz;
1095 * I believe it is not possible to fail to find a feasible split, but just in case ...
1098 if (!state.have_split)
1099 elog(ERROR, "could not find a feasible split point for \"%s\"",
1100 RelationGetRelationName(rel));
1102 *newitemonleft = state.newitemonleft;
1103 return state.firstright;
1107 * Subroutine to analyze a particular possible split choice (ie, firstright
1108 * and newitemonleft settings), and record the best split so far in *state.
1111 _bt_checksplitloc(FindSplitData *state, OffsetNumber firstright,
1112 int leftfree, int rightfree,
1113 bool newitemonleft, Size firstrightitemsz)
1116 * Account for the new item on whichever side it is to be put.
1119 leftfree -= (int) state->newitemsz;
1121 rightfree -= (int) state->newitemsz;
1124 * If we are not on the leaf level, we will be able to discard the key
1125 * data from the first item that winds up on the right page.
1127 if (!state->is_leaf)
1128 rightfree += (int) firstrightitemsz -
1129 (int) (MAXALIGN(sizeof(IndexTupleData)) + sizeof(ItemIdData));
1132 * If feasible split point, remember best delta.
1134 if (leftfree >= 0 && rightfree >= 0)
1138 if (state->is_rightmost)
1141 * On a rightmost page, try to equalize right free space with
1142 * twice the left free space. See comments for _bt_findsplitloc.
1144 delta = (2 * leftfree) - rightfree;
1148 /* Otherwise, aim for equal free space on both sides */
1149 delta = leftfree - rightfree;
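/*
 * Illustrative arithmetic: with leftfree = 400 and rightfree = 900 on a
 * rightmost page, delta = 2*400 - 900 = -100, i.e. only 100 bytes away
 * from the ideal 1:2 free-space ratio; a non-rightmost page with the same
 * numbers gives delta = 400 - 900 = -500.  The magnitude of delta is what
 * competes against best_delta just below.
 */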
1154 if (!state->have_split || delta < state->best_delta)
1156 state->have_split = true;
1157 state->newitemonleft = newitemonleft;
1158 state->firstright = firstright;
1159 state->best_delta = delta;
1165 * _bt_insert_parent() -- Insert downlink into parent after a page split.
1167 * On entry, buf and rbuf are the left and right split pages, which we
1168 * still hold write locks on per the L&Y algorithm. We release the
1169 * write locks once we have write lock on the parent page. (Any sooner,
1170 * and it'd be possible for some other process to try to split or delete
1171 * one of these pages, and get confused because it cannot find the downlink.)
1173 * stack - stack showing how we got here. May be NULL in cases that don't
1174 * have to be efficient (concurrent ROOT split, WAL recovery)
1175 * is_root - we split the true root
1176 * is_only - we split a page alone on its level (might have been fast root)
1178 * This is exported so it can be called by nbtxlog.c.
1181 _bt_insert_parent(Relation rel,
1189 * Here we have to do something Lehman and Yao don't talk about: deal with
1190 * a root split and construction of a new root. If our stack is empty
1191 * then we have just split a node on what had been the root level when we
1192 * descended the tree. If it was still the root then we perform a
1193 * new-root construction. If it *wasn't* the root anymore, search to find
1194 * the next higher level that someone constructed meanwhile, and find the
1195 * right place to insert as for the normal case.
1197 * If we have to search for the parent level, we do so by re-descending
1198 * from the root. This is not super-efficient, but it's rare enough not
1199 * to matter. (This path is also taken when called from WAL recovery ---
1200 * we have no stack in that case.)
1206 Assert(stack == NULL);
1208 /* create a new root node and update the metapage */
1209 rootbuf = _bt_newroot(rel, buf, rbuf);
1210 /* release the split buffers */
1211 _bt_relbuf(rel, rootbuf);
1212 _bt_relbuf(rel, rbuf);
1213 _bt_relbuf(rel, buf);
1217 BlockNumber bknum = BufferGetBlockNumber(buf);
1218 BlockNumber rbknum = BufferGetBlockNumber(rbuf);
1219 Page page = BufferGetPage(buf);
1220 IndexTuple new_item;
1221 BTStackData fakestack;
1227 BTPageOpaque lpageop;
1230 elog(DEBUG2, "concurrent ROOT page split");
1231 lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
1232 /* Find the leftmost page at the next level up */
1233 pbuf = _bt_get_endpoint(rel, lpageop->btpo.level + 1, false);
1234 /* Set up a phony stack entry pointing there */
1236 stack->bts_blkno = BufferGetBlockNumber(pbuf);
1237 stack->bts_offset = InvalidOffsetNumber;
1238 /* bts_btentry will be initialized below */
1239 stack->bts_parent = NULL;
1240 _bt_relbuf(rel, pbuf);
1243 /* get high key from left page == lowest key on new right page */
1244 ritem = (IndexTuple) PageGetItem(page,
1245 PageGetItemId(page, P_HIKEY));
1247 /* form an index tuple that points at the new right page */
1248 new_item = CopyIndexTuple(ritem);
1249 ItemPointerSet(&(new_item->t_tid), rbknum, P_HIKEY);
1252 * Find the parent buffer and get the parent page.
1254 * Oops - if we were moved right then we need to change the stack item!
1255 * We want to find the parent pointing to where we are, right? - vadim
1258 ItemPointerSet(&(stack->bts_btentry.t_tid), bknum, P_HIKEY);
1260 pbuf = _bt_getstackbuf(rel, stack, BT_WRITE);
1262 /* Now we can unlock the children */
1263 _bt_relbuf(rel, rbuf);
1264 _bt_relbuf(rel, buf);
1266 /* Check for error only after writing children */
1267 if (pbuf == InvalidBuffer)
1268 elog(ERROR, "failed to re-find parent key in \"%s\"",
1269 RelationGetRelationName(rel));
1271 /* Recursively update the parent */
1272 _bt_insertonpg(rel, pbuf, stack->bts_parent,
1273 0, NULL, new_item, stack->bts_offset,
1282 * _bt_getstackbuf() -- Walk back up the tree one step, and find the item
1283 * we last looked at in the parent.
1285 * This is possible because we save the downlink from the parent item,
1286 * which is enough to uniquely identify it. Insertions into the parent
1287 * level could cause the item to move right; deletions could cause it
1288 * to move left, but not left of the page we previously found it in.
1290 * Adjusts bts_blkno & bts_offset if changed.
1292 * Returns InvalidBuffer if item not found (should not happen).
1295 _bt_getstackbuf(Relation rel, BTStack stack, int access)
1300 blkno = stack->bts_blkno;
1301 start = stack->bts_offset;
1307 BTPageOpaque opaque;
1309 buf = _bt_getbuf(rel, blkno, access);
1310 page = BufferGetPage(buf);
1311 opaque = (BTPageOpaque) PageGetSpecialPointer(page);
1313 if (!P_IGNORE(opaque))
1315 OffsetNumber offnum,
1321 minoff = P_FIRSTDATAKEY(opaque);
1322 maxoff = PageGetMaxOffsetNumber(page);
1325 * start = InvalidOffsetNumber means "search the whole page". We
1326 * need this test anyway due to possibility that page has a high
1327 * key now when it didn't before.
1333 * Need this check too, to guard against possibility that page
1334 * split since we visited it originally.
1337 start = OffsetNumberNext(maxoff);
1340 * These loops will check every item on the page --- but in an
1341 * order that's attuned to the probability of where it actually
1342 * is. Scan to the right first, then to the left.
1344 for (offnum = start;
1346 offnum = OffsetNumberNext(offnum))
1348 itemid = PageGetItemId(page, offnum);
1349 item = (IndexTuple) PageGetItem(page, itemid);
1350 if (BTEntrySame(item, &stack->bts_btentry))
1352 /* Return accurate pointer to where link is now */
1353 stack->bts_blkno = blkno;
1354 stack->bts_offset = offnum;
1359 for (offnum = OffsetNumberPrev(start);
1361 offnum = OffsetNumberPrev(offnum))
1363 itemid = PageGetItemId(page, offnum);
1364 item = (IndexTuple) PageGetItem(page, itemid);
1365 if (BTEntrySame(item, &stack->bts_btentry))
1367 /* Return accurate pointer to where link is now */
1368 stack->bts_blkno = blkno;
1369 stack->bts_offset = offnum;
1376 * The item we're looking for moved right at least one page.
1378 if (P_RIGHTMOST(opaque))
1380 _bt_relbuf(rel, buf);
1381 return InvalidBuffer;
1383 blkno = opaque->btpo_next;
1384 start = InvalidOffsetNumber;
1385 _bt_relbuf(rel, buf);
1390 * _bt_newroot() -- Create a new root page for the index.
1392 * We've just split the old root page and need to create a new one.
1393 * In order to do this, we add a new root page to the file, then lock
1394 * the metadata page and update it. This is guaranteed to be deadlock-
1395 * free, because all readers release their locks on the metadata page
1396 * before trying to lock the root, and all writers lock the root before
1397 * trying to lock the metadata page. We have a write lock on the old
1398 * root page, so we have not introduced any cycles into the waits-for graph.
1401 * On entry, lbuf (the old root) and rbuf (its new peer) are write-
1402 * locked. On exit, a new root page exists with entries for the
1403 * two new children; the metapage is updated and unlocked/unpinned.
1404 * The new root buffer is returned to the caller, which must unlock/unpin
1405 * lbuf, rbuf, and rootbuf.
1408 _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf)
1415 BlockNumber rootblknum;
1416 BTPageOpaque rootopaque;
1420 IndexTuple new_item;
1423 BTMetaPageData *metad;
1425 lbkno = BufferGetBlockNumber(lbuf);
1426 rbkno = BufferGetBlockNumber(rbuf);
1427 lpage = BufferGetPage(lbuf);
1429 /* get a new root page */
1430 rootbuf = _bt_getbuf(rel, P_NEW, BT_WRITE);
1431 rootpage = BufferGetPage(rootbuf);
1432 rootblknum = BufferGetBlockNumber(rootbuf);
1434 /* acquire lock on the metapage */
1435 metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_WRITE);
1436 metapg = BufferGetPage(metabuf);
1437 metad = BTPageGetMeta(metapg);
1439 /* NO EREPORT(ERROR) from here till newroot op is logged */
1440 START_CRIT_SECTION();
1442 /* set btree special data */
1443 rootopaque = (BTPageOpaque) PageGetSpecialPointer(rootpage);
1444 rootopaque->btpo_prev = rootopaque->btpo_next = P_NONE;
1445 rootopaque->btpo_flags = BTP_ROOT;
1446 rootopaque->btpo.level =
1447 ((BTPageOpaque) PageGetSpecialPointer(lpage))->btpo.level + 1;
1449 /* update metapage data */
1450 metad->btm_root = rootblknum;
1451 metad->btm_level = rootopaque->btpo.level;
1452 metad->btm_fastroot = rootblknum;
1453 metad->btm_fastlevel = rootopaque->btpo.level;
1456 * Create downlink item for left page (old root). Since this will be the
1457 * first item in a non-leaf page, it implicitly has minus-infinity key
1458 * value, so we need not store any actual key in it.
1460 itemsz = sizeof(IndexTupleData);
1461 new_item = (IndexTuple) palloc(itemsz);
1462 new_item->t_info = itemsz;
1463 ItemPointerSet(&(new_item->t_tid), lbkno, P_HIKEY);
1466 * Insert the left page pointer into the new root page. The root page is
1467 * the rightmost page on its level so there is no "high key" in it; the
1468 * two items will go into positions P_HIKEY and P_FIRSTKEY.
1470 * Note: we *must* insert the two items in item-number order, for the
1471 * benefit of _bt_restore_page().
1473 if (PageAddItem(rootpage, (Item) new_item, itemsz, P_HIKEY, LP_USED) == InvalidOffsetNumber)
1474 elog(PANIC, "failed to add leftkey to new root page");
1478 * Create downlink item for right page. The key for it is obtained from
1479 * the "high key" position in the left page.
1481 itemid = PageGetItemId(lpage, P_HIKEY);
1482 itemsz = ItemIdGetLength(itemid);
1483 item = (IndexTuple) PageGetItem(lpage, itemid);
1484 new_item = CopyIndexTuple(item);
1485 ItemPointerSet(&(new_item->t_tid), rbkno, P_HIKEY);
1488 * insert the right page pointer into the new root page.
1490 if (PageAddItem(rootpage, (Item) new_item, itemsz, P_FIRSTKEY, LP_USED) == InvalidOffsetNumber)
1491 elog(PANIC, "failed to add rightkey to new root page");
1494 MarkBufferDirty(rootbuf);
1495 MarkBufferDirty(metabuf);
1498 if (!rel->rd_istemp)
1500 xl_btree_newroot xlrec;
1502 XLogRecData rdata[2];
1504 xlrec.node = rel->rd_node;
1505 xlrec.rootblk = rootblknum;
1506 xlrec.level = metad->btm_level;
1508 rdata[0].data = (char *) &xlrec;
1509 rdata[0].len = SizeOfBtreeNewroot;
1510 rdata[0].buffer = InvalidBuffer;
1511 rdata[0].next = &(rdata[1]);
1514 * Direct access to the page is not ideal, but it is faster - we should
1515 * implement a new function in the page API.
1517 rdata[1].data = (char *) rootpage + ((PageHeader) rootpage)->pd_upper;
1518 rdata[1].len = ((PageHeader) rootpage)->pd_special -
1519 ((PageHeader) rootpage)->pd_upper;
1520 rdata[1].buffer = InvalidBuffer;
1521 rdata[1].next = NULL;
1523 recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_NEWROOT, rdata);
1525 PageSetLSN(rootpage, recptr);
1526 PageSetTLI(rootpage, ThisTimeLineID);
1527 PageSetLSN(metapg, recptr);
1528 PageSetTLI(metapg, ThisTimeLineID);
1533 /* send out relcache inval for metapage change */
1534 CacheInvalidateRelcache(rel);
1536 /* done with metapage */
1537 _bt_relbuf(rel, metabuf);
1543 * _bt_pgaddtup() -- add a tuple to a particular page in the index.
1545 * This routine adds the tuple to the page as requested. It does
1546 * not affect pin/lock status, but you'd better have a write lock
1547 * and pin on the target buffer! Don't forget to write and release
1548 * the buffer afterwards, either.
1550 * The main difference between this routine and a bare PageAddItem call
1551 * is that this code knows that the leftmost index tuple on a non-leaf
1552 * btree page doesn't need to have a key. Therefore, it strips such
1553 * tuples down to just the tuple header. CAUTION: this works ONLY if
1554 * we insert the tuples in order, so that the given itup_off does
1555 * represent the final position of the tuple!
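/*
 * Sketch of the truncation described above (illustrative, pieced together
 * from the statements below): for the leftmost tuple on a non-leaf page
 * only the fixed-size IndexTupleData header is stored,
 *
 *     trunctuple = *itup;                          copy, keeping the downlink TID
 *     trunctuple.t_info = sizeof(IndexTupleData);  no key data follows
 *     itup = &trunctuple;
 *     itemsize = sizeof(IndexTupleData);
 *
 * which implements the "minus infinity" convention: the first downlink on
 * an internal page never needs a real key.
 */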
1558 _bt_pgaddtup(Relation rel,
1562 OffsetNumber itup_off,
1565 BTPageOpaque opaque = (BTPageOpaque) PageGetSpecialPointer(page);
1566 IndexTupleData trunctuple;
1568 if (!P_ISLEAF(opaque) && itup_off == P_FIRSTDATAKEY(opaque))
1571 trunctuple.t_info = sizeof(IndexTupleData);
1573 itemsize = sizeof(IndexTupleData);
1576 if (PageAddItem(page, (Item) itup, itemsize, itup_off,
1577 LP_USED) == InvalidOffsetNumber)
1578 elog(PANIC, "failed to add item to the %s for \"%s\"",
1579 where, RelationGetRelationName(rel));
1583 * _bt_isequal - used in _bt_doinsert in check for duplicates.
1585 * This is very similar to _bt_compare, except for NULL handling.
1586 * The rule is simple: NOT_NULL is never equal to NULL, and NULL is not equal to NULL either.
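 *
 * (Illustrative consequence: in a unique index, an incoming key of
 * (42, NULL) does not conflict with an existing (42, NULL); the NULL
 * makes _bt_isequal return false, which is why unique indexes admit any
 * number of rows whose keys contain NULLs.)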
1589 _bt_isequal(TupleDesc itupdesc, Page page, OffsetNumber offnum,
1590 int keysz, ScanKey scankey)
1595 /* Better be comparing to a leaf item */
1596 Assert(P_ISLEAF((BTPageOpaque) PageGetSpecialPointer(page)));
1598 itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, offnum));
1600 for (i = 1; i <= keysz; i++)
1607 attno = scankey->sk_attno;
1609 datum = index_getattr(itup, attno, itupdesc, &isNull);
1611 /* NULLs are never equal to anything */
1612 if (isNull || (scankey->sk_flags & SK_ISNULL))
1615 result = DatumGetInt32(FunctionCall2(&scankey->sk_func,
1617 scankey->sk_argument));
1625 /* if we get here, the keys are equal */