1 /*-------------------------------------------------------------------------
4 * Item insertion in Lehman and Yao btrees for Postgres.
6 * Portions Copyright (c) 1996-2004, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
11 * $PostgreSQL: pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.117 2004/10/15 22:39:49 tgl Exp $
13 *-------------------------------------------------------------------------
18 #include "access/heapam.h"
19 #include "access/nbtree.h"
20 #include "miscadmin.h"
25 /* context data for _bt_checksplitloc */
26 Size newitemsz; /* size of new item to be inserted */
27 bool is_leaf; /* T if splitting a leaf page */
28 bool is_rightmost; /* T if splitting a rightmost page */
30 bool have_split; /* found a valid split? */
32 /* these fields valid only if have_split is true */
33 bool newitemonleft; /* new item on left or right of best split */
34 OffsetNumber firstright; /* best split point */
35 int best_delta; /* best size delta so far */
/* Forward declarations for the static helper routines defined below */
39 static Buffer _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf);
41 static TransactionId _bt_check_unique(Relation rel, BTItem btitem,
42 Relation heapRel, Buffer buf,
43 ScanKey itup_scankey);
44 static InsertIndexResult _bt_insertonpg(Relation rel, Buffer buf,
46 int keysz, ScanKey scankey,
48 OffsetNumber afteritem,
49 bool split_only_page);
50 static Buffer _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
51 OffsetNumber newitemoff, Size newitemsz,
52 BTItem newitem, bool newitemonleft,
53 OffsetNumber *itup_off, BlockNumber *itup_blkno);
54 static OffsetNumber _bt_findsplitloc(Relation rel, Page page,
55 OffsetNumber newitemoff,
58 static void _bt_checksplitloc(FindSplitData *state, OffsetNumber firstright,
59 int leftfree, int rightfree,
60 bool newitemonleft, Size firstrightitemsz);
61 static void _bt_pgaddtup(Relation rel, Page page,
62 Size itemsize, BTItem btitem,
63 OffsetNumber itup_off, const char *where);
64 static bool _bt_isequal(TupleDesc itupdesc, Page page, OffsetNumber offnum,
65 int keysz, ScanKey scankey);
69 * _bt_doinsert() -- Handle insertion of a single btitem in the tree.
71 * This routine is called by the public interface routines, btbuild
72 * and btinsert. By here, btitem is filled in, including the TID.
75 _bt_doinsert(Relation rel, BTItem btitem,
76 bool index_is_unique, Relation heapRel)
78 IndexTuple itup = &(btitem->bti_itup);
79 int natts = rel->rd_rel->relnatts;
83 InsertIndexResult res;
85 /* we need a scan key to do our search, so build one */
86 itup_scankey = _bt_mkscankey(rel, itup);
89 /* find the first page containing this key */
90 stack = _bt_search(rel, natts, itup_scankey, false, &buf, BT_WRITE);
92 /* trade in our read lock for a write lock */
93 LockBuffer(buf, BUFFER_LOCK_UNLOCK);
94 LockBuffer(buf, BT_WRITE);
97 * If the page was split between the time that we surrendered our read
98 * lock and acquired our write lock, then this page may no longer be
99 * the right place for the key we want to insert. In this case, we
100 * need to move right in the tree. See Lehman and Yao for an
101 * excruciatingly precise description.
103 buf = _bt_moveright(rel, buf, natts, itup_scankey, false, BT_WRITE);
106 * If we're not allowing duplicates, make sure the key isn't already
109 * NOTE: obviously, _bt_check_unique can only detect keys that are
110 * already in the index; so it cannot defend against concurrent
111 * insertions of the same key. We protect against that by means of
112 * holding a write lock on the target page. Any other would-be
113 * inserter of the same key must acquire a write lock on the same
114 * target page, so only one would-be inserter can be making the check
115 * at one time. Furthermore, once we are past the check we hold write
116 * locks continuously until we have performed our insertion, so no
117 * later inserter can fail to see our insertion. (This requires some
118 * care in _bt_insertonpg.)
120 * If we must wait for another xact, we release the lock while waiting,
121 * and then must start over completely.
127 xwait = _bt_check_unique(rel, btitem, heapRel, buf, itup_scankey);
129 if (TransactionIdIsValid(xwait))
131 /* Have to wait for the other guy ... */
132 _bt_relbuf(rel, buf);
133 XactLockTableWait(xwait);
135 _bt_freestack(stack);
/* NOTE(review): after the wait the stack is freed and the whole
 * search/check is redone from scratch — confirm the retry jump
 * against the full source. */
140 /* do the insertion */
141 res = _bt_insertonpg(rel, buf, stack, natts, itup_scankey, btitem,
/* clean up: release search stack and scan key built above */
145 _bt_freestack(stack);
146 _bt_freeskey(itup_scankey);
152 * _bt_check_unique() -- Check for violation of unique index constraint
154 * Returns InvalidTransactionId if there is no conflict, else an xact ID
155 * we must wait for to see if it commits a conflicting tuple. If an actual
156 * conflict is detected, no return --- just ereport().
159 _bt_check_unique(Relation rel, BTItem btitem, Relation heapRel,
160 Buffer buf, ScanKey itup_scankey)
162 TupleDesc itupdesc = RelationGetDescr(rel);
163 int natts = rel->rd_rel->relnatts;
168 Buffer nbuf = InvalidBuffer;
170 page = BufferGetPage(buf);
171 opaque = (BTPageOpaque) PageGetSpecialPointer(page);
172 maxoff = PageGetMaxOffsetNumber(page);
175 * Find first item >= proposed new item. Note we could also get a
176 * pointer to end-of-page here.
178 offset = _bt_binsrch(rel, buf, natts, itup_scankey, false);
181 * Scan over all equal tuples, looking for live conflicts.
192 * make sure the offset points to an actual item before trying to
195 if (offset <= maxoff)
197 curitemid = PageGetItemId(page, offset);
200 * We can skip items that are marked killed.
202 * Formerly, we applied _bt_isequal() before checking the kill
203 * flag, so as to fall out of the item loop as soon as
204 * possible. However, in the presence of heavy update activity
205 * an index may contain many killed items with the same key;
206 * running _bt_isequal() on each killed item gets expensive.
207 * Furthermore it is likely that the non-killed version of
208 * each key appears first, so that we didn't actually get to
209 * exit any sooner anyway. So now we just advance over killed
210 * items as quickly as we can. We only apply _bt_isequal()
211 * when we get to a non-killed item or the end of the page.
213 if (!ItemIdDeleted(curitemid))
216 * _bt_compare returns 0 for (1,NULL) and (1,NULL) -
217 * this is how we handle NULLs - so we must not use
218 * _bt_compare for true equality tests, but only for
219 * ordering/finding items on pages. - vadim 03/24/97
221 if (!_bt_isequal(itupdesc, page, offset, natts, itup_scankey))
222 break; /* we're past all the equal tuples */
224 /* okay, we gotta fetch the heap tuple ... */
225 cbti = (BTItem) PageGetItem(page, curitemid);
226 htup.t_self = cbti->bti_itup.t_tid;
227 if (heap_fetch(heapRel, SnapshotDirty, &htup, &hbuffer,
230 /* it is a duplicate */
231 TransactionId xwait =
232 (TransactionIdIsValid(SnapshotDirty->xmin)) ?
233 SnapshotDirty->xmin : SnapshotDirty->xmax;
235 ReleaseBuffer(hbuffer);
238 * If this tuple is being updated by other transaction
239 * then we have to wait for its commit/abort.
241 if (TransactionIdIsValid(xwait))
243 if (nbuf != InvalidBuffer)
244 _bt_relbuf(rel, nbuf);
245 /* Tell _bt_doinsert to wait... */
250 * Otherwise we have a definite conflict.
253 (errcode(ERRCODE_UNIQUE_VIOLATION),
254 errmsg("duplicate key violates unique constraint \"%s\"",
255 RelationGetRelationName(rel))));
257 else if (htup.t_data != NULL)
260 * Hmm, if we can't see the tuple, maybe it can be
261 * marked killed. This logic should match
262 * index_getnext and btgettuple.
264 LockBuffer(hbuffer, BUFFER_LOCK_SHARE);
265 if (HeapTupleSatisfiesVacuum(htup.t_data, RecentGlobalXmin,
266 hbuffer) == HEAPTUPLE_DEAD)
/* hint-bit update only; no WAL needed, just mark buffer dirty-ish */
268 curitemid->lp_flags |= LP_DELETE;
269 SetBufferCommitInfoNeedsSave(buf);
271 LockBuffer(hbuffer, BUFFER_LOCK_UNLOCK);
272 ReleaseBuffer(hbuffer);
278 * Advance to next tuple to continue checking.
281 offset = OffsetNumberNext(offset);
284 /* If scankey == hikey we gotta check the next page too */
285 if (P_RIGHTMOST(opaque))
287 if (!_bt_isequal(itupdesc, page, P_HIKEY,
288 natts, itup_scankey))
290 /* Advance to next non-dead page --- there must be one */
293 nblkno = opaque->btpo_next;
294 nbuf = _bt_relandgetbuf(rel, nbuf, nblkno, BT_READ);
295 page = BufferGetPage(nbuf);
296 opaque = (BTPageOpaque) PageGetSpecialPointer(page);
297 if (!P_IGNORE(opaque))
299 if (P_RIGHTMOST(opaque))
300 elog(ERROR, "fell off the end of \"%s\"",
301 RelationGetRelationName(rel));
303 maxoff = PageGetMaxOffsetNumber(page);
304 offset = P_FIRSTDATAKEY(opaque);
/* done scanning: drop pin/lock on any extra page we walked onto */
308 if (nbuf != InvalidBuffer)
309 _bt_relbuf(rel, nbuf);
311 return InvalidTransactionId;
315 * _bt_insertonpg() -- Insert a tuple on a particular page in the index.
317 * This recursive procedure does the following things:
319 * + finds the right place to insert the tuple.
320 * + if necessary, splits the target page (making sure that the
321 * split is equitable as far as post-insert free space goes).
322 * + inserts the tuple.
323 * + if the page was split, pops the parent stack, and finds the
324 * right place to insert the new child pointer (by walking
325 * right using information stored in the parent stack).
326 * + invokes itself with the appropriate tuple for the right
327 * child page on the parent.
328 * + updates the metapage if a true root or fast root is split.
330 * On entry, we must have the right buffer on which to do the
331 * insertion, and the buffer must be pinned and locked. On return,
332 * we will have dropped both the pin and the write lock on the buffer.
334 * If 'afteritem' is >0 then the new tuple must be inserted after the
335 * existing item of that number, noplace else. If 'afteritem' is 0
336 * then the procedure finds the exact spot to insert it by searching.
337 * (keysz and scankey parameters are used ONLY if afteritem == 0.)
339 * NOTE: if the new key is equal to one or more existing keys, we can
340 * legitimately place it anywhere in the series of equal keys --- in fact,
341 * if the new key is equal to the page's "high key" we can place it on
342 * the next page. If it is equal to the high key, and there's not room
343 * to insert the new tuple on the current page without splitting, then
344 * we can move right hoping to find more free space and avoid a split.
345 * (We should not move right indefinitely, however, since that leads to
346 * O(N^2) insertion behavior in the presence of many equal keys.)
347 * Once we have chosen the page to put the key on, we'll insert it before
348 * any existing equal keys because of the way _bt_binsrch() works.
350 * The locking interactions in this code are critical. You should
351 * grok Lehman and Yao's paper before making any changes. In addition,
352 * you need to understand how we disambiguate duplicate keys in this
353 * implementation, in order to be able to find our location using
354 * L&Y "move right" operations. Since we may insert duplicate user
355 * keys, and since these dups may propagate up the tree, we use the
356 * 'afteritem' parameter to position ourselves correctly for the
357 * insertion on internal pages.
360 static InsertIndexResult
361 _bt_insertonpg(Relation rel,
367 OffsetNumber afteritem,
368 bool split_only_page)
370 InsertIndexResult res;
372 BTPageOpaque lpageop;
373 OffsetNumber itup_off;
374 BlockNumber itup_blkno;
375 OffsetNumber newitemoff;
376 OffsetNumber firstright = InvalidOffsetNumber;
379 page = BufferGetPage(buf);
380 lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
382 itemsz = IndexTupleDSize(btitem->bti_itup)
383 + (sizeof(BTItemData) - sizeof(IndexTupleData));
385 itemsz = MAXALIGN(itemsz); /* be safe, PageAddItem will do this but
386 * we need to be consistent */
389 * Check whether the item can fit on a btree page at all. (Eventually,
390 * we ought to try to apply TOAST methods if not.) We actually need to
391 * be able to fit three items on every page, so restrict any one item
392 * to 1/3 the per-page available space. Note that at this point,
393 * itemsz doesn't include the ItemId.
395 if (itemsz > BTMaxItemSize(page))
397 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
398 errmsg("index row size %lu exceeds btree maximum, %lu",
399 (unsigned long) itemsz,
400 (unsigned long) BTMaxItemSize(page))));
403 * Determine exactly where new item will go.
406 newitemoff = afteritem + 1;
410 * If we will need to split the page to put the item here,
411 * check whether we can put the tuple somewhere to the right,
412 * instead. Keep scanning right until we
413 * (a) find a page with enough free space,
414 * (b) reach the last page where the tuple can legally go, or
415 * (c) get tired of searching.
416 * (c) is not flippant; it is important because if there are many
417 * pages' worth of equal keys, it's better to split one of the early
418 * pages than to scan all the way to the end of the run of equal keys
419 * on every insert. We implement "get tired" as a random choice,
420 * since stopping after scanning a fixed number of pages wouldn't work
421 * well (we'd never reach the right-hand side of previously split
422 * pages). Currently the probability of moving right is set at 0.99,
423 * which may seem too high to change the behavior much, but it does an
424 * excellent job of preventing O(N^2) behavior with many equal keys.
427 bool movedright = false;
429 while (PageGetFreeSpace(page) < itemsz &&
430 !P_RIGHTMOST(lpageop) &&
431 _bt_compare(rel, keysz, scankey, page, P_HIKEY) == 0 &&
432 random() > (MAX_RANDOM_VALUE / 100))
435 * step right to next non-dead page
437 * must write-lock that page before releasing write lock on
438 * current page; else someone else's _bt_check_unique scan
439 * could fail to see our insertion. write locks on
440 * intermediate dead pages won't do because we don't know when
441 * they will get de-linked from the tree.
443 Buffer rbuf = InvalidBuffer;
447 BlockNumber rblkno = lpageop->btpo_next;
449 rbuf = _bt_relandgetbuf(rel, rbuf, rblkno, BT_WRITE);
450 page = BufferGetPage(rbuf);
451 lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
452 if (!P_IGNORE(lpageop))
454 if (P_RIGHTMOST(lpageop))
455 elog(ERROR, "fell off the end of \"%s\"",
456 RelationGetRelationName(rel));
458 _bt_relbuf(rel, buf);
464 * Now we are on the right page, so find the insert position. If
465 * we moved right at all, we know we should insert at the start of
466 * the page, else must find the position by searching.
469 newitemoff = P_FIRSTDATAKEY(lpageop);
471 newitemoff = _bt_binsrch(rel, buf, keysz, scankey, false);
475 * Do we need to split the page to fit the item on it?
477 * Note: PageGetFreeSpace() subtracts sizeof(ItemIdData) from its result,
478 * so this comparison is correct even though we appear to be
479 * accounting only for the item and not for its line pointer.
481 if (PageGetFreeSpace(page) < itemsz)
483 bool is_root = P_ISROOT(lpageop);
484 bool is_only = P_LEFTMOST(lpageop) && P_RIGHTMOST(lpageop);
488 /* Choose the split point */
489 firstright = _bt_findsplitloc(rel, page,
493 /* split the buffer into left and right halves */
494 rbuf = _bt_split(rel, buf, firstright,
495 newitemoff, itemsz, btitem, newitemonleft,
496 &itup_off, &itup_blkno);
501 * + our target page has been split;
502 * + the original tuple has been inserted;
503 * + we have write locks on both the old (left half)
504 * and new (right half) buffers, after the split; and
505 * + we know the key we want to insert into the parent
506 * (it's the "high key" on the left child page).
508 * We're ready to do the parent insertion. We need to hold onto the
509 * locks for the child pages until we locate the parent, but we can
510 * release them before doing the actual insertion (see Lehman and Yao
511 * for the reasoning).
514 _bt_insert_parent(rel, buf, rbuf, stack, is_root, is_only);
/* No split needed: insert the tuple in place on this page */
518 Buffer metabuf = InvalidBuffer;
520 BTMetaPageData *metad = NULL;
522 itup_off = newitemoff;
523 itup_blkno = BufferGetBlockNumber(buf);
526 * If we are doing this insert because we split a page that was
527 * the only one on its tree level, but was not the root, it may
528 * have been the "fast root". We need to ensure that the fast
529 * root link points at or above the current page. We can safely
530 * acquire a lock on the metapage here --- see comments for
535 metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_WRITE);
536 metapg = BufferGetPage(metabuf);
537 metad = BTPageGetMeta(metapg);
539 if (metad->btm_fastlevel >= lpageop->btpo.level)
541 /* no update wanted */
542 _bt_relbuf(rel, metabuf);
543 metabuf = InvalidBuffer;
547 /* Do the update. No ereport(ERROR) until changes are logged */
548 START_CRIT_SECTION();
550 _bt_pgaddtup(rel, page, itemsz, btitem, newitemoff, "page");
552 if (BufferIsValid(metabuf))
554 metad->btm_fastroot = itup_blkno;
555 metad->btm_fastlevel = lpageop->btpo.level;
/* XLOG stuff: build the WAL record for the insert (plus metapage
 * data when the fast-root link was updated above) */
561 xl_btree_insert xlrec;
562 xl_btree_metadata xlmeta;
565 XLogRecData rdata[3];
566 XLogRecData *nextrdata;
567 BTItemData truncitem;
569 xlrec.target.node = rel->rd_node;
570 ItemPointerSet(&(xlrec.target.tid), itup_blkno, itup_off);
572 rdata[0].buffer = InvalidBuffer;
573 rdata[0].data = (char *) &xlrec;
574 rdata[0].len = SizeOfBtreeInsert;
575 rdata[0].next = nextrdata = &(rdata[1]);
577 if (BufferIsValid(metabuf))
579 xlmeta.root = metad->btm_root;
580 xlmeta.level = metad->btm_level;
581 xlmeta.fastroot = metad->btm_fastroot;
582 xlmeta.fastlevel = metad->btm_fastlevel;
584 nextrdata->buffer = InvalidBuffer;
585 nextrdata->data = (char *) &xlmeta;
586 nextrdata->len = sizeof(xl_btree_metadata);
587 nextrdata->next = nextrdata + 1;
589 xlinfo = XLOG_BTREE_INSERT_META;
591 else if (P_ISLEAF(lpageop))
592 xlinfo = XLOG_BTREE_INSERT_LEAF;
594 xlinfo = XLOG_BTREE_INSERT_UPPER;
596 /* Read comments in _bt_pgaddtup */
597 if (!P_ISLEAF(lpageop) && newitemoff == P_FIRSTDATAKEY(lpageop))
600 truncitem.bti_itup.t_info = sizeof(BTItemData);
601 nextrdata->data = (char *) &truncitem;
602 nextrdata->len = sizeof(BTItemData);
606 nextrdata->data = (char *) btitem;
607 nextrdata->len = IndexTupleDSize(btitem->bti_itup) +
608 (sizeof(BTItemData) - sizeof(IndexTupleData));
610 nextrdata->buffer = buf;
611 nextrdata->next = NULL;
613 recptr = XLogInsert(RM_BTREE_ID, xlinfo, rdata);
615 if (BufferIsValid(metabuf))
617 PageSetLSN(metapg, recptr);
618 PageSetTLI(metapg, ThisTimeLineID);
621 PageSetLSN(page, recptr);
622 PageSetTLI(page, ThisTimeLineID);
627 /* Write out the updated page and release pin/lock */
628 if (BufferIsValid(metabuf))
629 _bt_wrtbuf(rel, metabuf);
631 _bt_wrtbuf(rel, buf);
634 /* by here, the new tuple is inserted at itup_blkno/itup_off */
635 res = (InsertIndexResult) palloc(sizeof(InsertIndexResultData));
636 ItemPointerSet(&(res->pointerData), itup_blkno, itup_off);
642 * _bt_split() -- split a page in the btree.
644 * On entry, buf is the page to split, and is write-locked and pinned.
645 * firstright is the item index of the first item to be moved to the
646 * new right page. newitemoff etc. tell us about the new item that
647 * must be inserted along with the data from the old page.
649 * Returns the new right sibling of buf, pinned and write-locked.
650 * The pin and lock on buf are maintained. *itup_off and *itup_blkno
651 * are set to the exact location where newitem was inserted.
654 _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
655 OffsetNumber newitemoff, Size newitemsz, BTItem newitem,
657 OffsetNumber *itup_off, BlockNumber *itup_blkno)
663 BTPageOpaque ropaque,
666 Buffer sbuf = InvalidBuffer;
668 BTPageOpaque sopaque = NULL;
672 OffsetNumber leftoff,
/* acquire a fresh right page; build the new left half in a temp copy
 * so the original stays intact until the very end */
677 rbuf = _bt_getbuf(rel, P_NEW, BT_WRITE);
678 origpage = BufferGetPage(buf);
679 leftpage = PageGetTempPage(origpage, sizeof(BTPageOpaqueData));
680 rightpage = BufferGetPage(rbuf);
682 _bt_pageinit(leftpage, BufferGetPageSize(buf));
683 _bt_pageinit(rightpage, BufferGetPageSize(rbuf));
685 /* init btree private data */
686 oopaque = (BTPageOpaque) PageGetSpecialPointer(origpage);
687 lopaque = (BTPageOpaque) PageGetSpecialPointer(leftpage);
688 ropaque = (BTPageOpaque) PageGetSpecialPointer(rightpage);
690 /* if we're splitting this page, it won't be the root when we're done */
691 lopaque->btpo_flags = oopaque->btpo_flags;
692 lopaque->btpo_flags &= ~BTP_ROOT;
693 ropaque->btpo_flags = lopaque->btpo_flags;
694 lopaque->btpo_prev = oopaque->btpo_prev;
695 lopaque->btpo_next = BufferGetBlockNumber(rbuf);
696 ropaque->btpo_prev = BufferGetBlockNumber(buf);
697 ropaque->btpo_next = oopaque->btpo_next;
698 lopaque->btpo.level = ropaque->btpo.level = oopaque->btpo.level;
701 * If the page we're splitting is not the rightmost page at its level
702 * in the tree, then the first entry on the page is the high key for
703 * the page. We need to copy that to the right half. Otherwise
704 * (meaning the rightmost page case), all the items on the right half
709 if (!P_RIGHTMOST(oopaque))
711 itemid = PageGetItemId(origpage, P_HIKEY);
712 itemsz = ItemIdGetLength(itemid);
713 item = (BTItem) PageGetItem(origpage, itemid);
714 if (PageAddItem(rightpage, (Item) item, itemsz, rightoff,
715 LP_USED) == InvalidOffsetNumber)
716 elog(PANIC, "failed to add hikey to the right sibling");
717 rightoff = OffsetNumberNext(rightoff);
721 * The "high key" for the new left page will be the first key that's
722 * going to go into the new right page. This might be either the
723 * existing data item at position firstright, or the incoming tuple.
726 if (!newitemonleft && newitemoff == firstright)
728 /* incoming tuple will become first on right page */
734 /* existing item at firstright will become first on right page */
735 itemid = PageGetItemId(origpage, firstright);
736 itemsz = ItemIdGetLength(itemid);
737 item = (BTItem) PageGetItem(origpage, itemid);
739 if (PageAddItem(leftpage, (Item) item, itemsz, leftoff,
740 LP_USED) == InvalidOffsetNumber)
741 elog(PANIC, "failed to add hikey to the left sibling");
742 leftoff = OffsetNumberNext(leftoff);
745 * Now transfer all the data items to the appropriate page
747 maxoff = PageGetMaxOffsetNumber(origpage);
749 for (i = P_FIRSTDATAKEY(oopaque); i <= maxoff; i = OffsetNumberNext(i))
751 itemid = PageGetItemId(origpage, i);
752 itemsz = ItemIdGetLength(itemid);
753 item = (BTItem) PageGetItem(origpage, itemid);
755 /* does new item belong before this one? */
760 _bt_pgaddtup(rel, leftpage, newitemsz, newitem, leftoff,
763 *itup_blkno = BufferGetBlockNumber(buf);
764 leftoff = OffsetNumberNext(leftoff);
768 _bt_pgaddtup(rel, rightpage, newitemsz, newitem, rightoff,
770 *itup_off = rightoff;
771 *itup_blkno = BufferGetBlockNumber(rbuf);
772 rightoff = OffsetNumberNext(rightoff);
776 /* decide which page to put it on */
779 _bt_pgaddtup(rel, leftpage, itemsz, item, leftoff,
781 leftoff = OffsetNumberNext(leftoff);
785 _bt_pgaddtup(rel, rightpage, itemsz, item, rightoff,
787 rightoff = OffsetNumberNext(rightoff);
791 /* cope with possibility that newitem goes at the end */
796 _bt_pgaddtup(rel, leftpage, newitemsz, newitem, leftoff,
799 *itup_blkno = BufferGetBlockNumber(buf);
800 leftoff = OffsetNumberNext(leftoff);
804 _bt_pgaddtup(rel, rightpage, newitemsz, newitem, rightoff,
806 *itup_off = rightoff;
807 *itup_blkno = BufferGetBlockNumber(rbuf);
808 rightoff = OffsetNumberNext(rightoff);
813 * We have to grab the right sibling (if any) and fix the prev pointer
814 * there. We are guaranteed that this is deadlock-free since no other
815 * writer will be holding a lock on that page and trying to move left,
816 * and all readers release locks on a page before trying to fetch its
820 if (!P_RIGHTMOST(ropaque))
822 sbuf = _bt_getbuf(rel, ropaque->btpo_next, BT_WRITE);
823 spage = BufferGetPage(sbuf);
824 sopaque = (BTPageOpaque) PageGetSpecialPointer(spage);
825 if (sopaque->btpo_prev != ropaque->btpo_prev)
826 elog(PANIC, "right sibling's left-link doesn't match");
830 * Right sibling is locked, new siblings are prepared, but original
831 * page is not updated yet. Log changes before continuing.
833 * NO EREPORT(ERROR) till right sibling is updated.
835 START_CRIT_SECTION();
837 if (!P_RIGHTMOST(ropaque))
838 sopaque->btpo_prev = BufferGetBlockNumber(rbuf);
/* XLOG stuff: log the split before modifying the original page */
843 xl_btree_split xlrec;
846 XLogRecData rdata[4];
848 xlrec.target.node = rel->rd_node;
849 ItemPointerSet(&(xlrec.target.tid), *itup_blkno, *itup_off);
851 xlrec.otherblk = BufferGetBlockNumber(rbuf);
853 xlrec.otherblk = BufferGetBlockNumber(buf);
854 xlrec.leftblk = lopaque->btpo_prev;
855 xlrec.rightblk = ropaque->btpo_next;
856 xlrec.level = lopaque->btpo.level;
859 * Direct access to page is not good but faster - we should
860 * implement some new func in page API. Note we only store the
861 * tuples themselves, knowing that the item pointers are in the
862 * same order and can be reconstructed by scanning the tuples.
864 xlrec.leftlen = ((PageHeader) leftpage)->pd_special -
865 ((PageHeader) leftpage)->pd_upper;
867 rdata[0].buffer = InvalidBuffer;
868 rdata[0].data = (char *) &xlrec;
869 rdata[0].len = SizeOfBtreeSplit;
870 rdata[0].next = &(rdata[1]);
872 rdata[1].buffer = InvalidBuffer;
873 rdata[1].data = (char *) leftpage + ((PageHeader) leftpage)->pd_upper;
874 rdata[1].len = xlrec.leftlen;
875 rdata[1].next = &(rdata[2]);
877 rdata[2].buffer = InvalidBuffer;
878 rdata[2].data = (char *) rightpage + ((PageHeader) rightpage)->pd_upper;
879 rdata[2].len = ((PageHeader) rightpage)->pd_special -
880 ((PageHeader) rightpage)->pd_upper;
881 rdata[2].next = NULL;
883 if (!P_RIGHTMOST(ropaque))
885 rdata[2].next = &(rdata[3]);
886 rdata[3].buffer = sbuf;
887 rdata[3].data = NULL;
889 rdata[3].next = NULL;
892 if (P_ISROOT(oopaque))
893 xlinfo = newitemonleft ? XLOG_BTREE_SPLIT_L_ROOT : XLOG_BTREE_SPLIT_R_ROOT;
895 xlinfo = newitemonleft ? XLOG_BTREE_SPLIT_L : XLOG_BTREE_SPLIT_R;
897 recptr = XLogInsert(RM_BTREE_ID, xlinfo, rdata);
899 PageSetLSN(leftpage, recptr);
900 PageSetTLI(leftpage, ThisTimeLineID);
901 PageSetLSN(rightpage, recptr);
902 PageSetTLI(rightpage, ThisTimeLineID);
903 if (!P_RIGHTMOST(ropaque))
905 PageSetLSN(spage, recptr);
906 PageSetTLI(spage, ThisTimeLineID);
911 * By here, the original data page has been split into two new halves,
912 * and these are correct. The algorithm requires that the left page
913 * never move during a split, so we copy the new left page back on top
914 * of the original. Note that this is not a waste of time, since we
915 * also require (in the page management code) that the center of a
916 * page always be clean, and the most efficient way to guarantee this
917 * is just to compact the data by reinserting it into a new left page.
920 PageRestoreTempPage(leftpage, origpage);
924 /* write and release the old right sibling */
925 if (!P_RIGHTMOST(ropaque))
926 _bt_wrtbuf(rel, sbuf);
933 * _bt_findsplitloc() -- find an appropriate place to split a page.
935 * The idea here is to equalize the free space that will be on each split
936 * page, *after accounting for the inserted tuple*. (If we fail to account
937 * for it, we might find ourselves with too little room on the page that
938 * it needs to go into!)
940 * If the page is the rightmost page on its level, we instead try to arrange
941 * for twice as much free space on the right as on the left. In this way,
942 * when we are inserting successively increasing keys (consider sequences,
943 * timestamps, etc) we will end up with a tree whose pages are about 67% full,
944 * instead of the 50% full result that we'd get without this special case.
945 * (We could bias it even further to make the initially-loaded tree more full.
946 * But since the steady-state load for a btree is about 70%, we'd likely just
947 * be making more page-splitting work for ourselves later on, when we start
948 * seeing updates to existing tuples.)
950 * We are passed the intended insert position of the new tuple, expressed as
951 * the offsetnumber of the tuple it must go in front of. (This could be
952 * maxoff+1 if the tuple is to go at the end.)
954 * We return the index of the first existing tuple that should go on the
955 * righthand page, plus a boolean indicating whether the new tuple goes on
956 * the left or right page. The bool is necessary to disambiguate the case
957 * where firstright == newitemoff.
960 _bt_findsplitloc(Relation rel,
962 OffsetNumber newitemoff,
977 opaque = (BTPageOpaque) PageGetSpecialPointer(page);
979 /* Passed-in newitemsz is MAXALIGNED but does not include line pointer */
980 newitemsz += sizeof(ItemIdData);
/* seed the shared state consulted by _bt_checksplitloc */
981 state.newitemsz = newitemsz;
982 state.is_leaf = P_ISLEAF(opaque);
983 state.is_rightmost = P_RIGHTMOST(opaque);
984 state.have_split = false;
986 /* Total free space available on a btree page, after fixed overhead */
987 leftspace = rightspace =
988 PageGetPageSize(page) - SizeOfPageHeaderData -
989 MAXALIGN(sizeof(BTPageOpaqueData));
992 * Finding the best possible split would require checking all the
993 * possible split points, because of the high-key and left-key special
994 * cases. That's probably more work than it's worth; instead, stop as
995 * soon as we find a "good-enough" split, where good-enough is defined
996 * as an imbalance in free space of no more than pagesize/16
997 * (arbitrary...) This should let us stop near the middle on most
998 * pages, instead of plowing to the end.
1000 goodenough = leftspace / 16;
1002 /* The right page will have the same high key as the old page */
1003 if (!P_RIGHTMOST(opaque))
1005 itemid = PageGetItemId(page, P_HIKEY);
1006 rightspace -= (int) (MAXALIGN(ItemIdGetLength(itemid)) +
1007 sizeof(ItemIdData));
1010 /* Count up total space in data items without actually scanning 'em */
1011 dataitemtotal = rightspace - (int) PageGetFreeSpace(page);
1014 * Scan through the data items and calculate space usage for a split
1015 * at each possible position.
1017 dataitemstoleft = 0;
1018 maxoff = PageGetMaxOffsetNumber(page);
1020 for (offnum = P_FIRSTDATAKEY(opaque);
1022 offnum = OffsetNumberNext(offnum))
1028 itemid = PageGetItemId(page, offnum);
1029 itemsz = MAXALIGN(ItemIdGetLength(itemid)) + sizeof(ItemIdData);
1032 * We have to allow for the current item becoming the high key of
1033 * the left page; therefore it counts against left space as well
1036 leftfree = leftspace - dataitemstoleft - (int) itemsz;
1037 rightfree = rightspace - (dataitemtotal - dataitemstoleft);
1040 * Will the new item go to left or right of split?
1042 if (offnum > newitemoff)
1043 _bt_checksplitloc(&state, offnum, leftfree, rightfree,
1045 else if (offnum < newitemoff)
1046 _bt_checksplitloc(&state, offnum, leftfree, rightfree,
1050 /* need to try it both ways! */
1051 _bt_checksplitloc(&state, offnum, leftfree, rightfree,
1053 /* here we are contemplating newitem as first on right */
1054 _bt_checksplitloc(&state, offnum, leftfree, rightfree,
1058 /* Abort scan once we find a good-enough choice */
1059 if (state.have_split && state.best_delta <= goodenough)
1062 dataitemstoleft += itemsz;
1066 * I believe it is not possible to fail to find a feasible split, but
1069 if (!state.have_split)
1070 elog(ERROR, "could not find a feasible split point for \"%s\"",
1071 RelationGetRelationName(rel));
/* report the best split found */
1073 *newitemonleft = state.newitemonleft;
1074 return state.firstright;
1078 * Subroutine to analyze a particular possible split choice (ie, firstright
1079 * and newitemonleft settings), and record the best split so far in *state.
1082 _bt_checksplitloc(FindSplitData *state, OffsetNumber firstright,
1083 int leftfree, int rightfree,
1084 bool newitemonleft, Size firstrightitemsz)
1087 * Account for the new item on whichever side it is to be put.
1090 leftfree -= (int) state->newitemsz;
1092 rightfree -= (int) state->newitemsz;
1095 * If we are not on the leaf level, we will be able to discard the key
1096 * data from the first item that winds up on the right page.
1098 if (!state->is_leaf)
1099 rightfree += (int) firstrightitemsz -
1100 (int) (MAXALIGN(sizeof(BTItemData)) + sizeof(ItemIdData));
1103 * If feasible split point, remember best delta.
1105 if (leftfree >= 0 && rightfree >= 0)
1109 if (state->is_rightmost)
1112 * On a rightmost page, try to equalize right free space with
1113 * twice the left free space. See comments for
1116 delta = (2 * leftfree) - rightfree;
1120 /* Otherwise, aim for equal free space on both sides */
1121 delta = leftfree - rightfree;
/* record this candidate if it beats the best delta seen so far */
1126 if (!state->have_split || delta < state->best_delta)
1128 state->have_split = true;
1129 state->newitemonleft = newitemonleft;
1130 state->firstright = firstright;
1131 state->best_delta = delta;
1137 * _bt_insert_parent() -- Insert downlink into parent after a page split.
1139 * On entry, buf and rbuf are the left and right split pages, which we
1140 * still hold write locks on per the L&Y algorithm. We release the
1141 * write locks once we have write lock on the parent page. (Any sooner,
1142 * and it'd be possible for some other process to try to split or delete
1143 * one of these pages, and get confused because it cannot find the downlink.)
1145 * stack - stack showing how we got here. May be NULL in cases that don't
1146 * have to be efficient (concurrent ROOT split, WAL recovery)
1147 * is_root - we split the true root
1148 * is_only - we split a page alone on its level (might have been fast root)
1150 * This is exported so it can be called by nbtxlog.c.
1153 _bt_insert_parent(Relation rel,
1161 * Here we have to do something Lehman and Yao don't talk about: deal
1162 * with a root split and construction of a new root. If our stack is
1163 * empty then we have just split a node on what had been the root
1164 * level when we descended the tree. If it was still the root then we
1165 * perform a new-root construction. If it *wasn't* the root anymore,
1166 * search to find the next higher level that someone constructed
1167 * meanwhile, and find the right place to insert as for the normal
1170 * If we have to search for the parent level, we do so by re-descending
1171 * from the root. This is not super-efficient, but it's rare enough
1172 * not to matter. (This path is also taken when called from WAL
1173 * recovery --- we have no stack in that case.)
/* True-root split: no parent level exists, so build a new root above us. */
1179 Assert(stack == NULL);
1181 /* create a new root node and update the metapage */
1182 rootbuf = _bt_newroot(rel, buf, rbuf);
1183 /* release the split buffers */
1184 _bt_wrtbuf(rel, rootbuf);
1185 _bt_wrtbuf(rel, rbuf);
1186 _bt_wrtbuf(rel, buf);
/* Non-root split: insert a downlink for the new right page into the parent. */
1190 BlockNumber bknum = BufferGetBlockNumber(buf);
1191 BlockNumber rbknum = BufferGetBlockNumber(rbuf);
1192 Page page = BufferGetPage(buf);
1193 InsertIndexResult newres;
1195 BTStackData fakestack;
/* No stack (concurrent root split / WAL recovery): fabricate a stack entry
 * pointing at the leftmost page of the parent level. */
1201 BTPageOpaque lpageop;
1204 elog(DEBUG2, "concurrent ROOT page split");
1205 lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
1206 /* Find the leftmost page at the next level up */
1207 pbuf = _bt_get_endpoint(rel, lpageop->btpo.level + 1, false);
1208 /* Set up a phony stack entry pointing there */
1210 stack->bts_blkno = BufferGetBlockNumber(pbuf);
1211 stack->bts_offset = InvalidOffsetNumber;
1212 /* bts_btitem will be initialized below */
1213 stack->bts_parent = NULL;
1214 _bt_relbuf(rel, pbuf);
1217 /* get high key from left page == lowest key on new right page */
1218 ritem = (BTItem) PageGetItem(page,
1219 PageGetItemId(page, P_HIKEY));
1221 /* form an index tuple that points at the new right page */
1222 new_item = _bt_formitem(&(ritem->bti_itup));
1223 ItemPointerSet(&(new_item->bti_itup.t_tid), rbknum, P_HIKEY);
1226 * Find the parent buffer and get the parent page.
1228 * Oops - if we were moved right then we need to change stack item!
1229 * We want to find parent pointing to where we are, right ? -
/* Re-point the stack's search key at our own (left-half) block number,
 * so _bt_getstackbuf can re-find the downlink that leads to us. */
1232 ItemPointerSet(&(stack->bts_btitem.bti_itup.t_tid),
1235 pbuf = _bt_getstackbuf(rel, stack, BT_WRITE);
/* Parent is now write-locked, so it is safe to let go of the children. */
1237 /* Now we can write and unlock the children */
1238 _bt_wrtbuf(rel, rbuf);
1239 _bt_wrtbuf(rel, buf);
1241 /* Check for error only after writing children */
1242 if (pbuf == InvalidBuffer)
1243 elog(ERROR, "failed to re-find parent key in \"%s\"",
1244 RelationGetRelationName(rel));
1246 /* Recursively update the parent */
1247 newres = _bt_insertonpg(rel, pbuf, stack->bts_parent,
1248 0, NULL, new_item, stack->bts_offset,
1258 * _bt_getstackbuf() -- Walk back up the tree one step, and find the item
1259 * we last looked at in the parent.
1261 * This is possible because we save the downlink from the parent item,
1262 * which is enough to uniquely identify it. Insertions into the parent
1263 * level could cause the item to move right; deletions could cause it
1264 * to move left, but not left of the page we previously found it in.
1266 * Adjusts bts_blkno & bts_offset if changed.
1268 * Returns InvalidBuffer if item not found (should not happen).
1271 _bt_getstackbuf(Relation rel, BTStack stack, int access)
1276 blkno = stack->bts_blkno;
1277 start = stack->bts_offset;
1283 BTPageOpaque opaque;
1285 buf = _bt_getbuf(rel, blkno, access);
1286 page = BufferGetPage(buf);
1287 opaque = (BTPageOpaque) PageGetSpecialPointer(page);
1289 if (!P_IGNORE(opaque))
1291 OffsetNumber offnum,
1297 minoff = P_FIRSTDATAKEY(opaque);
1298 maxoff = PageGetMaxOffsetNumber(page);
1301 * start = InvalidOffsetNumber means "search the whole page".
1302 * We need this test anyway due to possibility that page has a
1303 * high key now when it didn't before.
1309 * Need this check too, to guard against possibility that page
1310 * split since we visited it originally.
1313 start = OffsetNumberNext(maxoff);
1316 * These loops will check every item on the page --- but in an
1317 * order that's attuned to the probability of where it
1318 * actually is. Scan to the right first, then to the left.
1320 for (offnum = start;
1322 offnum = OffsetNumberNext(offnum))
1324 itemid = PageGetItemId(page, offnum);
1325 item = (BTItem) PageGetItem(page, itemid);
1326 if (BTItemSame(item, &stack->bts_btitem))
1328 /* Return accurate pointer to where link is now */
1329 stack->bts_blkno = blkno;
1330 stack->bts_offset = offnum;
1335 for (offnum = OffsetNumberPrev(start);
1337 offnum = OffsetNumberPrev(offnum))
1339 itemid = PageGetItemId(page, offnum);
1340 item = (BTItem) PageGetItem(page, itemid);
1341 if (BTItemSame(item, &stack->bts_btitem))
1343 /* Return accurate pointer to where link is now */
1344 stack->bts_blkno = blkno;
1345 stack->bts_offset = offnum;
1352 * The item we're looking for moved right at least one page.
1354 if (P_RIGHTMOST(opaque))
1356 _bt_relbuf(rel, buf);
1357 return InvalidBuffer;
1359 blkno = opaque->btpo_next;
1360 start = InvalidOffsetNumber;
1361 _bt_relbuf(rel, buf);
1366 * _bt_newroot() -- Create a new root page for the index.
1368 * We've just split the old root page and need to create a new one.
1369 * In order to do this, we add a new root page to the file, then lock
1370 * the metadata page and update it. This is guaranteed to be deadlock-
1371 * free, because all readers release their locks on the metadata page
1372 * before trying to lock the root, and all writers lock the root before
1373 * trying to lock the metadata page. We have a write lock on the old
1374 * root page, so we have not introduced any cycles into the waits-for
1377 * On entry, lbuf (the old root) and rbuf (its new peer) are write-
1378 * locked. On exit, a new root page exists with entries for the
1379 * two new children, metapage is updated and unlocked/unpinned.
1380 * The new root buffer is returned to caller which has to unlock/unpin
1381 * lbuf, rbuf & rootbuf.
1384 _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf)
1392 BlockNumber rootblknum;
1393 BTPageOpaque rootopaque;
1400 BTMetaPageData *metad;
1402 lbkno = BufferGetBlockNumber(lbuf);
1403 rbkno = BufferGetBlockNumber(rbuf);
1404 lpage = BufferGetPage(lbuf);
1405 rpage = BufferGetPage(rbuf);
1407 /* get a new root page */
1408 rootbuf = _bt_getbuf(rel, P_NEW, BT_WRITE);
1409 rootpage = BufferGetPage(rootbuf);
1410 rootblknum = BufferGetBlockNumber(rootbuf);
1412 /* acquire lock on the metapage */
1413 metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_WRITE);
1414 metapg = BufferGetPage(metabuf);
1415 metad = BTPageGetMeta(metapg);
/* All changes from here to the XLogInsert must appear atomic: an error
 * partway through would leave the root/metapage inconsistent. */
1417 /* NO EREPORT(ERROR) from here till newroot op is logged */
1418 START_CRIT_SECTION();
1420 /* set btree special data */
1421 rootopaque = (BTPageOpaque) PageGetSpecialPointer(rootpage);
1422 rootopaque->btpo_prev = rootopaque->btpo_next = P_NONE;
1423 rootopaque->btpo_flags = BTP_ROOT;
/* New root sits one level above the old root (the left page). */
1424 rootopaque->btpo.level =
1425 ((BTPageOpaque) PageGetSpecialPointer(lpage))->btpo.level + 1;
1427 /* update metapage data */
1428 metad->btm_root = rootblknum;
1429 metad->btm_level = rootopaque->btpo.level;
1430 metad->btm_fastroot = rootblknum;
1431 metad->btm_fastlevel = rootopaque->btpo.level;
1434 * Create downlink item for left page (old root). Since this will be
1435 * the first item in a non-leaf page, it implicitly has minus-infinity
1436 * key value, so we need not store any actual key in it.
1438 itemsz = sizeof(BTItemData);
1439 new_item = (BTItem) palloc(itemsz);
1440 new_item->bti_itup.t_info = itemsz;
1441 ItemPointerSet(&(new_item->bti_itup.t_tid), lbkno, P_HIKEY);
1444 * Insert the left page pointer into the new root page. The root page
1445 * is the rightmost page on its level so there is no "high key" in it;
1446 * the two items will go into positions P_HIKEY and P_FIRSTKEY.
1448 if (PageAddItem(rootpage, (Item) new_item, itemsz, P_HIKEY, LP_USED) == InvalidOffsetNumber)
1449 elog(PANIC, "failed to add leftkey to new root page");
1453 * Create downlink item for right page. The key for it is obtained
1454 * from the "high key" position in the left page.
1456 itemid = PageGetItemId(lpage, P_HIKEY);
1457 itemsz = ItemIdGetLength(itemid);
1458 item = (BTItem) PageGetItem(lpage, itemid);
1459 new_item = _bt_formitem(&(item->bti_itup));
1460 ItemPointerSet(&(new_item->bti_itup.t_tid), rbkno, P_HIKEY);
1463 * insert the right page pointer into the new root page.
1465 if (PageAddItem(rootpage, (Item) new_item, itemsz, P_FIRSTKEY, LP_USED) == InvalidOffsetNumber)
1466 elog(PANIC, "failed to add rightkey to new root page");
/* Write a WAL record for the new-root operation; temp relations are
 * never WAL-logged. */
1470 if (!rel->rd_istemp)
1472 xl_btree_newroot xlrec;
1474 XLogRecData rdata[2];
1476 xlrec.node = rel->rd_node;
1477 xlrec.rootblk = rootblknum;
1478 xlrec.level = metad->btm_level;
1480 rdata[0].buffer = InvalidBuffer;
1481 rdata[0].data = (char *) &xlrec;
1482 rdata[0].len = SizeOfBtreeNewroot;
1483 rdata[0].next = &(rdata[1]);
/* Second rdata chunk carries the used portion of the new root page
 * (pd_upper..pd_special), read straight out of the page buffer. */
1486 * Direct access to page is not good but faster - we should
1487 * implement some new func in page API.
1489 rdata[1].buffer = InvalidBuffer;
1490 rdata[1].data = (char *) rootpage + ((PageHeader) rootpage)->pd_upper;
1491 rdata[1].len = ((PageHeader) rootpage)->pd_special -
1492 ((PageHeader) rootpage)->pd_upper;
1493 rdata[1].next = NULL;
1495 recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_NEWROOT, rdata);
/* Stamp every page touched by this operation with the record's LSN/TLI. */
1497 PageSetLSN(rootpage, recptr);
1498 PageSetTLI(rootpage, ThisTimeLineID);
1499 PageSetLSN(metapg, recptr);
1500 PageSetTLI(metapg, ThisTimeLineID);
1501 PageSetLSN(lpage, recptr);
1502 PageSetTLI(lpage, ThisTimeLineID);
1503 PageSetLSN(rpage, recptr);
1504 PageSetTLI(rpage, ThisTimeLineID);
1509 /* write and let go of metapage buffer */
1510 _bt_wrtbuf(rel, metabuf);
1516 * _bt_pgaddtup() -- add a tuple to a particular page in the index.
1518 * This routine adds the tuple to the page as requested. It does
1519 * not affect pin/lock status, but you'd better have a write lock
1520 * and pin on the target buffer! Don't forget to write and release
1521 * the buffer afterwards, either.
1523 * The main difference between this routine and a bare PageAddItem call
1524 * is that this code knows that the leftmost data item on a non-leaf
1525 * btree page doesn't need to have a key. Therefore, it strips such
1526 * items down to just the item header. CAUTION: this works ONLY if
1527 * we insert the items in order, so that the given itup_off does
1528 * represent the final position of the item!
1531 _bt_pgaddtup(Relation rel,
1535 OffsetNumber itup_off,
1538 BTPageOpaque opaque = (BTPageOpaque) PageGetSpecialPointer(page);
1539 BTItemData truncitem;
1541 if (!P_ISLEAF(opaque) && itup_off == P_FIRSTDATAKEY(opaque))
1543 memcpy(&truncitem, btitem, sizeof(BTItemData));
1544 truncitem.bti_itup.t_info = sizeof(BTItemData);
1545 btitem = &truncitem;
1546 itemsize = sizeof(BTItemData);
1549 if (PageAddItem(page, (Item) btitem, itemsize, itup_off,
1550 LP_USED) == InvalidOffsetNumber)
1551 elog(PANIC, "failed to add item to the %s for \"%s\"",
1552 where, RelationGetRelationName(rel));
1556 * _bt_isequal - used in _bt_doinsert in check for duplicates.
1558 * This is very similar to _bt_compare, except for NULL handling.
1559 * Rule is simple: NOT_NULL not equal NULL, NULL not equal NULL too.
1562 _bt_isequal(TupleDesc itupdesc, Page page, OffsetNumber offnum,
1563 int keysz, ScanKey scankey)
1569 /* Better be comparing to a leaf item */
1570 Assert(P_ISLEAF((BTPageOpaque) PageGetSpecialPointer(page)));
1572 btitem = (BTItem) PageGetItem(page, PageGetItemId(page, offnum));
1573 itup = &(btitem->bti_itup);
1575 for (i = 1; i <= keysz; i++)
1582 attno = scankey->sk_attno;
1584 datum = index_getattr(itup, attno, itupdesc, &isNull);
1586 /* NULLs are never equal to anything */
1587 if (isNull || (scankey->sk_flags & SK_ISNULL))
1590 result = DatumGetInt32(FunctionCall2(&scankey->sk_func,
1592 scankey->sk_argument));
1600 /* if we get here, the keys are equal */