X-Git-Url: https://granicus.if.org/sourcecode?a=blobdiff_plain;f=src%2Fbackend%2Faccess%2Fnbtree%2Fnbtinsert.c;h=ef68a7145fce5f5352196a4b1c751aee26e10859;hb=2ed5b87f96d473962ec5230fd820abfeaccb2069;hp=868a91ab3a5efa16c1294d7389693f7c31b26916;hpb=ee4ddac137b7c66e3bec6f74e3503236476cb16e;p=postgresql diff --git a/src/backend/access/nbtree/nbtinsert.c b/src/backend/access/nbtree/nbtinsert.c index 868a91ab3a..ef68a7145f 100644 --- a/src/backend/access/nbtree/nbtinsert.c +++ b/src/backend/access/nbtree/nbtinsert.c @@ -3,12 +3,12 @@ * nbtinsert.c * Item insertion in Lehman and Yao btrees for Postgres. * - * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group + * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.120 2005/03/21 01:23:59 tgl Exp $ + * src/backend/access/nbtree/nbtinsert.c * *------------------------------------------------------------------------- */ @@ -17,15 +17,25 @@ #include "access/heapam.h" #include "access/nbtree.h" +#include "access/transam.h" +#include "access/xloginsert.h" #include "miscadmin.h" +#include "storage/lmgr.h" +#include "storage/predicate.h" +#include "utils/tqual.h" typedef struct { /* context data for _bt_checksplitloc */ Size newitemsz; /* size of new item to be inserted */ + int fillfactor; /* needed when splitting rightmost page */ bool is_leaf; /* T if splitting a leaf page */ bool is_rightmost; /* T if splitting a rightmost page */ + OffsetNumber newitemoff; /* where the new item is to be inserted */ + int leftspace; /* space available for items on left page */ + int rightspace; /* space available for items on right page */ + int olddataitemstotal; /* space taken by old items */ bool have_split; /* found a valid split? 
*/ @@ -38,150 +48,208 @@ typedef struct static Buffer _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf); -static TransactionId _bt_check_unique(Relation rel, BTItem btitem, - Relation heapRel, Buffer buf, - ScanKey itup_scankey); -static void _bt_insertonpg(Relation rel, Buffer buf, +static TransactionId _bt_check_unique(Relation rel, IndexTuple itup, + Relation heapRel, Buffer buf, OffsetNumber offset, + ScanKey itup_scankey, + IndexUniqueCheck checkUnique, bool *is_unique); +static void _bt_findinsertloc(Relation rel, + Buffer *bufptr, + OffsetNumber *offsetptr, + int keysz, + ScanKey scankey, + IndexTuple newtup, + BTStack stack, + Relation heapRel); +static void _bt_insertonpg(Relation rel, Buffer buf, Buffer cbuf, BTStack stack, - int keysz, ScanKey scankey, - BTItem btitem, - OffsetNumber afteritem, + IndexTuple itup, + OffsetNumber newitemoff, bool split_only_page); -static Buffer _bt_split(Relation rel, Buffer buf, OffsetNumber firstright, - OffsetNumber newitemoff, Size newitemsz, - BTItem newitem, bool newitemonleft, - OffsetNumber *itup_off, BlockNumber *itup_blkno); +static Buffer _bt_split(Relation rel, Buffer buf, Buffer cbuf, + OffsetNumber firstright, OffsetNumber newitemoff, Size newitemsz, + IndexTuple newitem, bool newitemonleft); +static void _bt_insert_parent(Relation rel, Buffer buf, Buffer rbuf, + BTStack stack, bool is_root, bool is_only); static OffsetNumber _bt_findsplitloc(Relation rel, Page page, OffsetNumber newitemoff, Size newitemsz, bool *newitemonleft); -static void _bt_checksplitloc(FindSplitData *state, OffsetNumber firstright, - int leftfree, int rightfree, - bool newitemonleft, Size firstrightitemsz); -static void _bt_pgaddtup(Relation rel, Page page, - Size itemsize, BTItem btitem, - OffsetNumber itup_off, const char *where); +static void _bt_checksplitloc(FindSplitData *state, + OffsetNumber firstoldonright, bool newitemonleft, + int dataitemstoleft, Size firstoldonrightsz); +static bool _bt_pgaddtup(Page page, Size itemsize, IndexTuple itup, + OffsetNumber itup_off); static bool _bt_isequal(TupleDesc itupdesc, Page page, OffsetNumber offnum, int keysz, ScanKey scankey); +static void _bt_vacuum_one_page(Relation rel, Buffer buffer, Relation heapRel); /* - * _bt_doinsert() -- Handle insertion of a single btitem in the tree. + * _bt_doinsert() -- Handle insertion of a single index tuple in the tree. + * + * This routine is called by the public interface routine, btinsert. + * By here, itup is filled in, including the TID. * - * This routine is called by the public interface routines, btbuild - * and btinsert. By here, btitem is filled in, including the TID. + * If checkUnique is UNIQUE_CHECK_NO or UNIQUE_CHECK_PARTIAL, this + * will allow duplicates. Otherwise (UNIQUE_CHECK_YES or + * UNIQUE_CHECK_EXISTING) it will throw error for a duplicate. + * For UNIQUE_CHECK_EXISTING we merely run the duplicate check, and + * don't actually insert. + * + * The result value is only significant for UNIQUE_CHECK_PARTIAL: + * it must be TRUE if the entry is known unique, else FALSE. + * (In the current implementation we'll also return TRUE after a + * successful UNIQUE_CHECK_YES or UNIQUE_CHECK_EXISTING call, but + * that's just a coding artifact.) 
*/ -void -_bt_doinsert(Relation rel, BTItem btitem, - bool index_is_unique, Relation heapRel) +bool +_bt_doinsert(Relation rel, IndexTuple itup, + IndexUniqueCheck checkUnique, Relation heapRel) { - IndexTuple itup = &(btitem->bti_itup); + bool is_unique = false; int natts = rel->rd_rel->relnatts; ScanKey itup_scankey; BTStack stack; Buffer buf; + OffsetNumber offset; - /* we need a scan key to do our search, so build one */ + /* we need an insertion scan key to do our search, so build one */ itup_scankey = _bt_mkscankey(rel, itup); top: /* find the first page containing this key */ stack = _bt_search(rel, natts, itup_scankey, false, &buf, BT_WRITE); + offset = InvalidOffsetNumber; + /* trade in our read lock for a write lock */ LockBuffer(buf, BUFFER_LOCK_UNLOCK); LockBuffer(buf, BT_WRITE); /* * If the page was split between the time that we surrendered our read - * lock and acquired our write lock, then this page may no longer be - * the right place for the key we want to insert. In this case, we - * need to move right in the tree. See Lehman and Yao for an - * excruciatingly precise description. + * lock and acquired our write lock, then this page may no longer be the + * right place for the key we want to insert. In this case, we need to + * move right in the tree. See Lehman and Yao for an excruciatingly + * precise description. */ - buf = _bt_moveright(rel, buf, natts, itup_scankey, false, BT_WRITE); + buf = _bt_moveright(rel, buf, natts, itup_scankey, false, + true, stack, BT_WRITE); /* - * If we're not allowing duplicates, make sure the key isn't already - * in the index. + * If we're not allowing duplicates, make sure the key isn't already in + * the index. * - * NOTE: obviously, _bt_check_unique can only detect keys that are - * already in the index; so it cannot defend against concurrent - * insertions of the same key. We protect against that by means of - * holding a write lock on the target page. Any other would-be - * inserter of the same key must acquire a write lock on the same - * target page, so only one would-be inserter can be making the check - * at one time. Furthermore, once we are past the check we hold write - * locks continuously until we have performed our insertion, so no - * later inserter can fail to see our insertion. (This requires some - * care in _bt_insertonpg.) + * NOTE: obviously, _bt_check_unique can only detect keys that are already + * in the index; so it cannot defend against concurrent insertions of the + * same key. We protect against that by means of holding a write lock on + * the target page. Any other would-be inserter of the same key must + * acquire a write lock on the same target page, so only one would-be + * inserter can be making the check at one time. Furthermore, once we are + * past the check we hold write locks continuously until we have performed + * our insertion, so no later inserter can fail to see our insertion. + * (This requires some care in _bt_insertonpg.) * * If we must wait for another xact, we release the lock while waiting, * and then must start over completely. + * + * For a partial uniqueness check, we don't wait for the other xact. Just + * let the tuple in and return false for possibly non-unique, or true for + * definitely unique. 
*/ - if (index_is_unique) + if (checkUnique != UNIQUE_CHECK_NO) { TransactionId xwait; - xwait = _bt_check_unique(rel, btitem, heapRel, buf, itup_scankey); + offset = _bt_binsrch(rel, buf, natts, itup_scankey, false); + xwait = _bt_check_unique(rel, itup, heapRel, buf, offset, itup_scankey, + checkUnique, &is_unique); if (TransactionIdIsValid(xwait)) { /* Have to wait for the other guy ... */ _bt_relbuf(rel, buf); - XactLockTableWait(xwait); + XactLockTableWait(xwait, rel, &itup->t_tid, XLTW_InsertIndex); /* start over... */ _bt_freestack(stack); goto top; } } - /* do the insertion */ - _bt_insertonpg(rel, buf, stack, natts, itup_scankey, btitem, 0, false); + if (checkUnique != UNIQUE_CHECK_EXISTING) + { + /* + * The only conflict predicate locking cares about for indexes is when + * an index tuple insert conflicts with an existing lock. Since the + * actual location of the insert is hard to predict because of the + * random search used to prevent O(N^2) performance when there are + * many duplicate entries, we can just use the "first valid" page. + */ + CheckForSerializableConflictIn(rel, NULL, buf); + /* do the insertion */ + _bt_findinsertloc(rel, &buf, &offset, natts, itup_scankey, itup, + stack, heapRel); + _bt_insertonpg(rel, buf, InvalidBuffer, stack, itup, offset, false); + } + else + { + /* just release the buffer */ + _bt_relbuf(rel, buf); + } /* be tidy */ _bt_freestack(stack); _bt_freeskey(itup_scankey); + + return is_unique; } /* * _bt_check_unique() -- Check for violation of unique index constraint * + * offset points to the first possible item that could conflict. It can + * also point to end-of-page, which means that the first tuple to check + * is the first tuple on the next page. + * * Returns InvalidTransactionId if there is no conflict, else an xact ID - * we must wait for to see if it commits a conflicting tuple. If an actual + * we must wait for to see if it commits a conflicting tuple. If an actual * conflict is detected, no return --- just ereport(). + * + * However, if checkUnique == UNIQUE_CHECK_PARTIAL, we always return + * InvalidTransactionId because we don't want to wait. In this case we + * set *is_unique to false if there is a potential conflict, and the + * core code must redo the uniqueness check later. */ static TransactionId -_bt_check_unique(Relation rel, BTItem btitem, Relation heapRel, - Buffer buf, ScanKey itup_scankey) +_bt_check_unique(Relation rel, IndexTuple itup, Relation heapRel, + Buffer buf, OffsetNumber offset, ScanKey itup_scankey, + IndexUniqueCheck checkUnique, bool *is_unique) { TupleDesc itupdesc = RelationGetDescr(rel); int natts = rel->rd_rel->relnatts; - OffsetNumber offset, - maxoff; + SnapshotData SnapshotDirty; + OffsetNumber maxoff; Page page; BTPageOpaque opaque; Buffer nbuf = InvalidBuffer; + bool found = false; + + /* Assume unique until we find a duplicate */ + *is_unique = true; + + InitDirtySnapshot(SnapshotDirty); page = BufferGetPage(buf); opaque = (BTPageOpaque) PageGetSpecialPointer(page); maxoff = PageGetMaxOffsetNumber(page); - /* - * Find first item >= proposed new item. Note we could also get a - * pointer to end-of-page here. - */ - offset = _bt_binsrch(rel, buf, natts, itup_scankey, false); - /* * Scan over all equal tuples, looking for live conflicts. */ for (;;) { - HeapTupleData htup; - Buffer hbuffer; ItemId curitemid; - BTItem cbti; + IndexTuple curitup; BlockNumber nblkno; /* @@ -196,44 +264,77 @@ _bt_check_unique(Relation rel, BTItem btitem, Relation heapRel, * We can skip items that are marked killed. 
* * Formerly, we applied _bt_isequal() before checking the kill - * flag, so as to fall out of the item loop as soon as - * possible. However, in the presence of heavy update activity - * an index may contain many killed items with the same key; - * running _bt_isequal() on each killed item gets expensive. - * Furthermore it is likely that the non-killed version of - * each key appears first, so that we didn't actually get to - * exit any sooner anyway. So now we just advance over killed - * items as quickly as we can. We only apply _bt_isequal() - * when we get to a non-killed item or the end of the page. + * flag, so as to fall out of the item loop as soon as possible. + * However, in the presence of heavy update activity an index may + * contain many killed items with the same key; running + * _bt_isequal() on each killed item gets expensive. Furthermore + * it is likely that the non-killed version of each key appears + * first, so that we didn't actually get to exit any sooner + * anyway. So now we just advance over killed items as quickly as + * we can. We only apply _bt_isequal() when we get to a non-killed + * item or the end of the page. */ - if (!ItemIdDeleted(curitemid)) + if (!ItemIdIsDead(curitemid)) { + ItemPointerData htid; + bool all_dead; + /* - * _bt_compare returns 0 for (1,NULL) and (1,NULL) - - * this's how we handling NULLs - and so we must not use - * _bt_compare in real comparison, but only for - * ordering/finding items on pages. - vadim 03/24/97 + * _bt_compare returns 0 for (1,NULL) and (1,NULL) - this's + * how we handling NULLs - and so we must not use _bt_compare + * in real comparison, but only for ordering/finding items on + * pages. - vadim 03/24/97 */ if (!_bt_isequal(itupdesc, page, offset, natts, itup_scankey)) break; /* we're past all the equal tuples */ /* okay, we gotta fetch the heap tuple ... */ - cbti = (BTItem) PageGetItem(page, curitemid); - htup.t_self = cbti->bti_itup.t_tid; - if (heap_fetch(heapRel, SnapshotDirty, &htup, &hbuffer, - true, NULL)) + curitup = (IndexTuple) PageGetItem(page, curitemid); + htid = curitup->t_tid; + + /* + * If we are doing a recheck, we expect to find the tuple we + * are rechecking. It's not a duplicate, but we have to keep + * scanning. + */ + if (checkUnique == UNIQUE_CHECK_EXISTING && + ItemPointerCompare(&htid, &itup->t_tid) == 0) { - /* it is a duplicate */ - TransactionId xwait = - (TransactionIdIsValid(SnapshotDirty->xmin)) ? - SnapshotDirty->xmin : SnapshotDirty->xmax; + found = true; + } - ReleaseBuffer(hbuffer); + /* + * We check the whole HOT-chain to see if there is any tuple + * that satisfies SnapshotDirty. This is necessary because we + * have just a single index entry for the entire chain. + */ + else if (heap_hot_search(&htid, heapRel, &SnapshotDirty, + &all_dead)) + { + TransactionId xwait; + + /* + * It is a duplicate. If we are only doing a partial + * check, then don't bother checking if the tuple is being + * updated in another transaction. Just return the fact + * that it is a potential conflict and leave the full + * check till later. + */ + if (checkUnique == UNIQUE_CHECK_PARTIAL) + { + if (nbuf != InvalidBuffer) + _bt_relbuf(rel, nbuf); + *is_unique = false; + return InvalidTransactionId; + } /* * If this tuple is being updated by other transaction * then we have to wait for its commit/abort. */ + xwait = (TransactionIdIsValid(SnapshotDirty.xmin)) ? 
+ SnapshotDirty.xmin : SnapshotDirty.xmax; + if (TransactionIdIsValid(xwait)) { if (nbuf != InvalidBuffer) @@ -243,30 +344,88 @@ _bt_check_unique(Relation rel, BTItem btitem, Relation heapRel, } /* - * Otherwise we have a definite conflict. + * Otherwise we have a definite conflict. But before + * complaining, look to see if the tuple we want to insert + * is itself now committed dead --- if so, don't complain. + * This is a waste of time in normal scenarios but we must + * do it to support CREATE INDEX CONCURRENTLY. + * + * We must follow HOT-chains here because during + * concurrent index build, we insert the root TID though + * the actual tuple may be somewhere in the HOT-chain. + * While following the chain we might not stop at the + * exact tuple which triggered the insert, but that's OK + * because if we find a live tuple anywhere in this chain, + * we have a unique key conflict. The other live tuple is + * not part of this chain because it had a different index + * entry. */ - ereport(ERROR, - (errcode(ERRCODE_UNIQUE_VIOLATION), - errmsg("duplicate key violates unique constraint \"%s\"", - RelationGetRelationName(rel)))); - } - else if (htup.t_data != NULL) - { + htid = itup->t_tid; + if (heap_hot_search(&htid, heapRel, SnapshotSelf, NULL)) + { + /* Normal case --- it's still live */ + } + else + { + /* + * It's been deleted, so no error, and no need to + * continue searching + */ + break; + } + /* - * Hmm, if we can't see the tuple, maybe it can be - * marked killed. This logic should match - * index_getnext and btgettuple. + * This is a definite conflict. Break the tuple down into + * datums and report the error. But first, make sure we + * release the buffer locks we're holding --- + * BuildIndexValueDescription could make catalog accesses, + * which in the worst case might touch this same index and + * cause deadlocks. */ - LockBuffer(hbuffer, BUFFER_LOCK_SHARE); - if (HeapTupleSatisfiesVacuum(htup.t_data, RecentGlobalXmin, - hbuffer) == HEAPTUPLE_DEAD) + if (nbuf != InvalidBuffer) + _bt_relbuf(rel, nbuf); + _bt_relbuf(rel, buf); + { - curitemid->lp_flags |= LP_DELETE; - SetBufferCommitInfoNeedsSave(buf); + Datum values[INDEX_MAX_KEYS]; + bool isnull[INDEX_MAX_KEYS]; + char *key_desc; + + index_deform_tuple(itup, RelationGetDescr(rel), + values, isnull); + + key_desc = BuildIndexValueDescription(rel, values, + isnull); + + ereport(ERROR, + (errcode(ERRCODE_UNIQUE_VIOLATION), + errmsg("duplicate key value violates unique constraint \"%s\"", + RelationGetRelationName(rel)), + key_desc ? errdetail("Key %s already exists.", + key_desc) : 0, + errtableconstraint(heapRel, + RelationGetRelationName(rel)))); } - LockBuffer(hbuffer, BUFFER_LOCK_UNLOCK); } - ReleaseBuffer(hbuffer); + else if (all_dead) + { + /* + * The conflicting tuple (or whole HOT chain) is dead to + * everyone, so we may as well mark the index entry + * killed. + */ + ItemIdMarkDead(curitemid); + opaque->btpo_flags |= BTP_HAS_GARBAGE; + + /* + * Mark buffer with a dirty hint, since state is not + * crucial. Be sure to mark the proper buffer dirty. 
+ */ + if (nbuf != InvalidBuffer) + MarkBufferDirtyHint(nbuf, true); + else + MarkBufferDirtyHint(buf, true); + } } } @@ -293,7 +452,7 @@ _bt_check_unique(Relation rel, BTItem btitem, Relation heapRel, if (!P_IGNORE(opaque)) break; if (P_RIGHTMOST(opaque)) - elog(ERROR, "fell off the end of \"%s\"", + elog(ERROR, "fell off the end of index \"%s\"", RelationGetRelationName(rel)); } maxoff = PageGetMaxOffsetNumber(page); @@ -301,18 +460,221 @@ _bt_check_unique(Relation rel, BTItem btitem, Relation heapRel, } } + /* + * If we are doing a recheck then we should have found the tuple we are + * checking. Otherwise there's something very wrong --- probably, the + * index is on a non-immutable expression. + */ + if (checkUnique == UNIQUE_CHECK_EXISTING && !found) + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("failed to re-find tuple within index \"%s\"", + RelationGetRelationName(rel)), + errhint("This may be because of a non-immutable index expression."), + errtableconstraint(heapRel, + RelationGetRelationName(rel)))); + if (nbuf != InvalidBuffer) _bt_relbuf(rel, nbuf); return InvalidTransactionId; } + +/* + * _bt_findinsertloc() -- Finds an insert location for a tuple + * + * If the new key is equal to one or more existing keys, we can + * legitimately place it anywhere in the series of equal keys --- in fact, + * if the new key is equal to the page's "high key" we can place it on + * the next page. If it is equal to the high key, and there's not room + * to insert the new tuple on the current page without splitting, then + * we can move right hoping to find more free space and avoid a split. + * (We should not move right indefinitely, however, since that leads to + * O(N^2) insertion behavior in the presence of many equal keys.) + * Once we have chosen the page to put the key on, we'll insert it before + * any existing equal keys because of the way _bt_binsrch() works. + * + * If there's not enough room in the space, we try to make room by + * removing any LP_DEAD tuples. + * + * On entry, *bufptr and *offsetptr point to the first legal position + * where the new tuple could be inserted. The caller should hold an + * exclusive lock on *bufptr. *offsetptr can also be set to + * InvalidOffsetNumber, in which case the function will search for the + * right location within the page if needed. On exit, they point to the + * chosen insert location. If _bt_findinsertloc decides to move right, + * the lock and pin on the original page will be released and the new + * page returned to the caller is exclusively locked instead. + * + * newtup is the new tuple we're inserting, and scankey is an insertion + * type scan key for it. + */ +static void +_bt_findinsertloc(Relation rel, + Buffer *bufptr, + OffsetNumber *offsetptr, + int keysz, + ScanKey scankey, + IndexTuple newtup, + BTStack stack, + Relation heapRel) +{ + Buffer buf = *bufptr; + Page page = BufferGetPage(buf); + Size itemsz; + BTPageOpaque lpageop; + bool movedright, + vacuumed; + OffsetNumber newitemoff; + OffsetNumber firstlegaloff = *offsetptr; + + lpageop = (BTPageOpaque) PageGetSpecialPointer(page); + + itemsz = IndexTupleDSize(*newtup); + itemsz = MAXALIGN(itemsz); /* be safe, PageAddItem will do this but we + * need to be consistent */ + + /* + * Check whether the item can fit on a btree page at all. (Eventually, we + * ought to try to apply TOAST methods if not.) We actually need to be + * able to fit three items on every page, so restrict any one item to 1/3 + * the per-page available space. 
Note that at this point, itemsz doesn't + * include the ItemId. + * + * NOTE: if you change this, see also the similar code in _bt_buildadd(). + */ + if (itemsz > BTMaxItemSize(page)) + ereport(ERROR, + (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), + errmsg("index row size %zu exceeds maximum %zu for index \"%s\"", + itemsz, BTMaxItemSize(page), + RelationGetRelationName(rel)), + errhint("Values larger than 1/3 of a buffer page cannot be indexed.\n" + "Consider a function index of an MD5 hash of the value, " + "or use full text indexing."), + errtableconstraint(heapRel, + RelationGetRelationName(rel)))); + + /*---------- + * If we will need to split the page to put the item on this page, + * check whether we can put the tuple somewhere to the right, + * instead. Keep scanning right until we + * (a) find a page with enough free space, + * (b) reach the last page where the tuple can legally go, or + * (c) get tired of searching. + * (c) is not flippant; it is important because if there are many + * pages' worth of equal keys, it's better to split one of the early + * pages than to scan all the way to the end of the run of equal keys + * on every insert. We implement "get tired" as a random choice, + * since stopping after scanning a fixed number of pages wouldn't work + * well (we'd never reach the right-hand side of previously split + * pages). Currently the probability of moving right is set at 0.99, + * which may seem too high to change the behavior much, but it does an + * excellent job of preventing O(N^2) behavior with many equal keys. + *---------- + */ + movedright = false; + vacuumed = false; + while (PageGetFreeSpace(page) < itemsz) + { + Buffer rbuf; + BlockNumber rblkno; + + /* + * before considering moving right, see if we can obtain enough space + * by erasing LP_DEAD items + */ + if (P_ISLEAF(lpageop) && P_HAS_GARBAGE(lpageop)) + { + _bt_vacuum_one_page(rel, buf, heapRel); + + /* + * remember that we vacuumed this page, because that makes the + * hint supplied by the caller invalid + */ + vacuumed = true; + + if (PageGetFreeSpace(page) >= itemsz) + break; /* OK, now we have enough space */ + } + + /* + * nope, so check conditions (b) and (c) enumerated above + */ + if (P_RIGHTMOST(lpageop) || + _bt_compare(rel, keysz, scankey, page, P_HIKEY) != 0 || + random() <= (MAX_RANDOM_VALUE / 100)) + break; + + /* + * step right to next non-dead page + * + * must write-lock that page before releasing write lock on current + * page; else someone else's _bt_check_unique scan could fail to see + * our insertion. write locks on intermediate dead pages won't do + * because we don't know when they will get de-linked from the tree. + */ + rbuf = InvalidBuffer; + + rblkno = lpageop->btpo_next; + for (;;) + { + rbuf = _bt_relandgetbuf(rel, rbuf, rblkno, BT_WRITE); + page = BufferGetPage(rbuf); + lpageop = (BTPageOpaque) PageGetSpecialPointer(page); + + /* + * If this page was incompletely split, finish the split now. We + * do this while holding a lock on the left sibling, which is not + * good because finishing the split could be a fairly lengthy + * operation. But this should happen very seldom. 
+ */ + if (P_INCOMPLETE_SPLIT(lpageop)) + { + _bt_finish_split(rel, rbuf, stack); + rbuf = InvalidBuffer; + continue; + } + + if (!P_IGNORE(lpageop)) + break; + if (P_RIGHTMOST(lpageop)) + elog(ERROR, "fell off the end of index \"%s\"", + RelationGetRelationName(rel)); + + rblkno = lpageop->btpo_next; + } + _bt_relbuf(rel, buf); + buf = rbuf; + movedright = true; + vacuumed = false; + } + + /* + * Now we are on the right page, so find the insert position. If we moved + * right at all, we know we should insert at the start of the page. If we + * didn't move right, we can use the firstlegaloff hint if the caller + * supplied one, unless we vacuumed the page which might have moved tuples + * around making the hint invalid. If we didn't move right or can't use + * the hint, find the position by searching. + */ + if (movedright) + newitemoff = P_FIRSTDATAKEY(lpageop); + else if (firstlegaloff != InvalidOffsetNumber && !vacuumed) + newitemoff = firstlegaloff; + else + newitemoff = _bt_binsrch(rel, buf, keysz, scankey, false); + + *bufptr = buf; + *offsetptr = newitemoff; +} + /*---------- * _bt_insertonpg() -- Insert a tuple on a particular page in the index. * * This recursive procedure does the following things: * - * + finds the right place to insert the tuple. * + if necessary, splits the target page (making sure that the * split is equitable as far as post-insert free space goes). * + inserts the tuple. @@ -323,25 +685,13 @@ _bt_check_unique(Relation rel, BTItem btitem, Relation heapRel, * child page on the parent. * + updates the metapage if a true root or fast root is split. * - * On entry, we must have the right buffer on which to do the - * insertion, and the buffer must be pinned and locked. On return, - * we will have dropped both the pin and the write lock on the buffer. + * On entry, we must have the correct buffer in which to do the + * insertion, and the buffer must be pinned and write-locked. On return, + * we will have dropped both the pin and the lock on the buffer. * - * If 'afteritem' is >0 then the new tuple must be inserted after the - * existing item of that number, noplace else. If 'afteritem' is 0 - * then the procedure finds the exact spot to insert it by searching. - * (keysz and scankey parameters are used ONLY if afteritem == 0.) - * - * NOTE: if the new key is equal to one or more existing keys, we can - * legitimately place it anywhere in the series of equal keys --- in fact, - * if the new key is equal to the page's "high key" we can place it on - * the next page. If it is equal to the high key, and there's not room - * to insert the new tuple on the current page without splitting, then - * we can move right hoping to find more free space and avoid a split. - * (We should not move right indefinitely, however, since that leads to - * O(N^2) insertion behavior in the presence of many equal keys.) - * Once we have chosen the page to put the key on, we'll insert it before - * any existing equal keys because of the way _bt_binsrch() works. + * When inserting to a non-leaf page, 'cbuf' is the left-sibling of the + * page we're inserting the downlink for. This function will clear the + * INCOMPLETE_SPLIT flag on it, and release the buffer. * * The locking interactions in this code are critical. You should * grok Lehman and Yao's paper before making any changes. 
In addition, @@ -356,122 +706,38 @@ _bt_check_unique(Relation rel, BTItem btitem, Relation heapRel, static void _bt_insertonpg(Relation rel, Buffer buf, + Buffer cbuf, BTStack stack, - int keysz, - ScanKey scankey, - BTItem btitem, - OffsetNumber afteritem, + IndexTuple itup, + OffsetNumber newitemoff, bool split_only_page) { Page page; BTPageOpaque lpageop; - OffsetNumber itup_off; - BlockNumber itup_blkno; - OffsetNumber newitemoff; OffsetNumber firstright = InvalidOffsetNumber; Size itemsz; page = BufferGetPage(buf); lpageop = (BTPageOpaque) PageGetSpecialPointer(page); - itemsz = IndexTupleDSize(btitem->bti_itup) - + (sizeof(BTItemData) - sizeof(IndexTupleData)); + /* child buffer must be given iff inserting on an internal page */ + Assert(P_ISLEAF(lpageop) == !BufferIsValid(cbuf)); - itemsz = MAXALIGN(itemsz); /* be safe, PageAddItem will do this but - * we need to be consistent */ + /* The caller should've finished any incomplete splits already. */ + if (P_INCOMPLETE_SPLIT(lpageop)) + elog(ERROR, "cannot insert to incompletely split page %u", + BufferGetBlockNumber(buf)); - /* - * Check whether the item can fit on a btree page at all. (Eventually, - * we ought to try to apply TOAST methods if not.) We actually need to - * be able to fit three items on every page, so restrict any one item - * to 1/3 the per-page available space. Note that at this point, - * itemsz doesn't include the ItemId. - */ - if (itemsz > BTMaxItemSize(page)) - ereport(ERROR, - (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), - errmsg("index row size %lu exceeds btree maximum, %lu", - (unsigned long) itemsz, - (unsigned long) BTMaxItemSize(page)))); - - /* - * Determine exactly where new item will go. - */ - if (afteritem > 0) - newitemoff = afteritem + 1; - else - { - /*---------- - * If we will need to split the page to put the item here, - * check whether we can put the tuple somewhere to the right, - * instead. Keep scanning right until we - * (a) find a page with enough free space, - * (b) reach the last page where the tuple can legally go, or - * (c) get tired of searching. - * (c) is not flippant; it is important because if there are many - * pages' worth of equal keys, it's better to split one of the early - * pages than to scan all the way to the end of the run of equal keys - * on every insert. We implement "get tired" as a random choice, - * since stopping after scanning a fixed number of pages wouldn't work - * well (we'd never reach the right-hand side of previously split - * pages). Currently the probability of moving right is set at 0.99, - * which may seem too high to change the behavior much, but it does an - * excellent job of preventing O(N^2) behavior with many equal keys. - *---------- - */ - bool movedright = false; - - while (PageGetFreeSpace(page) < itemsz && - !P_RIGHTMOST(lpageop) && - _bt_compare(rel, keysz, scankey, page, P_HIKEY) == 0 && - random() > (MAX_RANDOM_VALUE / 100)) - { - /* - * step right to next non-dead page - * - * must write-lock that page before releasing write lock on - * current page; else someone else's _bt_check_unique scan - * could fail to see our insertion. write locks on - * intermediate dead pages won't do because we don't know when - * they will get de-linked from the tree. 
- */ - Buffer rbuf = InvalidBuffer; - - for (;;) - { - BlockNumber rblkno = lpageop->btpo_next; - - rbuf = _bt_relandgetbuf(rel, rbuf, rblkno, BT_WRITE); - page = BufferGetPage(rbuf); - lpageop = (BTPageOpaque) PageGetSpecialPointer(page); - if (!P_IGNORE(lpageop)) - break; - if (P_RIGHTMOST(lpageop)) - elog(ERROR, "fell off the end of \"%s\"", - RelationGetRelationName(rel)); - } - _bt_relbuf(rel, buf); - buf = rbuf; - movedright = true; - } - - /* - * Now we are on the right page, so find the insert position. If - * we moved right at all, we know we should insert at the start of - * the page, else must find the position by searching. - */ - if (movedright) - newitemoff = P_FIRSTDATAKEY(lpageop); - else - newitemoff = _bt_binsrch(rel, buf, keysz, scankey, false); - } + itemsz = IndexTupleDSize(*itup); + itemsz = MAXALIGN(itemsz); /* be safe, PageAddItem will do this but we + * need to be consistent */ /* * Do we need to split the page to fit the item on it? * * Note: PageGetFreeSpace() subtracts sizeof(ItemIdData) from its result, - * so this comparison is correct even though we appear to be - * accounting only for the item and not for its line pointer. + * so this comparison is correct even though we appear to be accounting + * only for the item and not for its line pointer. */ if (PageGetFreeSpace(page) < itemsz) { @@ -486,9 +752,11 @@ _bt_insertonpg(Relation rel, &newitemonleft); /* split the buffer into left and right halves */ - rbuf = _bt_split(rel, buf, firstright, - newitemoff, itemsz, btitem, newitemonleft, - &itup_off, &itup_blkno); + rbuf = _bt_split(rel, buf, cbuf, firstright, + newitemoff, itemsz, itup, newitemonleft); + PredicateLockPageSplit(rel, + BufferGetBlockNumber(buf), + BufferGetBlockNumber(rbuf)); /*---------- * By here, @@ -513,20 +781,23 @@ _bt_insertonpg(Relation rel, Buffer metabuf = InvalidBuffer; Page metapg = NULL; BTMetaPageData *metad = NULL; + OffsetNumber itup_off; + BlockNumber itup_blkno; itup_off = newitemoff; itup_blkno = BufferGetBlockNumber(buf); /* - * If we are doing this insert because we split a page that was - * the only one on its tree level, but was not the root, it may - * have been the "fast root". We need to ensure that the fast - * root link points at or above the current page. We can safely - * acquire a lock on the metapage here --- see comments for - * _bt_newroot(). + * If we are doing this insert because we split a page that was the + * only one on its tree level, but was not the root, it may have been + * the "fast root". We need to ensure that the fast root link points + * at or above the current page. We can safely acquire a lock on the + * metapage here --- see comments for _bt_newroot(). */ if (split_only_page) { + Assert(!P_ISLEAF(lpageop)); + metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_WRITE); metapg = BufferGetPage(metabuf); metad = BTPageGetMeta(metapg); @@ -542,32 +813,56 @@ _bt_insertonpg(Relation rel, /* Do the update. 
No ereport(ERROR) until changes are logged */ START_CRIT_SECTION(); - _bt_pgaddtup(rel, page, itemsz, btitem, newitemoff, "page"); + if (!_bt_pgaddtup(page, itemsz, itup, newitemoff)) + elog(PANIC, "failed to add new item to block %u in index \"%s\"", + itup_blkno, RelationGetRelationName(rel)); + + MarkBufferDirty(buf); if (BufferIsValid(metabuf)) { metad->btm_fastroot = itup_blkno; metad->btm_fastlevel = lpageop->btpo.level; + MarkBufferDirty(metabuf); + } + + /* clear INCOMPLETE_SPLIT flag on child if inserting a downlink */ + if (BufferIsValid(cbuf)) + { + Page cpage = BufferGetPage(cbuf); + BTPageOpaque cpageop = (BTPageOpaque) PageGetSpecialPointer(cpage); + + Assert(P_INCOMPLETE_SPLIT(cpageop)); + cpageop->btpo_flags &= ~BTP_INCOMPLETE_SPLIT; + MarkBufferDirty(cbuf); } /* XLOG stuff */ - if (!rel->rd_istemp) + if (RelationNeedsWAL(rel)) { xl_btree_insert xlrec; xl_btree_metadata xlmeta; uint8 xlinfo; XLogRecPtr recptr; - XLogRecData rdata[3]; - XLogRecData *nextrdata; - BTItemData truncitem; + IndexTupleData trunctuple; + + xlrec.offnum = itup_off; - xlrec.target.node = rel->rd_node; - ItemPointerSet(&(xlrec.target.tid), itup_blkno, itup_off); + XLogBeginInsert(); + XLogRegisterData((char *) &xlrec, SizeOfBtreeInsert); + + if (P_ISLEAF(lpageop)) + xlinfo = XLOG_BTREE_INSERT_LEAF; + else + { + /* + * Register the left child whose INCOMPLETE_SPLIT flag was + * cleared. + */ + XLogRegisterBuffer(1, cbuf, REGBUF_STANDARD); - rdata[0].buffer = InvalidBuffer; - rdata[0].data = (char *) &xlrec; - rdata[0].len = SizeOfBtreeInsert; - rdata[0].next = nextrdata = &(rdata[1]); + xlinfo = XLOG_BTREE_INSERT_UPPER; + } if (BufferIsValid(metabuf)) { @@ -576,79 +871,75 @@ _bt_insertonpg(Relation rel, xlmeta.fastroot = metad->btm_fastroot; xlmeta.fastlevel = metad->btm_fastlevel; - nextrdata->buffer = InvalidBuffer; - nextrdata->data = (char *) &xlmeta; - nextrdata->len = sizeof(xl_btree_metadata); - nextrdata->next = nextrdata + 1; - nextrdata++; + XLogRegisterBuffer(2, metabuf, REGBUF_WILL_INIT); + XLogRegisterBufData(2, (char *) &xlmeta, sizeof(xl_btree_metadata)); + xlinfo = XLOG_BTREE_INSERT_META; } - else if (P_ISLEAF(lpageop)) - xlinfo = XLOG_BTREE_INSERT_LEAF; - else - xlinfo = XLOG_BTREE_INSERT_UPPER; /* Read comments in _bt_pgaddtup */ + XLogRegisterBuffer(0, buf, REGBUF_STANDARD); if (!P_ISLEAF(lpageop) && newitemoff == P_FIRSTDATAKEY(lpageop)) { - truncitem = *btitem; - truncitem.bti_itup.t_info = sizeof(BTItemData); - nextrdata->data = (char *) &truncitem; - nextrdata->len = sizeof(BTItemData); + trunctuple = *itup; + trunctuple.t_info = sizeof(IndexTupleData); + XLogRegisterBufData(0, (char *) &trunctuple, + sizeof(IndexTupleData)); } else - { - nextrdata->data = (char *) btitem; - nextrdata->len = IndexTupleDSize(btitem->bti_itup) + - (sizeof(BTItemData) - sizeof(IndexTupleData)); - } - nextrdata->buffer = buf; - nextrdata->next = NULL; + XLogRegisterBufData(0, (char *) itup, IndexTupleDSize(*itup)); - recptr = XLogInsert(RM_BTREE_ID, xlinfo, rdata); + recptr = XLogInsert(RM_BTREE_ID, xlinfo); if (BufferIsValid(metabuf)) { PageSetLSN(metapg, recptr); - PageSetTLI(metapg, ThisTimeLineID); + } + if (BufferIsValid(cbuf)) + { + PageSetLSN(BufferGetPage(cbuf), recptr); } PageSetLSN(page, recptr); - PageSetTLI(page, ThisTimeLineID); } END_CRIT_SECTION(); - /* Write out the updated page and release pin/lock */ + /* release buffers */ if (BufferIsValid(metabuf)) - _bt_wrtbuf(rel, metabuf); - - _bt_wrtbuf(rel, buf); + _bt_relbuf(rel, metabuf); + if (BufferIsValid(cbuf)) + _bt_relbuf(rel, cbuf); 
+ _bt_relbuf(rel, buf); } } /* * _bt_split() -- split a page in the btree. * - * On entry, buf is the page to split, and is write-locked and pinned. + * On entry, buf is the page to split, and is pinned and write-locked. * firstright is the item index of the first item to be moved to the * new right page. newitemoff etc. tell us about the new item that * must be inserted along with the data from the old page. * + * When splitting a non-leaf page, 'cbuf' is the left-sibling of the + * page we're inserting the downlink for. This function will clear the + * INCOMPLETE_SPLIT flag on it, and release the buffer. + * * Returns the new right sibling of buf, pinned and write-locked. - * The pin and lock on buf are maintained. *itup_off and *itup_blkno - * are set to the exact location where newitem was inserted. + * The pin and lock on buf are maintained. */ static Buffer -_bt_split(Relation rel, Buffer buf, OffsetNumber firstright, - OffsetNumber newitemoff, Size newitemsz, BTItem newitem, - bool newitemonleft, - OffsetNumber *itup_off, BlockNumber *itup_blkno) +_bt_split(Relation rel, Buffer buf, Buffer cbuf, OffsetNumber firstright, + OffsetNumber newitemoff, Size newitemsz, IndexTuple newitem, + bool newitemonleft) { Buffer rbuf; Page origpage; Page leftpage, rightpage; + BlockNumber origpagenumber, + rightpagenumber; BTPageOpaque ropaque, lopaque, oopaque; @@ -657,41 +948,75 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright, BTPageOpaque sopaque = NULL; Size itemsz; ItemId itemid; - BTItem item; + IndexTuple item; OffsetNumber leftoff, rightoff; OffsetNumber maxoff; OffsetNumber i; + bool isroot; + bool isleaf; + /* Acquire a new page to split into */ rbuf = _bt_getbuf(rel, P_NEW, BT_WRITE); + + /* + * origpage is the original page to be split. leftpage is a temporary + * buffer that receives the left-sibling data, which will be copied back + * into origpage on success. rightpage is the new page that receives the + * right-sibling data. If we fail before reaching the critical section, + * origpage hasn't been modified and leftpage is only workspace. In + * principle we shouldn't need to worry about rightpage either, because it + * hasn't been linked into the btree page structure; but to avoid leaving + * possibly-confusing junk behind, we are careful to rewrite rightpage as + * zeroes before throwing any error. + */ origpage = BufferGetPage(buf); - leftpage = PageGetTempPage(origpage, sizeof(BTPageOpaqueData)); + leftpage = PageGetTempPage(origpage); rightpage = BufferGetPage(rbuf); + origpagenumber = BufferGetBlockNumber(buf); + rightpagenumber = BufferGetBlockNumber(rbuf); + _bt_pageinit(leftpage, BufferGetPageSize(buf)); - _bt_pageinit(rightpage, BufferGetPageSize(rbuf)); + /* rightpage was already initialized by _bt_getbuf */ + + /* + * Copy the original page's LSN into leftpage, which will become the + * updated version of the page. We need this because XLogInsert will + * examine the LSN and possibly dump it in a page image. 
+ */ + PageSetLSN(leftpage, PageGetLSN(origpage)); /* init btree private data */ oopaque = (BTPageOpaque) PageGetSpecialPointer(origpage); lopaque = (BTPageOpaque) PageGetSpecialPointer(leftpage); ropaque = (BTPageOpaque) PageGetSpecialPointer(rightpage); + isroot = P_ISROOT(oopaque); + isleaf = P_ISLEAF(oopaque); + /* if we're splitting this page, it won't be the root when we're done */ + /* also, clear the SPLIT_END and HAS_GARBAGE flags in both pages */ lopaque->btpo_flags = oopaque->btpo_flags; - lopaque->btpo_flags &= ~BTP_ROOT; + lopaque->btpo_flags &= ~(BTP_ROOT | BTP_SPLIT_END | BTP_HAS_GARBAGE); ropaque->btpo_flags = lopaque->btpo_flags; + /* set flag in left page indicating that the right page has no downlink */ + lopaque->btpo_flags |= BTP_INCOMPLETE_SPLIT; lopaque->btpo_prev = oopaque->btpo_prev; - lopaque->btpo_next = BufferGetBlockNumber(rbuf); - ropaque->btpo_prev = BufferGetBlockNumber(buf); + lopaque->btpo_next = rightpagenumber; + ropaque->btpo_prev = origpagenumber; ropaque->btpo_next = oopaque->btpo_next; lopaque->btpo.level = ropaque->btpo.level = oopaque->btpo.level; + /* Since we already have write-lock on both pages, ok to read cycleid */ + lopaque->btpo_cycleid = _bt_vacuum_cycleid(rel); + ropaque->btpo_cycleid = lopaque->btpo_cycleid; /* - * If the page we're splitting is not the rightmost page at its level - * in the tree, then the first entry on the page is the high key for - * the page. We need to copy that to the right half. Otherwise - * (meaning the rightmost page case), all the items on the right half - * will be user data. + * If the page we're splitting is not the rightmost page at its level in + * the tree, then the first entry on the page is the high key for the + * page. We need to copy that to the right half. Otherwise (meaning the + * rightmost page case), all the items on the right half will be user + * data. */ rightoff = P_HIKEY; @@ -699,17 +1024,22 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright, { itemid = PageGetItemId(origpage, P_HIKEY); itemsz = ItemIdGetLength(itemid); - item = (BTItem) PageGetItem(origpage, itemid); + item = (IndexTuple) PageGetItem(origpage, itemid); if (PageAddItem(rightpage, (Item) item, itemsz, rightoff, - LP_USED) == InvalidOffsetNumber) - elog(PANIC, "failed to add hikey to the right sibling"); + false, false) == InvalidOffsetNumber) + { + memset(rightpage, 0, BufferGetPageSize(rbuf)); + elog(ERROR, "failed to add hikey to the right sibling" + " while splitting block %u of index \"%s\"", + origpagenumber, RelationGetRelationName(rel)); + } rightoff = OffsetNumberNext(rightoff); } /* - * The "high key" for the new left page will be the first key that's - * going to go into the new right page. This might be either the - * existing data item at position firstright, or the incoming tuple. + * The "high key" for the new left page will be the first key that's going + * to go into the new right page. This might be either the existing data + * item at position firstright, or the incoming tuple. 
*/ leftoff = P_HIKEY; if (!newitemonleft && newitemoff == firstright) @@ -723,15 +1053,23 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright, /* existing item at firstright will become first on right page */ itemid = PageGetItemId(origpage, firstright); itemsz = ItemIdGetLength(itemid); - item = (BTItem) PageGetItem(origpage, itemid); + item = (IndexTuple) PageGetItem(origpage, itemid); } if (PageAddItem(leftpage, (Item) item, itemsz, leftoff, - LP_USED) == InvalidOffsetNumber) - elog(PANIC, "failed to add hikey to the left sibling"); + false, false) == InvalidOffsetNumber) + { + memset(rightpage, 0, BufferGetPageSize(rbuf)); + elog(ERROR, "failed to add hikey to the left sibling" + " while splitting block %u of index \"%s\"", + origpagenumber, RelationGetRelationName(rel)); + } leftoff = OffsetNumberNext(leftoff); /* - * Now transfer all the data items to the appropriate page + * Now transfer all the data items to the appropriate page. + * + * Note: we *must* insert at least the right page's items in item-number + * order, for the benefit of _bt_restore_page(). */ maxoff = PageGetMaxOffsetNumber(origpage); @@ -739,25 +1077,31 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright, { itemid = PageGetItemId(origpage, i); itemsz = ItemIdGetLength(itemid); - item = (BTItem) PageGetItem(origpage, itemid); + item = (IndexTuple) PageGetItem(origpage, itemid); /* does new item belong before this one? */ if (i == newitemoff) { if (newitemonleft) { - _bt_pgaddtup(rel, leftpage, newitemsz, newitem, leftoff, - "left sibling"); - *itup_off = leftoff; - *itup_blkno = BufferGetBlockNumber(buf); + if (!_bt_pgaddtup(leftpage, newitemsz, newitem, leftoff)) + { + memset(rightpage, 0, BufferGetPageSize(rbuf)); + elog(ERROR, "failed to add new item to the left sibling" + " while splitting block %u of index \"%s\"", + origpagenumber, RelationGetRelationName(rel)); + } leftoff = OffsetNumberNext(leftoff); } else { - _bt_pgaddtup(rel, rightpage, newitemsz, newitem, rightoff, - "right sibling"); - *itup_off = rightoff; - *itup_blkno = BufferGetBlockNumber(rbuf); + if (!_bt_pgaddtup(rightpage, newitemsz, newitem, rightoff)) + { + memset(rightpage, 0, BufferGetPageSize(rbuf)); + elog(ERROR, "failed to add new item to the right sibling" + " while splitting block %u of index \"%s\"", + origpagenumber, RelationGetRelationName(rel)); + } rightoff = OffsetNumberNext(rightoff); } } @@ -765,14 +1109,24 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright, /* decide which page to put it on */ if (i < firstright) { - _bt_pgaddtup(rel, leftpage, itemsz, item, leftoff, - "left sibling"); + if (!_bt_pgaddtup(leftpage, itemsz, item, leftoff)) + { + memset(rightpage, 0, BufferGetPageSize(rbuf)); + elog(ERROR, "failed to add old item to the left sibling" + " while splitting block %u of index \"%s\"", + origpagenumber, RelationGetRelationName(rel)); + } leftoff = OffsetNumberNext(leftoff); } else { - _bt_pgaddtup(rel, rightpage, itemsz, item, rightoff, - "right sibling"); + if (!_bt_pgaddtup(rightpage, itemsz, item, rightoff)) + { + memset(rightpage, 0, BufferGetPageSize(rbuf)); + elog(ERROR, "failed to add old item to the right sibling" + " while splitting block %u of index \"%s\"", + origpagenumber, RelationGetRelationName(rel)); + } rightoff = OffsetNumberNext(rightoff); } } @@ -780,139 +1134,203 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright, /* cope with possibility that newitem goes at the end */ if (i <= newitemoff) { - if (newitemonleft) + /* + * Can't have newitemonleft here; 
that would imply we were told to put + * *everything* on the left page, which cannot fit (if it could, we'd + * not be splitting the page). + */ + Assert(!newitemonleft); + if (!_bt_pgaddtup(rightpage, newitemsz, newitem, rightoff)) { - _bt_pgaddtup(rel, leftpage, newitemsz, newitem, leftoff, - "left sibling"); - *itup_off = leftoff; - *itup_blkno = BufferGetBlockNumber(buf); - leftoff = OffsetNumberNext(leftoff); - } - else - { - _bt_pgaddtup(rel, rightpage, newitemsz, newitem, rightoff, - "right sibling"); - *itup_off = rightoff; - *itup_blkno = BufferGetBlockNumber(rbuf); - rightoff = OffsetNumberNext(rightoff); + memset(rightpage, 0, BufferGetPageSize(rbuf)); + elog(ERROR, "failed to add new item to the right sibling" + " while splitting block %u of index \"%s\"", + origpagenumber, RelationGetRelationName(rel)); } + rightoff = OffsetNumberNext(rightoff); } /* * We have to grab the right sibling (if any) and fix the prev pointer * there. We are guaranteed that this is deadlock-free since no other - * writer will be holding a lock on that page and trying to move left, - * and all readers release locks on a page before trying to fetch its + * writer will be holding a lock on that page and trying to move left, and + * all readers release locks on a page before trying to fetch its * neighbors. */ - if (!P_RIGHTMOST(ropaque)) + if (!P_RIGHTMOST(oopaque)) { - sbuf = _bt_getbuf(rel, ropaque->btpo_next, BT_WRITE); + sbuf = _bt_getbuf(rel, oopaque->btpo_next, BT_WRITE); spage = BufferGetPage(sbuf); sopaque = (BTPageOpaque) PageGetSpecialPointer(spage); - if (sopaque->btpo_prev != ropaque->btpo_prev) - elog(PANIC, "right sibling's left-link doesn't match"); + if (sopaque->btpo_prev != origpagenumber) + { + memset(rightpage, 0, BufferGetPageSize(rbuf)); + elog(ERROR, "right sibling's left-link doesn't match: " + "block %u links to %u instead of expected %u in index \"%s\"", + oopaque->btpo_next, sopaque->btpo_prev, origpagenumber, + RelationGetRelationName(rel)); + } + + /* + * Check to see if we can set the SPLIT_END flag in the right-hand + * split page; this can save some I/O for vacuum since it need not + * proceed to the right sibling. We can set the flag if the right + * sibling has a different cycleid: that means it could not be part of + * a group of pages that were all split off from the same ancestor + * page. If you're confused, imagine that page A splits to A B and + * then again, yielding A C B, while vacuum is in progress. Tuples + * originally in A could now be in either B or C, hence vacuum must + * examine both pages. But if D, our right sibling, has a different + * cycleid then it could not contain any tuples that were in A when + * the vacuum started. + */ + if (sopaque->btpo_cycleid != ropaque->btpo_cycleid) + ropaque->btpo_flags |= BTP_SPLIT_END; } /* - * Right sibling is locked, new siblings are prepared, but original - * page is not updated yet. Log changes before continuing. + * Right sibling is locked, new siblings are prepared, but original page + * is not updated yet. * - * NO EREPORT(ERROR) till right sibling is updated. + * NO EREPORT(ERROR) till right sibling is updated. We can get away with + * not starting the critical section till here because we haven't been + * scribbling on the original page yet; see comments above. */ START_CRIT_SECTION(); + /* + * By here, the original data page has been split into two new halves, and + * these are correct. 
The algorithm requires that the left page never + * move during a split, so we copy the new left page back on top of the + * original. Note that this is not a waste of time, since we also require + * (in the page management code) that the center of a page always be + * clean, and the most efficient way to guarantee this is just to compact + * the data by reinserting it into a new left page. (XXX the latter + * comment is probably obsolete; but in any case it's good to not scribble + * on the original page until we enter the critical section.) + * + * We need to do this before writing the WAL record, so that XLogInsert + * can WAL log an image of the page if necessary. + */ + PageRestoreTempPage(leftpage, origpage); + /* leftpage, lopaque must not be used below here */ + + MarkBufferDirty(buf); + MarkBufferDirty(rbuf); + if (!P_RIGHTMOST(ropaque)) - sopaque->btpo_prev = BufferGetBlockNumber(rbuf); + { + sopaque->btpo_prev = rightpagenumber; + MarkBufferDirty(sbuf); + } + + /* + * Clear INCOMPLETE_SPLIT flag on child if inserting the new item finishes + * a split. + */ + if (!isleaf) + { + Page cpage = BufferGetPage(cbuf); + BTPageOpaque cpageop = (BTPageOpaque) PageGetSpecialPointer(cpage); + + cpageop->btpo_flags &= ~BTP_INCOMPLETE_SPLIT; + MarkBufferDirty(cbuf); + } /* XLOG stuff */ - if (!rel->rd_istemp) + if (RelationNeedsWAL(rel)) { xl_btree_split xlrec; uint8 xlinfo; XLogRecPtr recptr; - XLogRecData rdata[4]; - xlrec.target.node = rel->rd_node; - ItemPointerSet(&(xlrec.target.tid), *itup_blkno, *itup_off); - if (newitemonleft) - xlrec.otherblk = BufferGetBlockNumber(rbuf); - else - xlrec.otherblk = BufferGetBlockNumber(buf); - xlrec.leftblk = lopaque->btpo_prev; - xlrec.rightblk = ropaque->btpo_next; - xlrec.level = lopaque->btpo.level; - - /* - * Direct access to page is not good but faster - we should - * implement some new func in page API. Note we only store the - * tuples themselves, knowing that the item pointers are in the - * same order and can be reconstructed by scanning the tuples. - */ - xlrec.leftlen = ((PageHeader) leftpage)->pd_special - - ((PageHeader) leftpage)->pd_upper; + xlrec.level = ropaque->btpo.level; + xlrec.firstright = firstright; + xlrec.newitemoff = newitemoff; - rdata[0].buffer = InvalidBuffer; - rdata[0].data = (char *) &xlrec; - rdata[0].len = SizeOfBtreeSplit; - rdata[0].next = &(rdata[1]); + XLogBeginInsert(); + XLogRegisterData((char *) &xlrec, SizeOfBtreeSplit); - rdata[1].buffer = InvalidBuffer; - rdata[1].data = (char *) leftpage + ((PageHeader) leftpage)->pd_upper; - rdata[1].len = xlrec.leftlen; - rdata[1].next = &(rdata[2]); + XLogRegisterBuffer(0, buf, REGBUF_STANDARD); + XLogRegisterBuffer(1, rbuf, REGBUF_WILL_INIT); + /* Log the right sibling, because we've changed its prev-pointer. */ + if (!P_RIGHTMOST(ropaque)) + XLogRegisterBuffer(2, sbuf, REGBUF_STANDARD); + if (BufferIsValid(cbuf)) + XLogRegisterBuffer(3, cbuf, REGBUF_STANDARD); - rdata[2].buffer = InvalidBuffer; - rdata[2].data = (char *) rightpage + ((PageHeader) rightpage)->pd_upper; - rdata[2].len = ((PageHeader) rightpage)->pd_special - - ((PageHeader) rightpage)->pd_upper; - rdata[2].next = NULL; + /* + * Log the new item, if it was inserted on the left page. (If it was + * put on the right page, we don't need to explicitly WAL log it + * because it's included with all the other items on the right page.) + * Show the new item as belonging to the left page buffer, so that it + * is not stored if XLogInsert decides it needs a full-page image of + * the left page. 
We store the offset anyway, though, to support + * archive compression of these records. + */ + if (newitemonleft) + XLogRegisterBufData(0, (char *) newitem, MAXALIGN(newitemsz)); - if (!P_RIGHTMOST(ropaque)) + /* Log left page */ + if (!isleaf) { - rdata[2].next = &(rdata[3]); - rdata[3].buffer = sbuf; - rdata[3].data = NULL; - rdata[3].len = 0; - rdata[3].next = NULL; + /* + * We must also log the left page's high key, because the right + * page's leftmost key is suppressed on non-leaf levels. Show it + * as belonging to the left page buffer, so that it is not stored + * if XLogInsert decides it needs a full-page image of the left + * page. + */ + itemid = PageGetItemId(origpage, P_HIKEY); + item = (IndexTuple) PageGetItem(origpage, itemid); + XLogRegisterBufData(0, (char *) item, MAXALIGN(IndexTupleSize(item))); } - if (P_ISROOT(oopaque)) + /* + * Log the contents of the right page in the format understood by + * _bt_restore_page(). We set lastrdata->buffer to InvalidBuffer, + * because we're going to recreate the whole page anyway, so it should + * never be stored by XLogInsert. + * + * Direct access to page is not good but faster - we should implement + * some new func in page API. Note we only store the tuples + * themselves, knowing that they were inserted in item-number order + * and so the item pointers can be reconstructed. See comments for + * _bt_restore_page(). + */ + XLogRegisterBufData(1, + (char *) rightpage + ((PageHeader) rightpage)->pd_upper, + ((PageHeader) rightpage)->pd_special - ((PageHeader) rightpage)->pd_upper); + + if (isroot) xlinfo = newitemonleft ? XLOG_BTREE_SPLIT_L_ROOT : XLOG_BTREE_SPLIT_R_ROOT; else xlinfo = newitemonleft ? XLOG_BTREE_SPLIT_L : XLOG_BTREE_SPLIT_R; - recptr = XLogInsert(RM_BTREE_ID, xlinfo, rdata); + recptr = XLogInsert(RM_BTREE_ID, xlinfo); - PageSetLSN(leftpage, recptr); - PageSetTLI(leftpage, ThisTimeLineID); + PageSetLSN(origpage, recptr); PageSetLSN(rightpage, recptr); - PageSetTLI(rightpage, ThisTimeLineID); if (!P_RIGHTMOST(ropaque)) { PageSetLSN(spage, recptr); - PageSetTLI(spage, ThisTimeLineID); + } + if (!isleaf) + { + PageSetLSN(BufferGetPage(cbuf), recptr); } } - /* - * By here, the original data page has been split into two new halves, - * and these are correct. The algorithm requires that the left page - * never move during a split, so we copy the new left page back on top - * of the original. Note that this is not a waste of time, since we - * also require (in the page management code) that the center of a - * page always be clean, and the most efficient way to guarantee this - * is just to compact the data by reinserting it into a new left page. - */ - - PageRestoreTempPage(leftpage, origpage); - END_CRIT_SECTION(); - /* write and release the old right sibling */ + /* release the old right sibling */ if (!P_RIGHTMOST(ropaque)) - _bt_wrtbuf(rel, sbuf); + _bt_relbuf(rel, sbuf); + + /* release the child */ + if (!isleaf) + _bt_relbuf(rel, cbuf); /* split's done */ return rbuf; @@ -927,14 +1345,12 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright, * it needs to go into!) * * If the page is the rightmost page on its level, we instead try to arrange - * for twice as much free space on the right as on the left. In this way, - * when we are inserting successively increasing keys (consider sequences, - * timestamps, etc) we will end up with a tree whose pages are about 67% full, + * to leave the left split page fillfactor% full. 
In this way, when we are + * inserting successively increasing keys (consider sequences, timestamps, + * etc) we will end up with a tree whose pages are about fillfactor% full, * instead of the 50% full result that we'd get without this special case. - * (We could bias it even further to make the initially-loaded tree more full. - * But since the steady-state load for a btree is about 70%, we'd likely just - * be making more page-splitting work for ourselves later on, when we start - * seeing updates to existing tuples.) + * This is the same as nbtsort.c produces for a newly-created tree. Note + * that leaf and nonleaf pages use different fillfactors. * * We are passed the intended insert position of the new tuple, expressed as * the offsetnumber of the tuple it must go in front of. (This could be @@ -942,7 +1358,7 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright, * * We return the index of the first existing tuple that should go on the * righthand page, plus a boolean indicating whether the new tuple goes on - * the left or right page. The bool is necessary to disambiguate the case + * the left or right page. The bool is necessary to disambiguate the case * where firstright == newitemoff. */ static OffsetNumber @@ -960,34 +1376,20 @@ _bt_findsplitloc(Relation rel, int leftspace, rightspace, goodenough, - dataitemtotal, - dataitemstoleft; + olddataitemstotal, + olddataitemstoleft; + bool goodenoughfound; opaque = (BTPageOpaque) PageGetSpecialPointer(page); /* Passed-in newitemsz is MAXALIGNED but does not include line pointer */ newitemsz += sizeof(ItemIdData); - state.newitemsz = newitemsz; - state.is_leaf = P_ISLEAF(opaque); - state.is_rightmost = P_RIGHTMOST(opaque); - state.have_split = false; /* Total free space available on a btree page, after fixed overhead */ leftspace = rightspace = PageGetPageSize(page) - SizeOfPageHeaderData - MAXALIGN(sizeof(BTPageOpaqueData)); - /* - * Finding the best possible split would require checking all the - * possible split points, because of the high-key and left-key special - * cases. That's probably more work than it's worth; instead, stop as - * soon as we find a "good-enough" split, where good-enough is defined - * as an imbalance in free space of no more than pagesize/16 - * (arbitrary...) This should let us stop near the middle on most - * pages, instead of plowing to the end. - */ - goodenough = leftspace / 16; - /* The right page will have the same high key as the old page */ if (!P_RIGHTMOST(opaque)) { @@ -997,13 +1399,42 @@ _bt_findsplitloc(Relation rel, } /* Count up total space in data items without actually scanning 'em */ - dataitemtotal = rightspace - (int) PageGetFreeSpace(page); + olddataitemstotal = rightspace - (int) PageGetExactFreeSpace(page); + + state.newitemsz = newitemsz; + state.is_leaf = P_ISLEAF(opaque); + state.is_rightmost = P_RIGHTMOST(opaque); + state.have_split = false; + if (state.is_leaf) + state.fillfactor = RelationGetFillFactor(rel, + BTREE_DEFAULT_FILLFACTOR); + else + state.fillfactor = BTREE_NONLEAF_FILLFACTOR; + state.newitemonleft = false; /* these just to keep compiler quiet */ + state.firstright = 0; + state.best_delta = 0; + state.leftspace = leftspace; + state.rightspace = rightspace; + state.olddataitemstotal = olddataitemstotal; + state.newitemoff = newitemoff; + + /* + * Finding the best possible split would require checking all the possible + * split points, because of the high-key and left-key special cases. 
+ * That's probably more work than it's worth; instead, stop as soon as we + * find a "good-enough" split, where good-enough is defined as an + * imbalance in free space of no more than pagesize/16 (arbitrary...) This + * should let us stop near the middle on most pages, instead of plowing to + * the end. + */ + goodenough = leftspace / 16; /* - * Scan through the data items and calculate space usage for a split - * at each possible position. + * Scan through the data items and calculate space usage for a split at + * each possible position. */ - dataitemstoleft = 0; + olddataitemstoleft = 0; + goodenoughfound = false; maxoff = PageGetMaxOffsetNumber(page); for (offnum = P_FIRSTDATAKEY(opaque); @@ -1011,52 +1442,54 @@ _bt_findsplitloc(Relation rel, offnum = OffsetNumberNext(offnum)) { Size itemsz; - int leftfree, - rightfree; itemid = PageGetItemId(page, offnum); itemsz = MAXALIGN(ItemIdGetLength(itemid)) + sizeof(ItemIdData); - /* - * We have to allow for the current item becoming the high key of - * the left page; therefore it counts against left space as well - * as right space. - */ - leftfree = leftspace - dataitemstoleft - (int) itemsz; - rightfree = rightspace - (dataitemtotal - dataitemstoleft); - /* * Will the new item go to left or right of split? */ if (offnum > newitemoff) - _bt_checksplitloc(&state, offnum, leftfree, rightfree, - true, itemsz); + _bt_checksplitloc(&state, offnum, true, + olddataitemstoleft, itemsz); + else if (offnum < newitemoff) - _bt_checksplitloc(&state, offnum, leftfree, rightfree, - false, itemsz); + _bt_checksplitloc(&state, offnum, false, + olddataitemstoleft, itemsz); else { /* need to try it both ways! */ - _bt_checksplitloc(&state, offnum, leftfree, rightfree, - true, itemsz); - /* here we are contemplating newitem as first on right */ - _bt_checksplitloc(&state, offnum, leftfree, rightfree, - false, newitemsz); + _bt_checksplitloc(&state, offnum, true, + olddataitemstoleft, itemsz); + + _bt_checksplitloc(&state, offnum, false, + olddataitemstoleft, itemsz); } /* Abort scan once we find a good-enough choice */ if (state.have_split && state.best_delta <= goodenough) + { + goodenoughfound = true; break; + } - dataitemstoleft += itemsz; + olddataitemstoleft += itemsz; } /* - * I believe it is not possible to fail to find a feasible split, but - * just in case ... + * If the new item goes as the last item, check for splitting so that all + * the old items go to the left page and the new item goes to the right + * page. + */ + if (newitemoff > maxoff && !goodenoughfound) + _bt_checksplitloc(&state, newitemoff, false, olddataitemstotal, 0); + + /* + * I believe it is not possible to fail to find a feasible split, but just + * in case ... */ if (!state.have_split) - elog(ERROR, "could not find a feasible split point for \"%s\"", + elog(ERROR, "could not find a feasible split point for index \"%s\"", RelationGetRelationName(rel)); *newitemonleft = state.newitemonleft; @@ -1066,15 +1499,49 @@ _bt_findsplitloc(Relation rel, /* * Subroutine to analyze a particular possible split choice (ie, firstright * and newitemonleft settings), and record the best split so far in *state. + * + * firstoldonright is the offset of the first item on the original page + * that goes to the right page, and firstoldonrightsz is the size of that + * tuple. firstoldonright can be > max offset, which means that all the old + * items go to the left page and only the new item goes to the right page. + * In that case, firstoldonrightsz is not used. 
+ * + * olddataitemstoleft is the total size of all old items to the left of + * firstoldonright. */ static void -_bt_checksplitloc(FindSplitData *state, OffsetNumber firstright, - int leftfree, int rightfree, - bool newitemonleft, Size firstrightitemsz) +_bt_checksplitloc(FindSplitData *state, + OffsetNumber firstoldonright, + bool newitemonleft, + int olddataitemstoleft, + Size firstoldonrightsz) { + int leftfree, + rightfree; + Size firstrightitemsz; + bool newitemisfirstonright; + + /* Is the new item going to be the first item on the right page? */ + newitemisfirstonright = (firstoldonright == state->newitemoff + && !newitemonleft); + + if (newitemisfirstonright) + firstrightitemsz = state->newitemsz; + else + firstrightitemsz = firstoldonrightsz; + + /* Account for all the old tuples */ + leftfree = state->leftspace - olddataitemstoleft; + rightfree = state->rightspace - + (state->olddataitemstotal - olddataitemstoleft); + /* - * Account for the new item on whichever side it is to be put. + * The first item on the right page becomes the high key of the left page; + * therefore it counts against left space as well as right space. */ + leftfree -= firstrightitemsz; + + /* account for the new item */ if (newitemonleft) leftfree -= (int) state->newitemsz; else @@ -1086,7 +1553,7 @@ _bt_checksplitloc(FindSplitData *state, OffsetNumber firstright, */ if (!state->is_leaf) rightfree += (int) firstrightitemsz - - (int) (MAXALIGN(sizeof(BTItemData)) + sizeof(ItemIdData)); + (int) (MAXALIGN(sizeof(IndexTupleData)) + sizeof(ItemIdData)); /* * If feasible split point, remember best delta. @@ -1098,11 +1565,11 @@ _bt_checksplitloc(FindSplitData *state, OffsetNumber firstright, if (state->is_rightmost) { /* - * On a rightmost page, try to equalize right free space with - * twice the left free space. See comments for - * _bt_findsplitloc. + * If splitting a rightmost page, try to put (100-fillfactor)% of + * free space on left page. See comments for _bt_findsplitloc. */ - delta = (2 * leftfree) - rightfree; + delta = (state->fillfactor * leftfree) + - ((100 - state->fillfactor) * rightfree); } else { @@ -1116,7 +1583,7 @@ _bt_checksplitloc(FindSplitData *state, OffsetNumber firstright, { state->have_split = true; state->newitemonleft = newitemonleft; - state->firstright = firstright; + state->firstright = firstoldonright; state->best_delta = delta; } } @@ -1127,7 +1594,7 @@ _bt_checksplitloc(FindSplitData *state, OffsetNumber firstright, * * On entry, buf and rbuf are the left and right split pages, which we * still hold write locks on per the L&Y algorithm. We release the - * write locks once we have write lock on the parent page. (Any sooner, + * write locks once we have write lock on the parent page. (Any sooner, * and it'd be possible for some other process to try to split or delete * one of these pages, and get confused because it cannot find the downlink.) * @@ -1135,10 +1602,8 @@ _bt_checksplitloc(FindSplitData *state, OffsetNumber firstright, * have to be efficient (concurrent ROOT split, WAL recovery) * is_root - we split the true root * is_only - we split a page alone on its level (might have been fast root) - * - * This is exported so it can be called by nbtxlog.c. */ -void +static void _bt_insert_parent(Relation rel, Buffer buf, Buffer rbuf, @@ -1147,19 +1612,17 @@ _bt_insert_parent(Relation rel, bool is_only) { /* - * Here we have to do something Lehman and Yao don't talk about: deal - * with a root split and construction of a new root. 
If our stack is - * empty then we have just split a node on what had been the root - * level when we descended the tree. If it was still the root then we - * perform a new-root construction. If it *wasn't* the root anymore, - * search to find the next higher level that someone constructed - * meanwhile, and find the right place to insert as for the normal - * case. + * Here we have to do something Lehman and Yao don't talk about: deal with + * a root split and construction of a new root. If our stack is empty + * then we have just split a node on what had been the root level when we + * descended the tree. If it was still the root then we perform a + * new-root construction. If it *wasn't* the root anymore, search to find + * the next higher level that someone constructed meanwhile, and find the + * right place to insert as for the normal case. * * If we have to search for the parent level, we do so by re-descending - * from the root. This is not super-efficient, but it's rare enough - * not to matter. (This path is also taken when called from WAL - * recovery --- we have no stack in that case.) + * from the root. This is not super-efficient, but it's rare enough not + * to matter. */ if (is_root) { @@ -1170,26 +1633,25 @@ _bt_insert_parent(Relation rel, /* create a new root node and update the metapage */ rootbuf = _bt_newroot(rel, buf, rbuf); /* release the split buffers */ - _bt_wrtbuf(rel, rootbuf); - _bt_wrtbuf(rel, rbuf); - _bt_wrtbuf(rel, buf); + _bt_relbuf(rel, rootbuf); + _bt_relbuf(rel, rbuf); + _bt_relbuf(rel, buf); } else { BlockNumber bknum = BufferGetBlockNumber(buf); BlockNumber rbknum = BufferGetBlockNumber(rbuf); Page page = BufferGetPage(buf); - BTItem new_item; + IndexTuple new_item; BTStackData fakestack; - BTItem ritem; + IndexTuple ritem; Buffer pbuf; if (stack == NULL) { BTPageOpaque lpageop; - if (!InRecovery) - elog(DEBUG2, "concurrent ROOT page split"); + elog(DEBUG2, "concurrent ROOT page split"); lpageop = (BTPageOpaque) PageGetSpecialPointer(page); /* Find the leftmost page at the next level up */ pbuf = _bt_get_endpoint(rel, lpageop->btpo.level + 1, false); @@ -1197,43 +1659,43 @@ _bt_insert_parent(Relation rel, stack = &fakestack; stack->bts_blkno = BufferGetBlockNumber(pbuf); stack->bts_offset = InvalidOffsetNumber; - /* bts_btitem will be initialized below */ + /* bts_btentry will be initialized below */ stack->bts_parent = NULL; _bt_relbuf(rel, pbuf); } /* get high key from left page == lowest key on new right page */ - ritem = (BTItem) PageGetItem(page, - PageGetItemId(page, P_HIKEY)); + ritem = (IndexTuple) PageGetItem(page, + PageGetItemId(page, P_HIKEY)); /* form an index tuple that points at the new right page */ - new_item = _bt_formitem(&(ritem->bti_itup)); - ItemPointerSet(&(new_item->bti_itup.t_tid), rbknum, P_HIKEY); + new_item = CopyIndexTuple(ritem); + ItemPointerSet(&(new_item->t_tid), rbknum, P_HIKEY); /* * Find the parent buffer and get the parent page. * - * Oops - if we were moved right then we need to change stack item! - * We want to find parent pointing to where we are, right ? - - * vadim 05/27/97 + * Oops - if we were moved right then we need to change stack item! We + * want to find parent pointing to where we are, right ? 
- vadim + * 05/27/97 */ - ItemPointerSet(&(stack->bts_btitem.bti_itup.t_tid), - bknum, P_HIKEY); - + ItemPointerSet(&(stack->bts_btentry.t_tid), bknum, P_HIKEY); pbuf = _bt_getstackbuf(rel, stack, BT_WRITE); - /* Now we can write and unlock the children */ - _bt_wrtbuf(rel, rbuf); - _bt_wrtbuf(rel, buf); + /* + * Now we can unlock the right child. The left child will be unlocked + * by _bt_insertonpg(). + */ + _bt_relbuf(rel, rbuf); /* Check for error only after writing children */ if (pbuf == InvalidBuffer) - elog(ERROR, "failed to re-find parent key in \"%s\"", - RelationGetRelationName(rel)); + elog(ERROR, "failed to re-find parent key in index \"%s\" for split pages %u/%u", + RelationGetRelationName(rel), bknum, rbknum); /* Recursively update the parent */ - _bt_insertonpg(rel, pbuf, stack->bts_parent, - 0, NULL, new_item, stack->bts_offset, + _bt_insertonpg(rel, pbuf, buf, stack->bts_parent, + new_item, stack->bts_offset + 1, is_only); /* be tidy */ @@ -1241,6 +1703,62 @@ _bt_insert_parent(Relation rel, } } +/* + * _bt_finish_split() -- Finish an incomplete split + * + * A crash or other failure can leave a split incomplete. The insertion + * routines won't allow to insert on a page that is incompletely split. + * Before inserting on such a page, call _bt_finish_split(). + * + * On entry, 'lbuf' must be locked in write-mode. On exit, it is unlocked + * and unpinned. + */ +void +_bt_finish_split(Relation rel, Buffer lbuf, BTStack stack) +{ + Page lpage = BufferGetPage(lbuf); + BTPageOpaque lpageop = (BTPageOpaque) PageGetSpecialPointer(lpage); + Buffer rbuf; + Page rpage; + BTPageOpaque rpageop; + bool was_root; + bool was_only; + + Assert(P_INCOMPLETE_SPLIT(lpageop)); + + /* Lock right sibling, the one missing the downlink */ + rbuf = _bt_getbuf(rel, lpageop->btpo_next, BT_WRITE); + rpage = BufferGetPage(rbuf); + rpageop = (BTPageOpaque) PageGetSpecialPointer(rpage); + + /* Could this be a root split? */ + if (!stack) + { + Buffer metabuf; + Page metapg; + BTMetaPageData *metad; + + /* acquire lock on the metapage */ + metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_WRITE); + metapg = BufferGetPage(metabuf); + metad = BTPageGetMeta(metapg); + + was_root = (metad->btm_root == BufferGetBlockNumber(lbuf)); + + _bt_relbuf(rel, metabuf); + } + else + was_root = false; + + /* Was this the only page on the level before split? */ + was_only = (P_LEFTMOST(lpageop) && P_RIGHTMOST(rpageop)); + + elog(DEBUG1, "finishing incomplete split of %u/%u", + BufferGetBlockNumber(lbuf), BufferGetBlockNumber(rbuf)); + + _bt_insert_parent(rel, lbuf, rbuf, stack, was_root, was_only); +} + /* * _bt_getstackbuf() -- Walk back up the tree one step, and find the item * we last looked at in the parent. @@ -1273,21 +1791,27 @@ _bt_getstackbuf(Relation rel, BTStack stack, int access) page = BufferGetPage(buf); opaque = (BTPageOpaque) PageGetSpecialPointer(page); + if (access == BT_WRITE && P_INCOMPLETE_SPLIT(opaque)) + { + _bt_finish_split(rel, buf, stack->bts_parent); + continue; + } + if (!P_IGNORE(opaque)) { OffsetNumber offnum, minoff, maxoff; ItemId itemid; - BTItem item; + IndexTuple item; minoff = P_FIRSTDATAKEY(opaque); maxoff = PageGetMaxOffsetNumber(page); /* - * start = InvalidOffsetNumber means "search the whole page". - * We need this test anyway due to possibility that page has a - * high key now when it didn't before. + * start = InvalidOffsetNumber means "search the whole page". We + * need this test anyway due to possibility that page has a high + * key now when it didn't before. 
*/ if (start < minoff) start = minoff; @@ -1301,16 +1825,16 @@ _bt_getstackbuf(Relation rel, BTStack stack, int access) /* * These loops will check every item on the page --- but in an - * order that's attuned to the probability of where it - * actually is. Scan to the right first, then to the left. + * order that's attuned to the probability of where it actually + * is. Scan to the right first, then to the left. */ for (offnum = start; offnum <= maxoff; offnum = OffsetNumberNext(offnum)) { itemid = PageGetItemId(page, offnum); - item = (BTItem) PageGetItem(page, itemid); - if (BTItemSame(item, &stack->bts_btitem)) + item = (IndexTuple) PageGetItem(page, itemid); + if (BTEntrySame(item, &stack->bts_btentry)) { /* Return accurate pointer to where link is now */ stack->bts_blkno = blkno; @@ -1324,8 +1848,8 @@ _bt_getstackbuf(Relation rel, BTStack stack, int access) offnum = OffsetNumberPrev(offnum)) { itemid = PageGetItemId(page, offnum); - item = (BTItem) PageGetItem(page, itemid); - if (BTItemSame(item, &stack->bts_btitem)) + item = (IndexTuple) PageGetItem(page, itemid); + if (BTEntrySame(item, &stack->bts_btentry)) { /* Return accurate pointer to where link is now */ stack->bts_blkno = blkno; @@ -1372,16 +1896,18 @@ _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf) { Buffer rootbuf; Page lpage, - rpage, rootpage; BlockNumber lbkno, rbkno; BlockNumber rootblknum; BTPageOpaque rootopaque; + BTPageOpaque lopaque; ItemId itemid; - BTItem item; - Size itemsz; - BTItem new_item; + IndexTuple item; + IndexTuple left_item; + Size left_item_sz; + IndexTuple right_item; + Size right_item_sz; Buffer metabuf; Page metapg; BTMetaPageData *metad; @@ -1389,7 +1915,7 @@ _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf) lbkno = BufferGetBlockNumber(lbuf); rbkno = BufferGetBlockNumber(rbuf); lpage = BufferGetPage(lbuf); - rpage = BufferGetPage(rbuf); + lopaque = (BTPageOpaque) PageGetSpecialPointer(lpage); /* get a new root page */ rootbuf = _bt_getbuf(rel, P_NEW, BT_WRITE); @@ -1401,6 +1927,26 @@ _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf) metapg = BufferGetPage(metabuf); metad = BTPageGetMeta(metapg); + /* + * Create downlink item for left page (old root). Since this will be the + * first item in a non-leaf page, it implicitly has minus-infinity key + * value, so we need not store any actual key in it. + */ + left_item_sz = sizeof(IndexTupleData); + left_item = (IndexTuple) palloc(left_item_sz); + left_item->t_info = left_item_sz; + ItemPointerSet(&(left_item->t_tid), lbkno, P_HIKEY); + + /* + * Create downlink item for right page. The key for it is obtained from + * the "high key" position in the left page. + */ + itemid = PageGetItemId(lpage, P_HIKEY); + right_item_sz = ItemIdGetLength(itemid); + item = (IndexTuple) PageGetItem(lpage, itemid); + right_item = CopyIndexTuple(item); + ItemPointerSet(&(right_item->t_tid), rbkno, P_HIKEY); + /* NO EREPORT(ERROR) from here till newroot op is logged */ START_CRIT_SECTION(); @@ -1410,6 +1956,7 @@ _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf) rootopaque->btpo_flags = BTP_ROOT; rootopaque->btpo.level = ((BTPageOpaque) PageGetSpecialPointer(lpage))->btpo.level + 1; + rootopaque->btpo_cycleid = 0; /* update metapage data */ metad->btm_root = rootblknum; @@ -1418,85 +1965,85 @@ _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf) metad->btm_fastlevel = rootopaque->btpo.level; /* - * Create downlink item for left page (old root). 
Since this will be - * the first item in a non-leaf page, it implicitly has minus-infinity - * key value, so we need not store any actual key in it. + * Insert the left page pointer into the new root page. The root page is + * the rightmost page on its level so there is no "high key" in it; the + * two items will go into positions P_HIKEY and P_FIRSTKEY. + * + * Note: we *must* insert the two items in item-number order, for the + * benefit of _bt_restore_page(). */ - itemsz = sizeof(BTItemData); - new_item = (BTItem) palloc(itemsz); - new_item->bti_itup.t_info = itemsz; - ItemPointerSet(&(new_item->bti_itup.t_tid), lbkno, P_HIKEY); + if (PageAddItem(rootpage, (Item) left_item, left_item_sz, P_HIKEY, + false, false) == InvalidOffsetNumber) + elog(PANIC, "failed to add leftkey to new root page" + " while splitting block %u of index \"%s\"", + BufferGetBlockNumber(lbuf), RelationGetRelationName(rel)); /* - * Insert the left page pointer into the new root page. The root page - * is the rightmost page on its level so there is no "high key" in it; - * the two items will go into positions P_HIKEY and P_FIRSTKEY. + * insert the right page pointer into the new root page. */ - if (PageAddItem(rootpage, (Item) new_item, itemsz, P_HIKEY, LP_USED) == InvalidOffsetNumber) - elog(PANIC, "failed to add leftkey to new root page"); - pfree(new_item); + if (PageAddItem(rootpage, (Item) right_item, right_item_sz, P_FIRSTKEY, + false, false) == InvalidOffsetNumber) + elog(PANIC, "failed to add rightkey to new root page" + " while splitting block %u of index \"%s\"", + BufferGetBlockNumber(lbuf), RelationGetRelationName(rel)); - /* - * Create downlink item for right page. The key for it is obtained - * from the "high key" position in the left page. - */ - itemid = PageGetItemId(lpage, P_HIKEY); - itemsz = ItemIdGetLength(itemid); - item = (BTItem) PageGetItem(lpage, itemid); - new_item = _bt_formitem(&(item->bti_itup)); - ItemPointerSet(&(new_item->bti_itup.t_tid), rbkno, P_HIKEY); + /* Clear the incomplete-split flag in the left child */ + Assert(P_INCOMPLETE_SPLIT(lopaque)); + lopaque->btpo_flags &= ~BTP_INCOMPLETE_SPLIT; + MarkBufferDirty(lbuf); - /* - * insert the right page pointer into the new root page. - */ - if (PageAddItem(rootpage, (Item) new_item, itemsz, P_FIRSTKEY, LP_USED) == InvalidOffsetNumber) - elog(PANIC, "failed to add rightkey to new root page"); - pfree(new_item); + MarkBufferDirty(rootbuf); + MarkBufferDirty(metabuf); /* XLOG stuff */ - if (!rel->rd_istemp) + if (RelationNeedsWAL(rel)) { xl_btree_newroot xlrec; XLogRecPtr recptr; - XLogRecData rdata[2]; + xl_btree_metadata md; - xlrec.node = rel->rd_node; xlrec.rootblk = rootblknum; xlrec.level = metad->btm_level; - rdata[0].buffer = InvalidBuffer; - rdata[0].data = (char *) &xlrec; - rdata[0].len = SizeOfBtreeNewroot; - rdata[0].next = &(rdata[1]); + XLogBeginInsert(); + XLogRegisterData((char *) &xlrec, SizeOfBtreeNewroot); + + XLogRegisterBuffer(0, rootbuf, REGBUF_WILL_INIT); + XLogRegisterBuffer(1, lbuf, REGBUF_STANDARD); + XLogRegisterBuffer(2, metabuf, REGBUF_WILL_INIT); + + md.root = rootblknum; + md.level = metad->btm_level; + md.fastroot = rootblknum; + md.fastlevel = metad->btm_level; + + XLogRegisterBufData(2, (char *) &md, sizeof(xl_btree_metadata)); /* - * Direct access to page is not good but faster - we should - * implement some new func in page API. + * Direct access to page is not good but faster - we should implement + * some new func in page API. 
*/ - rdata[1].buffer = InvalidBuffer; - rdata[1].data = (char *) rootpage + ((PageHeader) rootpage)->pd_upper; - rdata[1].len = ((PageHeader) rootpage)->pd_special - - ((PageHeader) rootpage)->pd_upper; - rdata[1].next = NULL; + XLogRegisterBufData(0, + (char *) rootpage + ((PageHeader) rootpage)->pd_upper, + ((PageHeader) rootpage)->pd_special - + ((PageHeader) rootpage)->pd_upper); - recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_NEWROOT, rdata); + recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_NEWROOT); + PageSetLSN(lpage, recptr); PageSetLSN(rootpage, recptr); - PageSetTLI(rootpage, ThisTimeLineID); PageSetLSN(metapg, recptr); - PageSetTLI(metapg, ThisTimeLineID); - PageSetLSN(lpage, recptr); - PageSetTLI(lpage, ThisTimeLineID); - PageSetLSN(rpage, recptr); - PageSetTLI(rpage, ThisTimeLineID); } END_CRIT_SECTION(); - /* write and let go of metapage buffer */ - _bt_wrtbuf(rel, metabuf); + /* done with metapage */ + _bt_relbuf(rel, metabuf); - return (rootbuf); + pfree(left_item); + pfree(right_item); + + return rootbuf; } /* @@ -1508,35 +2055,34 @@ _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf) * the buffer afterwards, either. * * The main difference between this routine and a bare PageAddItem call - * is that this code knows that the leftmost data item on a non-leaf + * is that this code knows that the leftmost index tuple on a non-leaf * btree page doesn't need to have a key. Therefore, it strips such - * items down to just the item header. CAUTION: this works ONLY if - * we insert the items in order, so that the given itup_off does - * represent the final position of the item! + * tuples down to just the tuple header. CAUTION: this works ONLY if + * we insert the tuples in order, so that the given itup_off does + * represent the final position of the tuple! 
*/ -static void -_bt_pgaddtup(Relation rel, - Page page, +static bool +_bt_pgaddtup(Page page, Size itemsize, - BTItem btitem, - OffsetNumber itup_off, - const char *where) + IndexTuple itup, + OffsetNumber itup_off) { BTPageOpaque opaque = (BTPageOpaque) PageGetSpecialPointer(page); - BTItemData truncitem; + IndexTupleData trunctuple; if (!P_ISLEAF(opaque) && itup_off == P_FIRSTDATAKEY(opaque)) { - memcpy(&truncitem, btitem, sizeof(BTItemData)); - truncitem.bti_itup.t_info = sizeof(BTItemData); - btitem = &truncitem; - itemsize = sizeof(BTItemData); + trunctuple = *itup; + trunctuple.t_info = sizeof(IndexTupleData); + itup = &trunctuple; + itemsize = sizeof(IndexTupleData); } - if (PageAddItem(page, (Item) btitem, itemsize, itup_off, - LP_USED) == InvalidOffsetNumber) - elog(PANIC, "failed to add item to the %s for \"%s\"", - where, RelationGetRelationName(rel)); + if (PageAddItem(page, (Item) itup, itemsize, itup_off, + false, false) == InvalidOffsetNumber) + return false; + + return true; } /* @@ -1549,15 +2095,13 @@ static bool _bt_isequal(TupleDesc itupdesc, Page page, OffsetNumber offnum, int keysz, ScanKey scankey) { - BTItem btitem; IndexTuple itup; int i; /* Better be comparing to a leaf item */ Assert(P_ISLEAF((BTPageOpaque) PageGetSpecialPointer(page))); - btitem = (BTItem) PageGetItem(page, PageGetItemId(page, offnum)); - itup = &(btitem->bti_itup); + itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, offnum)); for (i = 1; i <= keysz; i++) { @@ -1574,9 +2118,10 @@ _bt_isequal(TupleDesc itupdesc, Page page, OffsetNumber offnum, if (isNull || (scankey->sk_flags & SK_ISNULL)) return false; - result = DatumGetInt32(FunctionCall2(&scankey->sk_func, - datum, - scankey->sk_argument)); + result = DatumGetInt32(FunctionCall2Coll(&scankey->sk_func, + scankey->sk_collation, + datum, + scankey->sk_argument)); if (result != 0) return false; @@ -1587,3 +2132,48 @@ _bt_isequal(TupleDesc itupdesc, Page page, OffsetNumber offnum, /* if we get here, the keys are equal */ return true; } + +/* + * _bt_vacuum_one_page - vacuum just one index page. + * + * Try to remove LP_DEAD items from the given page. The passed buffer + * must be exclusive-locked, but unlike a real VACUUM, we don't need a + * super-exclusive "cleanup" lock (see nbtree/README). + */ +static void +_bt_vacuum_one_page(Relation rel, Buffer buffer, Relation heapRel) +{ + OffsetNumber deletable[MaxOffsetNumber]; + int ndeletable = 0; + OffsetNumber offnum, + minoff, + maxoff; + Page page = BufferGetPage(buffer); + BTPageOpaque opaque = (BTPageOpaque) PageGetSpecialPointer(page); + + /* + * Scan over all items to see which ones need to be deleted according to + * LP_DEAD flags. + */ + minoff = P_FIRSTDATAKEY(opaque); + maxoff = PageGetMaxOffsetNumber(page); + for (offnum = minoff; + offnum <= maxoff; + offnum = OffsetNumberNext(offnum)) + { + ItemId itemId = PageGetItemId(page, offnum); + + if (ItemIdIsDead(itemId)) + deletable[ndeletable++] = offnum; + } + + if (ndeletable > 0) + _bt_delitems_delete(rel, buffer, deletable, ndeletable, heapRel); + + /* + * Note: if we didn't find any LP_DEAD items, then the page's + * BTP_HAS_GARBAGE hint bit is falsely set. We do not bother expending a + * separate write to clear it, however. We will clear it when we split + * the page. + */ +}
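
To make the split-point arithmetic in _bt_findsplitloc()/_bt_checksplitloc() concrete, here is a minimal standalone C sketch of the scoring rule: each candidate split charges the first right-hand item against both halves (it becomes the left page's high key) and the incoming tuple against whichever half it would land on, then either balances the remaining free space directly or weights the two sides by fillfactor when the page being split is the rightmost on its level. PAGE_SPACE, FILLFACTOR, the uniform 200-byte item size and the split_delta() helper are made-up stand-ins for illustration only; the sketch ignores line-pointer overhead, non-leaf key truncation, the case where the new item becomes the first item on the right page, and the feasibility check that the real code performs.

/*
 * Toy illustration (not PostgreSQL code) of how candidate split points are
 * scored and how the "good enough" threshold stops the scan early.
 */
#include <stdio.h>
#include <stdlib.h>

#define PAGE_SPACE	8192	/* made-up usable space per page */
#define FILLFACTOR	90		/* made-up leaf fillfactor */

/*
 * Score one candidate split: olddatatoleft bytes of existing items stay on
 * the left page, firstrightsz is the size of the item that becomes the first
 * on the right page (and therefore also the left page's new high key), and
 * newitemsz/newitemonleft describe where the incoming tuple would land.
 * Smaller scores are better.
 */
static int
split_delta(int olddatatotal, int olddatatoleft, int firstrightsz,
			int newitemsz, int newitemonleft, int is_rightmost)
{
	int			leftfree = PAGE_SPACE - olddatatoleft;
	int			rightfree = PAGE_SPACE - (olddatatotal - olddatatoleft);
	int			delta;

	/* the first right item doubles as the left page's high key */
	leftfree -= firstrightsz;

	/* account for the incoming tuple on whichever side it would land */
	if (newitemonleft)
		leftfree -= newitemsz;
	else
		rightfree -= newitemsz;

	/* (a real implementation would reject the split if either side overflows) */

	if (is_rightmost)
		/* bias rightmost splits toward a fillfactor%-full left page */
		delta = FILLFACTOR * leftfree - (100 - FILLFACTOR) * rightfree;
	else
		/* otherwise aim for equal free space on both sides */
		delta = leftfree - rightfree;

	return abs(delta);
}

int
main(void)
{
	int			itemsz = 200;				/* made-up, uniform item size */
	int			olddatatotal = 40 * itemsz; /* made-up page contents */
	int			goodenough = PAGE_SPACE / 16;	/* early-stop threshold */
	int			tosplitleft;

	/* pretend the incoming tuple goes left and the page is not rightmost */
	for (tosplitleft = itemsz; tosplitleft < olddatatotal; tosplitleft += itemsz)
	{
		int			delta = split_delta(olddatatotal, tosplitleft, itemsz,
										itemsz, 1, 0);

		printf("bytes left of split %5d  delta %5d\n", tosplitleft, delta);
		if (delta <= goodenough)
			break;				/* "good enough": stop scanning early */
	}
	return 0;
}

The sketch takes the absolute value of the score so over- and under-full left pages are penalized symmetrically when candidates are compared, which mirrors how the patch compares deltas against best_delta.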
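
Both _bt_newroot() and _bt_pgaddtup() rely on the convention that the leftmost tuple on a non-leaf page carries an implicit minus-infinity key, so only a tuple header holding the child pointer needs to be stored. The sketch below is a loose, self-contained illustration of that truncation; ToyTuple, make_minus_infinity_downlink() and the block number 42 are made-up stand-ins, not PostgreSQL structures.

/*
 * Toy illustration (not PostgreSQL code) of the "minus infinity" downlink:
 * keep the child pointer, drop the key, and record a header-only size.
 */
#include <stddef.h>
#include <stdio.h>
#include <string.h>

/* simplified stand-in for IndexTupleData followed by its key columns */
typedef struct
{
	unsigned int child_blkno;	/* stand-in for the t_tid downlink */
	unsigned short t_info;		/* stand-in for t_info: stored tuple size */
	char		key[32];		/* key columns; dropped for the leftmost entry */
} ToyTuple;

static ToyTuple
make_minus_infinity_downlink(unsigned int child_blkno)
{
	ToyTuple	t;

	memset(&t, 0, sizeof(t));
	t.child_blkno = child_blkno;
	/* size says "header only", so no key bytes are stored */
	t.t_info = (unsigned short) offsetof(ToyTuple, key);
	return t;
}

int
main(void)
{
	ToyTuple	left = make_minus_infinity_downlink(42);

	printf("downlink to block %u stored as %u bytes, key omitted\n",
		   left.child_blkno, (unsigned) left.t_info);
	return 0;
}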