diff --git a/src/backend/access/nbtree/nbtinsert.c b/src/backend/access/nbtree/nbtinsert.c
index 6842d5e8c9..ef68a7145f 100644
--- a/src/backend/access/nbtree/nbtinsert.c
+++ b/src/backend/access/nbtree/nbtinsert.c
@@ -3,12 +3,12 @@
  * nbtinsert.c
  *	  Item insertion in Lehman and Yao btrees for Postgres.
  *
- * Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  *
  * IDENTIFICATION
- *	  $PostgreSQL: pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.168 2008/11/03 20:47:48 tgl Exp $
+ *	  src/backend/access/nbtree/nbtinsert.c
  *
  *-------------------------------------------------------------------------
  */
@@ -18,10 +18,10 @@
 #include "access/heapam.h"
 #include "access/nbtree.h"
 #include "access/transam.h"
+#include "access/xloginsert.h"
 #include "miscadmin.h"
-#include "storage/bufmgr.h"
 #include "storage/lmgr.h"
-#include "utils/inval.h"
+#include "storage/predicate.h"
 #include "utils/tqual.h"

@@ -49,22 +49,27 @@ typedef struct
 static Buffer _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf);
 static TransactionId _bt_check_unique(Relation rel, IndexTuple itup,
-				 Relation heapRel, Buffer buf, OffsetNumber ioffset,
-				 ScanKey itup_scankey);
+				 Relation heapRel, Buffer buf, OffsetNumber offset,
+				 ScanKey itup_scankey,
+				 IndexUniqueCheck checkUnique, bool *is_unique);
 static void _bt_findinsertloc(Relation rel,
				  Buffer *bufptr,
				  OffsetNumber *offsetptr,
				  int keysz,
				  ScanKey scankey,
-				  IndexTuple newtup);
-static void _bt_insertonpg(Relation rel, Buffer buf,
+				  IndexTuple newtup,
+				  BTStack stack,
+				  Relation heapRel);
+static void _bt_insertonpg(Relation rel, Buffer buf, Buffer cbuf,
			   BTStack stack,
			   IndexTuple itup,
			   OffsetNumber newitemoff,
			   bool split_only_page);
-static Buffer _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
-		  OffsetNumber newitemoff, Size newitemsz,
+static Buffer _bt_split(Relation rel, Buffer buf, Buffer cbuf,
+		  OffsetNumber firstright, OffsetNumber newitemoff, Size newitemsz,
		  IndexTuple newitem, bool newitemonleft);
+static void _bt_insert_parent(Relation rel, Buffer buf, Buffer rbuf,
+				  BTStack stack, bool is_root, bool is_only);
 static OffsetNumber _bt_findsplitloc(Relation rel, Page page,
					 OffsetNumber newitemoff,
					 Size newitemsz,
@@ -72,24 +77,36 @@ static OffsetNumber _bt_findsplitloc(Relation rel, Page page,
 static void _bt_checksplitloc(FindSplitData *state,
				  OffsetNumber firstoldonright, bool newitemonleft,
				  int dataitemstoleft, Size firstoldonrightsz);
-static void _bt_pgaddtup(Relation rel, Page page,
-			 Size itemsize, IndexTuple itup,
-			 OffsetNumber itup_off, const char *where);
+static bool _bt_pgaddtup(Page page, Size itemsize, IndexTuple itup,
+			 OffsetNumber itup_off);
 static bool _bt_isequal(TupleDesc itupdesc, Page page, OffsetNumber offnum,
			int keysz, ScanKey scankey);
-static void _bt_vacuum_one_page(Relation rel, Buffer buffer);
+static void _bt_vacuum_one_page(Relation rel, Buffer buffer, Relation heapRel);

 /*
  * _bt_doinsert() -- Handle insertion of a single index tuple in the tree.
  *
- * This routine is called by the public interface routines, btbuild
- * and btinsert.
By here, itup is filled in, including the TID. + * This routine is called by the public interface routine, btinsert. + * By here, itup is filled in, including the TID. + * + * If checkUnique is UNIQUE_CHECK_NO or UNIQUE_CHECK_PARTIAL, this + * will allow duplicates. Otherwise (UNIQUE_CHECK_YES or + * UNIQUE_CHECK_EXISTING) it will throw error for a duplicate. + * For UNIQUE_CHECK_EXISTING we merely run the duplicate check, and + * don't actually insert. + * + * The result value is only significant for UNIQUE_CHECK_PARTIAL: + * it must be TRUE if the entry is known unique, else FALSE. + * (In the current implementation we'll also return TRUE after a + * successful UNIQUE_CHECK_YES or UNIQUE_CHECK_EXISTING call, but + * that's just a coding artifact.) */ -void +bool _bt_doinsert(Relation rel, IndexTuple itup, - bool index_is_unique, Relation heapRel) + IndexUniqueCheck checkUnique, Relation heapRel) { + bool is_unique = false; int natts = rel->rd_rel->relnatts; ScanKey itup_scankey; BTStack stack; @@ -113,10 +130,11 @@ top: * If the page was split between the time that we surrendered our read * lock and acquired our write lock, then this page may no longer be the * right place for the key we want to insert. In this case, we need to - * move right in the tree. See Lehman and Yao for an excruciatingly + * move right in the tree. See Lehman and Yao for an excruciatingly * precise description. */ - buf = _bt_moveright(rel, buf, natts, itup_scankey, false, BT_WRITE); + buf = _bt_moveright(rel, buf, natts, itup_scankey, false, + true, stack, BT_WRITE); /* * If we're not allowing duplicates, make sure the key isn't already in @@ -134,32 +152,56 @@ top: * * If we must wait for another xact, we release the lock while waiting, * and then must start over completely. + * + * For a partial uniqueness check, we don't wait for the other xact. Just + * let the tuple in and return false for possibly non-unique, or true for + * definitely unique. */ - if (index_is_unique) + if (checkUnique != UNIQUE_CHECK_NO) { TransactionId xwait; offset = _bt_binsrch(rel, buf, natts, itup_scankey, false); - xwait = _bt_check_unique(rel, itup, heapRel, buf, offset, itup_scankey); + xwait = _bt_check_unique(rel, itup, heapRel, buf, offset, itup_scankey, + checkUnique, &is_unique); if (TransactionIdIsValid(xwait)) { /* Have to wait for the other guy ... */ _bt_relbuf(rel, buf); - XactLockTableWait(xwait); + XactLockTableWait(xwait, rel, &itup->t_tid, XLTW_InsertIndex); /* start over... */ _bt_freestack(stack); goto top; } } - /* do the insertion */ - _bt_findinsertloc(rel, &buf, &offset, natts, itup_scankey, itup); - _bt_insertonpg(rel, buf, stack, itup, offset, false); + if (checkUnique != UNIQUE_CHECK_EXISTING) + { + /* + * The only conflict predicate locking cares about for indexes is when + * an index tuple insert conflicts with an existing lock. Since the + * actual location of the insert is hard to predict because of the + * random search used to prevent O(N^2) performance when there are + * many duplicate entries, we can just use the "first valid" page. + */ + CheckForSerializableConflictIn(rel, NULL, buf); + /* do the insertion */ + _bt_findinsertloc(rel, &buf, &offset, natts, itup_scankey, itup, + stack, heapRel); + _bt_insertonpg(rel, buf, InvalidBuffer, stack, itup, offset, false); + } + else + { + /* just release the buffer */ + _bt_relbuf(rel, buf); + } /* be tidy */ _bt_freestack(stack); _bt_freeskey(itup_scankey); + + return is_unique; } /* @@ -170,12 +212,18 @@ top: * is the first tuple on the next page. 
* * Returns InvalidTransactionId if there is no conflict, else an xact ID - * we must wait for to see if it commits a conflicting tuple. If an actual + * we must wait for to see if it commits a conflicting tuple. If an actual * conflict is detected, no return --- just ereport(). + * + * However, if checkUnique == UNIQUE_CHECK_PARTIAL, we always return + * InvalidTransactionId because we don't want to wait. In this case we + * set *is_unique to false if there is a potential conflict, and the + * core code must redo the uniqueness check later. */ static TransactionId _bt_check_unique(Relation rel, IndexTuple itup, Relation heapRel, - Buffer buf, OffsetNumber offset, ScanKey itup_scankey) + Buffer buf, OffsetNumber offset, ScanKey itup_scankey, + IndexUniqueCheck checkUnique, bool *is_unique) { TupleDesc itupdesc = RelationGetDescr(rel); int natts = rel->rd_rel->relnatts; @@ -184,6 +232,10 @@ _bt_check_unique(Relation rel, IndexTuple itup, Relation heapRel, Page page; BTPageOpaque opaque; Buffer nbuf = InvalidBuffer; + bool found = false; + + /* Assume unique until we find a duplicate */ + *is_unique = true; InitDirtySnapshot(SnapshotDirty); @@ -240,22 +292,49 @@ _bt_check_unique(Relation rel, IndexTuple itup, Relation heapRel, curitup = (IndexTuple) PageGetItem(page, curitemid); htid = curitup->t_tid; + /* + * If we are doing a recheck, we expect to find the tuple we + * are rechecking. It's not a duplicate, but we have to keep + * scanning. + */ + if (checkUnique == UNIQUE_CHECK_EXISTING && + ItemPointerCompare(&htid, &itup->t_tid) == 0) + { + found = true; + } + /* * We check the whole HOT-chain to see if there is any tuple * that satisfies SnapshotDirty. This is necessary because we * have just a single index entry for the entire chain. */ - if (heap_hot_search(&htid, heapRel, &SnapshotDirty, &all_dead)) + else if (heap_hot_search(&htid, heapRel, &SnapshotDirty, + &all_dead)) { - /* it is a duplicate */ - TransactionId xwait = - (TransactionIdIsValid(SnapshotDirty.xmin)) ? - SnapshotDirty.xmin : SnapshotDirty.xmax; + TransactionId xwait; + + /* + * It is a duplicate. If we are only doing a partial + * check, then don't bother checking if the tuple is being + * updated in another transaction. Just return the fact + * that it is a potential conflict and leave the full + * check till later. + */ + if (checkUnique == UNIQUE_CHECK_PARTIAL) + { + if (nbuf != InvalidBuffer) + _bt_relbuf(rel, nbuf); + *is_unique = false; + return InvalidTransactionId; + } /* * If this tuple is being updated by other transaction * then we have to wait for its commit/abort. */ + xwait = (TransactionIdIsValid(SnapshotDirty.xmin)) ? + SnapshotDirty.xmin : SnapshotDirty.xmax; + if (TransactionIdIsValid(xwait)) { if (nbuf != InvalidBuffer) @@ -295,10 +374,38 @@ _bt_check_unique(Relation rel, IndexTuple itup, Relation heapRel, break; } - ereport(ERROR, - (errcode(ERRCODE_UNIQUE_VIOLATION), - errmsg("duplicate key value violates unique constraint \"%s\"", - RelationGetRelationName(rel)))); + /* + * This is a definite conflict. Break the tuple down into + * datums and report the error. But first, make sure we + * release the buffer locks we're holding --- + * BuildIndexValueDescription could make catalog accesses, + * which in the worst case might touch this same index and + * cause deadlocks. 
+ */ + if (nbuf != InvalidBuffer) + _bt_relbuf(rel, nbuf); + _bt_relbuf(rel, buf); + + { + Datum values[INDEX_MAX_KEYS]; + bool isnull[INDEX_MAX_KEYS]; + char *key_desc; + + index_deform_tuple(itup, RelationGetDescr(rel), + values, isnull); + + key_desc = BuildIndexValueDescription(rel, values, + isnull); + + ereport(ERROR, + (errcode(ERRCODE_UNIQUE_VIOLATION), + errmsg("duplicate key value violates unique constraint \"%s\"", + RelationGetRelationName(rel)), + key_desc ? errdetail("Key %s already exists.", + key_desc) : 0, + errtableconstraint(heapRel, + RelationGetRelationName(rel)))); + } } else if (all_dead) { @@ -309,11 +416,15 @@ _bt_check_unique(Relation rel, IndexTuple itup, Relation heapRel, */ ItemIdMarkDead(curitemid); opaque->btpo_flags |= BTP_HAS_GARBAGE; - /* be sure to mark the proper buffer dirty... */ + + /* + * Mark buffer with a dirty hint, since state is not + * crucial. Be sure to mark the proper buffer dirty. + */ if (nbuf != InvalidBuffer) - SetBufferCommitInfoNeedsSave(nbuf); + MarkBufferDirtyHint(nbuf, true); else - SetBufferCommitInfoNeedsSave(buf); + MarkBufferDirtyHint(buf, true); } } } @@ -349,6 +460,20 @@ _bt_check_unique(Relation rel, IndexTuple itup, Relation heapRel, } } + /* + * If we are doing a recheck then we should have found the tuple we are + * checking. Otherwise there's something very wrong --- probably, the + * index is on a non-immutable expression. + */ + if (checkUnique == UNIQUE_CHECK_EXISTING && !found) + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("failed to re-find tuple within index \"%s\"", + RelationGetRelationName(rel)), + errhint("This may be because of a non-immutable index expression."), + errtableconstraint(heapRel, + RelationGetRelationName(rel)))); + if (nbuf != InvalidBuffer) _bt_relbuf(rel, nbuf); @@ -362,7 +487,7 @@ _bt_check_unique(Relation rel, IndexTuple itup, Relation heapRel, * If the new key is equal to one or more existing keys, we can * legitimately place it anywhere in the series of equal keys --- in fact, * if the new key is equal to the page's "high key" we can place it on - * the next page. If it is equal to the high key, and there's not room + * the next page. If it is equal to the high key, and there's not room * to insert the new tuple on the current page without splitting, then * we can move right hoping to find more free space and avoid a split. * (We should not move right indefinitely, however, since that leads to @@ -373,9 +498,9 @@ _bt_check_unique(Relation rel, IndexTuple itup, Relation heapRel, * If there's not enough room in the space, we try to make room by * removing any LP_DEAD tuples. * - * On entry, *buf and *offsetptr point to the first legal position + * On entry, *bufptr and *offsetptr point to the first legal position * where the new tuple could be inserted. The caller should hold an - * exclusive lock on *buf. *offsetptr can also be set to + * exclusive lock on *bufptr. *offsetptr can also be set to * InvalidOffsetNumber, in which case the function will search for the * right location within the page if needed. On exit, they point to the * chosen insert location. 
If _bt_findinsertloc decides to move right, @@ -391,7 +516,9 @@ _bt_findinsertloc(Relation rel, OffsetNumber *offsetptr, int keysz, ScanKey scankey, - IndexTuple newtup) + IndexTuple newtup, + BTStack stack, + Relation heapRel) { Buffer buf = *bufptr; Page page = BufferGetPage(buf); @@ -414,16 +541,20 @@ _bt_findinsertloc(Relation rel, * able to fit three items on every page, so restrict any one item to 1/3 * the per-page available space. Note that at this point, itemsz doesn't * include the ItemId. + * + * NOTE: if you change this, see also the similar code in _bt_buildadd(). */ if (itemsz > BTMaxItemSize(page)) ereport(ERROR, (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), - errmsg("index row size %lu exceeds btree maximum, %lu", - (unsigned long) itemsz, - (unsigned long) BTMaxItemSize(page)), + errmsg("index row size %zu exceeds maximum %zu for index \"%s\"", + itemsz, BTMaxItemSize(page), + RelationGetRelationName(rel)), errhint("Values larger than 1/3 of a buffer page cannot be indexed.\n" "Consider a function index of an MD5 hash of the value, " - "or use full text indexing."))); + "or use full text indexing."), + errtableconstraint(heapRel, + RelationGetRelationName(rel)))); /*---------- * If we will need to split the page to put the item on this page, @@ -438,7 +569,7 @@ _bt_findinsertloc(Relation rel, * on every insert. We implement "get tired" as a random choice, * since stopping after scanning a fixed number of pages wouldn't work * well (we'd never reach the right-hand side of previously split - * pages). Currently the probability of moving right is set at 0.99, + * pages). Currently the probability of moving right is set at 0.99, * which may seem too high to change the behavior much, but it does an * excellent job of preventing O(N^2) behavior with many equal keys. *---------- @@ -448,6 +579,7 @@ _bt_findinsertloc(Relation rel, while (PageGetFreeSpace(page) < itemsz) { Buffer rbuf; + BlockNumber rblkno; /* * before considering moving right, see if we can obtain enough space @@ -455,7 +587,7 @@ _bt_findinsertloc(Relation rel, */ if (P_ISLEAF(lpageop) && P_HAS_GARBAGE(lpageop)) { - _bt_vacuum_one_page(rel, buf); + _bt_vacuum_one_page(rel, buf, heapRel); /* * remember that we vacuumed this page, because that makes the @@ -485,18 +617,33 @@ _bt_findinsertloc(Relation rel, */ rbuf = InvalidBuffer; + rblkno = lpageop->btpo_next; for (;;) { - BlockNumber rblkno = lpageop->btpo_next; - rbuf = _bt_relandgetbuf(rel, rbuf, rblkno, BT_WRITE); page = BufferGetPage(rbuf); lpageop = (BTPageOpaque) PageGetSpecialPointer(page); + + /* + * If this page was incompletely split, finish the split now. We + * do this while holding a lock on the left sibling, which is not + * good because finishing the split could be a fairly lengthy + * operation. But this should happen very seldom. + */ + if (P_INCOMPLETE_SPLIT(lpageop)) + { + _bt_finish_split(rel, rbuf, stack); + rbuf = InvalidBuffer; + continue; + } + if (!P_IGNORE(lpageop)) break; if (P_RIGHTMOST(lpageop)) elog(ERROR, "fell off the end of index \"%s\"", RelationGetRelationName(rel)); + + rblkno = lpageop->btpo_next; } _bt_relbuf(rel, buf); buf = rbuf; @@ -538,10 +685,14 @@ _bt_findinsertloc(Relation rel, * child page on the parent. * + updates the metapage if a true root or fast root is split. * - * On entry, we must have the right buffer in which to do the - * insertion, and the buffer must be pinned and write-locked. 
On return, + * On entry, we must have the correct buffer in which to do the + * insertion, and the buffer must be pinned and write-locked. On return, * we will have dropped both the pin and the lock on the buffer. * + * When inserting to a non-leaf page, 'cbuf' is the left-sibling of the + * page we're inserting the downlink for. This function will clear the + * INCOMPLETE_SPLIT flag on it, and release the buffer. + * * The locking interactions in this code are critical. You should * grok Lehman and Yao's paper before making any changes. In addition, * you need to understand how we disambiguate duplicate keys in this @@ -555,6 +706,7 @@ _bt_findinsertloc(Relation rel, static void _bt_insertonpg(Relation rel, Buffer buf, + Buffer cbuf, BTStack stack, IndexTuple itup, OffsetNumber newitemoff, @@ -568,6 +720,14 @@ _bt_insertonpg(Relation rel, page = BufferGetPage(buf); lpageop = (BTPageOpaque) PageGetSpecialPointer(page); + /* child buffer must be given iff inserting on an internal page */ + Assert(P_ISLEAF(lpageop) == !BufferIsValid(cbuf)); + + /* The caller should've finished any incomplete splits already. */ + if (P_INCOMPLETE_SPLIT(lpageop)) + elog(ERROR, "cannot insert to incompletely split page %u", + BufferGetBlockNumber(buf)); + itemsz = IndexTupleDSize(*itup); itemsz = MAXALIGN(itemsz); /* be safe, PageAddItem will do this but we * need to be consistent */ @@ -592,8 +752,11 @@ _bt_insertonpg(Relation rel, &newitemonleft); /* split the buffer into left and right halves */ - rbuf = _bt_split(rel, buf, firstright, + rbuf = _bt_split(rel, buf, cbuf, firstright, newitemoff, itemsz, itup, newitemonleft); + PredicateLockPageSplit(rel, + BufferGetBlockNumber(buf), + BufferGetBlockNumber(rbuf)); /*---------- * By here, @@ -650,7 +813,9 @@ _bt_insertonpg(Relation rel, /* Do the update. 
No ereport(ERROR) until changes are logged */ START_CRIT_SECTION(); - _bt_pgaddtup(rel, page, itemsz, itup, newitemoff, "page"); + if (!_bt_pgaddtup(page, itemsz, itup, newitemoff)) + elog(PANIC, "failed to add new item to block %u in index \"%s\"", + itup_blkno, RelationGetRelationName(rel)); MarkBufferDirty(buf); @@ -661,38 +826,40 @@ _bt_insertonpg(Relation rel, MarkBufferDirty(metabuf); } + /* clear INCOMPLETE_SPLIT flag on child if inserting a downlink */ + if (BufferIsValid(cbuf)) + { + Page cpage = BufferGetPage(cbuf); + BTPageOpaque cpageop = (BTPageOpaque) PageGetSpecialPointer(cpage); + + Assert(P_INCOMPLETE_SPLIT(cpageop)); + cpageop->btpo_flags &= ~BTP_INCOMPLETE_SPLIT; + MarkBufferDirty(cbuf); + } + /* XLOG stuff */ - if (!rel->rd_istemp) + if (RelationNeedsWAL(rel)) { xl_btree_insert xlrec; - BlockNumber xldownlink; xl_btree_metadata xlmeta; uint8 xlinfo; XLogRecPtr recptr; - XLogRecData rdata[4]; - XLogRecData *nextrdata; IndexTupleData trunctuple; - xlrec.target.node = rel->rd_node; - ItemPointerSet(&(xlrec.target.tid), itup_blkno, itup_off); + xlrec.offnum = itup_off; - rdata[0].data = (char *) &xlrec; - rdata[0].len = SizeOfBtreeInsert; - rdata[0].buffer = InvalidBuffer; - rdata[0].next = nextrdata = &(rdata[1]); + XLogBeginInsert(); + XLogRegisterData((char *) &xlrec, SizeOfBtreeInsert); if (P_ISLEAF(lpageop)) xlinfo = XLOG_BTREE_INSERT_LEAF; else { - xldownlink = ItemPointerGetBlockNumber(&(itup->t_tid)); - Assert(ItemPointerGetOffsetNumber(&(itup->t_tid)) == P_HIKEY); - - nextrdata->data = (char *) &xldownlink; - nextrdata->len = sizeof(BlockNumber); - nextrdata->buffer = InvalidBuffer; - nextrdata->next = nextrdata + 1; - nextrdata++; + /* + * Register the left child whose INCOMPLETE_SPLIT flag was + * cleared. + */ + XLogRegisterBuffer(1, cbuf, REGBUF_STANDARD); xlinfo = XLOG_BTREE_INSERT_UPPER; } @@ -704,54 +871,45 @@ _bt_insertonpg(Relation rel, xlmeta.fastroot = metad->btm_fastroot; xlmeta.fastlevel = metad->btm_fastlevel; - nextrdata->data = (char *) &xlmeta; - nextrdata->len = sizeof(xl_btree_metadata); - nextrdata->buffer = InvalidBuffer; - nextrdata->next = nextrdata + 1; - nextrdata++; + XLogRegisterBuffer(2, metabuf, REGBUF_WILL_INIT); + XLogRegisterBufData(2, (char *) &xlmeta, sizeof(xl_btree_metadata)); xlinfo = XLOG_BTREE_INSERT_META; } /* Read comments in _bt_pgaddtup */ + XLogRegisterBuffer(0, buf, REGBUF_STANDARD); if (!P_ISLEAF(lpageop) && newitemoff == P_FIRSTDATAKEY(lpageop)) { trunctuple = *itup; trunctuple.t_info = sizeof(IndexTupleData); - nextrdata->data = (char *) &trunctuple; - nextrdata->len = sizeof(IndexTupleData); + XLogRegisterBufData(0, (char *) &trunctuple, + sizeof(IndexTupleData)); } else - { - nextrdata->data = (char *) itup; - nextrdata->len = IndexTupleDSize(*itup); - } - nextrdata->buffer = buf; - nextrdata->buffer_std = true; - nextrdata->next = NULL; + XLogRegisterBufData(0, (char *) itup, IndexTupleDSize(*itup)); - recptr = XLogInsert(RM_BTREE_ID, xlinfo, rdata); + recptr = XLogInsert(RM_BTREE_ID, xlinfo); if (BufferIsValid(metabuf)) { PageSetLSN(metapg, recptr); - PageSetTLI(metapg, ThisTimeLineID); + } + if (BufferIsValid(cbuf)) + { + PageSetLSN(BufferGetPage(cbuf), recptr); } PageSetLSN(page, recptr); - PageSetTLI(page, ThisTimeLineID); } END_CRIT_SECTION(); - /* release buffers; send out relcache inval if metapage changed */ + /* release buffers */ if (BufferIsValid(metabuf)) - { - if (!InRecovery) - CacheInvalidateRelcache(rel); _bt_relbuf(rel, metabuf); - } - + if (BufferIsValid(cbuf)) + _bt_relbuf(rel, cbuf); 
_bt_relbuf(rel, buf); } } @@ -764,11 +922,15 @@ _bt_insertonpg(Relation rel, * new right page. newitemoff etc. tell us about the new item that * must be inserted along with the data from the old page. * + * When splitting a non-leaf page, 'cbuf' is the left-sibling of the + * page we're inserting the downlink for. This function will clear the + * INCOMPLETE_SPLIT flag on it, and release the buffer. + * * Returns the new right sibling of buf, pinned and write-locked. * The pin and lock on buf are maintained. */ static Buffer -_bt_split(Relation rel, Buffer buf, OffsetNumber firstright, +_bt_split(Relation rel, Buffer buf, Buffer cbuf, OffsetNumber firstright, OffsetNumber newitemoff, Size newitemsz, IndexTuple newitem, bool newitemonleft) { @@ -776,6 +938,8 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright, Page origpage; Page leftpage, rightpage; + BlockNumber origpagenumber, + rightpagenumber; BTPageOpaque ropaque, lopaque, oopaque; @@ -790,22 +954,38 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright, OffsetNumber maxoff; OffsetNumber i; bool isroot; + bool isleaf; + /* Acquire a new page to split into */ rbuf = _bt_getbuf(rel, P_NEW, BT_WRITE); + + /* + * origpage is the original page to be split. leftpage is a temporary + * buffer that receives the left-sibling data, which will be copied back + * into origpage on success. rightpage is the new page that receives the + * right-sibling data. If we fail before reaching the critical section, + * origpage hasn't been modified and leftpage is only workspace. In + * principle we shouldn't need to worry about rightpage either, because it + * hasn't been linked into the btree page structure; but to avoid leaving + * possibly-confusing junk behind, we are careful to rewrite rightpage as + * zeroes before throwing any error. + */ origpage = BufferGetPage(buf); leftpage = PageGetTempPage(origpage); rightpage = BufferGetPage(rbuf); + origpagenumber = BufferGetBlockNumber(buf); + rightpagenumber = BufferGetBlockNumber(rbuf); + _bt_pageinit(leftpage, BufferGetPageSize(buf)); /* rightpage was already initialized by _bt_getbuf */ /* - * Copy the original page's LSN and TLI into leftpage, which will become - * the updated version of the page. We need this because XLogInsert will - * examine these fields and possibly dump them in a page image. + * Copy the original page's LSN into leftpage, which will become the + * updated version of the page. We need this because XLogInsert will + * examine the LSN and possibly dump it in a page image. 
*/ PageSetLSN(leftpage, PageGetLSN(origpage)); - PageSetTLI(leftpage, PageGetTLI(origpage)); /* init btree private data */ oopaque = (BTPageOpaque) PageGetSpecialPointer(origpage); @@ -813,15 +993,18 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright, ropaque = (BTPageOpaque) PageGetSpecialPointer(rightpage); isroot = P_ISROOT(oopaque); + isleaf = P_ISLEAF(oopaque); /* if we're splitting this page, it won't be the root when we're done */ /* also, clear the SPLIT_END and HAS_GARBAGE flags in both pages */ lopaque->btpo_flags = oopaque->btpo_flags; lopaque->btpo_flags &= ~(BTP_ROOT | BTP_SPLIT_END | BTP_HAS_GARBAGE); ropaque->btpo_flags = lopaque->btpo_flags; + /* set flag in left page indicating that the right page has no downlink */ + lopaque->btpo_flags |= BTP_INCOMPLETE_SPLIT; lopaque->btpo_prev = oopaque->btpo_prev; - lopaque->btpo_next = BufferGetBlockNumber(rbuf); - ropaque->btpo_prev = BufferGetBlockNumber(buf); + lopaque->btpo_next = rightpagenumber; + ropaque->btpo_prev = origpagenumber; ropaque->btpo_next = oopaque->btpo_next; lopaque->btpo.level = ropaque->btpo.level = oopaque->btpo.level; /* Since we already have write-lock on both pages, ok to read cycleid */ @@ -844,9 +1027,12 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright, item = (IndexTuple) PageGetItem(origpage, itemid); if (PageAddItem(rightpage, (Item) item, itemsz, rightoff, false, false) == InvalidOffsetNumber) - elog(PANIC, "failed to add hikey to the right sibling" + { + memset(rightpage, 0, BufferGetPageSize(rbuf)); + elog(ERROR, "failed to add hikey to the right sibling" " while splitting block %u of index \"%s\"", - BufferGetBlockNumber(buf), RelationGetRelationName(rel)); + origpagenumber, RelationGetRelationName(rel)); + } rightoff = OffsetNumberNext(rightoff); } @@ -871,9 +1057,12 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright, } if (PageAddItem(leftpage, (Item) item, itemsz, leftoff, false, false) == InvalidOffsetNumber) - elog(PANIC, "failed to add hikey to the left sibling" + { + memset(rightpage, 0, BufferGetPageSize(rbuf)); + elog(ERROR, "failed to add hikey to the left sibling" " while splitting block %u of index \"%s\"", - BufferGetBlockNumber(buf), RelationGetRelationName(rel)); + origpagenumber, RelationGetRelationName(rel)); + } leftoff = OffsetNumberNext(leftoff); /* @@ -895,14 +1084,24 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright, { if (newitemonleft) { - _bt_pgaddtup(rel, leftpage, newitemsz, newitem, leftoff, - "left sibling"); + if (!_bt_pgaddtup(leftpage, newitemsz, newitem, leftoff)) + { + memset(rightpage, 0, BufferGetPageSize(rbuf)); + elog(ERROR, "failed to add new item to the left sibling" + " while splitting block %u of index \"%s\"", + origpagenumber, RelationGetRelationName(rel)); + } leftoff = OffsetNumberNext(leftoff); } else { - _bt_pgaddtup(rel, rightpage, newitemsz, newitem, rightoff, - "right sibling"); + if (!_bt_pgaddtup(rightpage, newitemsz, newitem, rightoff)) + { + memset(rightpage, 0, BufferGetPageSize(rbuf)); + elog(ERROR, "failed to add new item to the right sibling" + " while splitting block %u of index \"%s\"", + origpagenumber, RelationGetRelationName(rel)); + } rightoff = OffsetNumberNext(rightoff); } } @@ -910,14 +1109,24 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright, /* decide which page to put it on */ if (i < firstright) { - _bt_pgaddtup(rel, leftpage, itemsz, item, leftoff, - "left sibling"); + if (!_bt_pgaddtup(leftpage, itemsz, item, leftoff)) + { + memset(rightpage, 0, 
BufferGetPageSize(rbuf)); + elog(ERROR, "failed to add old item to the left sibling" + " while splitting block %u of index \"%s\"", + origpagenumber, RelationGetRelationName(rel)); + } leftoff = OffsetNumberNext(leftoff); } else { - _bt_pgaddtup(rel, rightpage, itemsz, item, rightoff, - "right sibling"); + if (!_bt_pgaddtup(rightpage, itemsz, item, rightoff)) + { + memset(rightpage, 0, BufferGetPageSize(rbuf)); + elog(ERROR, "failed to add old item to the right sibling" + " while splitting block %u of index \"%s\"", + origpagenumber, RelationGetRelationName(rel)); + } rightoff = OffsetNumberNext(rightoff); } } @@ -931,8 +1140,13 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright, * not be splitting the page). */ Assert(!newitemonleft); - _bt_pgaddtup(rel, rightpage, newitemsz, newitem, rightoff, - "right sibling"); + if (!_bt_pgaddtup(rightpage, newitemsz, newitem, rightoff)) + { + memset(rightpage, 0, BufferGetPageSize(rbuf)); + elog(ERROR, "failed to add new item to the right sibling" + " while splitting block %u of index \"%s\"", + origpagenumber, RelationGetRelationName(rel)); + } rightoff = OffsetNumberNext(rightoff); } @@ -944,16 +1158,19 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright, * neighbors. */ - if (!P_RIGHTMOST(ropaque)) + if (!P_RIGHTMOST(oopaque)) { - sbuf = _bt_getbuf(rel, ropaque->btpo_next, BT_WRITE); + sbuf = _bt_getbuf(rel, oopaque->btpo_next, BT_WRITE); spage = BufferGetPage(sbuf); sopaque = (BTPageOpaque) PageGetSpecialPointer(spage); - if (sopaque->btpo_prev != ropaque->btpo_prev) - elog(PANIC, "right sibling's left-link doesn't match: " - "block %u links to %u instead of expected %u in index \"%s\"", - ropaque->btpo_next, sopaque->btpo_prev, ropaque->btpo_prev, + if (sopaque->btpo_prev != origpagenumber) + { + memset(rightpage, 0, BufferGetPageSize(rbuf)); + elog(ERROR, "right sibling's left-link doesn't match: " + "block %u links to %u instead of expected %u in index \"%s\"", + oopaque->btpo_next, sopaque->btpo_prev, origpagenumber, RelationGetRelationName(rel)); + } /* * Check to see if we can set the SPLIT_END flag in the right-hand @@ -964,7 +1181,7 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright, * page. If you're confused, imagine that page A splits to A B and * then again, yielding A C B, while vacuum is in progress. Tuples * originally in A could now be in either B or C, hence vacuum must - * examine both pages. But if D, our right sibling, has a different + * examine both pages. But if D, our right sibling, has a different * cycleid then it could not contain any tuples that were in A when * the vacuum started. */ @@ -978,8 +1195,7 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright, * * NO EREPORT(ERROR) till right sibling is updated. We can get away with * not starting the critical section till here because we haven't been - * scribbling on the original page yet, and we don't care about the new - * sibling until it's linked into the btree. + * scribbling on the original page yet; see comments above. */ START_CRIT_SECTION(); @@ -991,54 +1207,74 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright, * (in the page management code) that the center of a page always be * clean, and the most efficient way to guarantee this is just to compact * the data by reinserting it into a new left page. (XXX the latter - * comment is probably obsolete.) + * comment is probably obsolete; but in any case it's good to not scribble + * on the original page until we enter the critical section.) 
* * We need to do this before writing the WAL record, so that XLogInsert * can WAL log an image of the page if necessary. */ PageRestoreTempPage(leftpage, origpage); + /* leftpage, lopaque must not be used below here */ MarkBufferDirty(buf); MarkBufferDirty(rbuf); if (!P_RIGHTMOST(ropaque)) { - sopaque->btpo_prev = BufferGetBlockNumber(rbuf); + sopaque->btpo_prev = rightpagenumber; MarkBufferDirty(sbuf); } + /* + * Clear INCOMPLETE_SPLIT flag on child if inserting the new item finishes + * a split. + */ + if (!isleaf) + { + Page cpage = BufferGetPage(cbuf); + BTPageOpaque cpageop = (BTPageOpaque) PageGetSpecialPointer(cpage); + + cpageop->btpo_flags &= ~BTP_INCOMPLETE_SPLIT; + MarkBufferDirty(cbuf); + } + /* XLOG stuff */ - if (!rel->rd_istemp) + if (RelationNeedsWAL(rel)) { xl_btree_split xlrec; uint8 xlinfo; XLogRecPtr recptr; - XLogRecData rdata[7]; - XLogRecData *lastrdata; - xlrec.node = rel->rd_node; - xlrec.leftsib = BufferGetBlockNumber(buf); - xlrec.rightsib = BufferGetBlockNumber(rbuf); - xlrec.rnext = ropaque->btpo_next; xlrec.level = ropaque->btpo.level; xlrec.firstright = firstright; + xlrec.newitemoff = newitemoff; - rdata[0].data = (char *) &xlrec; - rdata[0].len = SizeOfBtreeSplit; - rdata[0].buffer = InvalidBuffer; - - lastrdata = &rdata[0]; + XLogBeginInsert(); + XLogRegisterData((char *) &xlrec, SizeOfBtreeSplit); - if (ropaque->btpo.level > 0) - { - /* Log downlink on non-leaf pages */ - lastrdata->next = lastrdata + 1; - lastrdata++; + XLogRegisterBuffer(0, buf, REGBUF_STANDARD); + XLogRegisterBuffer(1, rbuf, REGBUF_WILL_INIT); + /* Log the right sibling, because we've changed its prev-pointer. */ + if (!P_RIGHTMOST(ropaque)) + XLogRegisterBuffer(2, sbuf, REGBUF_STANDARD); + if (BufferIsValid(cbuf)) + XLogRegisterBuffer(3, cbuf, REGBUF_STANDARD); - lastrdata->data = (char *) &newitem->t_tid.ip_blkid; - lastrdata->len = sizeof(BlockIdData); - lastrdata->buffer = InvalidBuffer; + /* + * Log the new item, if it was inserted on the left page. (If it was + * put on the right page, we don't need to explicitly WAL log it + * because it's included with all the other items on the right page.) + * Show the new item as belonging to the left page buffer, so that it + * is not stored if XLogInsert decides it needs a full-page image of + * the left page. We store the offset anyway, though, to support + * archive compression of these records. + */ + if (newitemonleft) + XLogRegisterBufData(0, (char *) newitem, MAXALIGN(newitemsz)); + /* Log left page */ + if (!isleaf) + { /* * We must also log the left page's high key, because the right * page's leftmost key is suppressed on non-leaf levels. Show it @@ -1046,58 +1282,9 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright, * if XLogInsert decides it needs a full-page image of the left * page. */ - lastrdata->next = lastrdata + 1; - lastrdata++; - itemid = PageGetItemId(origpage, P_HIKEY); item = (IndexTuple) PageGetItem(origpage, itemid); - lastrdata->data = (char *) item; - lastrdata->len = MAXALIGN(IndexTupleSize(item)); - lastrdata->buffer = buf; /* backup block 1 */ - lastrdata->buffer_std = true; - } - - /* - * Log the new item and its offset, if it was inserted on the left - * page. (If it was put on the right page, we don't need to explicitly - * WAL log it because it's included with all the other items on the - * right page.) Show the new item as belonging to the left page - * buffer, so that it is not stored if XLogInsert decides it needs a - * full-page image of the left page. 
We store the offset anyway, - * though, to support archive compression of these records. - */ - if (newitemonleft) - { - lastrdata->next = lastrdata + 1; - lastrdata++; - - lastrdata->data = (char *) &newitemoff; - lastrdata->len = sizeof(OffsetNumber); - lastrdata->buffer = InvalidBuffer; - - lastrdata->next = lastrdata + 1; - lastrdata++; - - lastrdata->data = (char *) newitem; - lastrdata->len = MAXALIGN(newitemsz); - lastrdata->buffer = buf; /* backup block 1 */ - lastrdata->buffer_std = true; - } - else if (ropaque->btpo.level == 0) - { - /* - * Although we don't need to WAL-log the new item, we still need - * XLogInsert to consider storing a full-page image of the left - * page, so make an empty entry referencing that buffer. This also - * ensures that the left page is always backup block 1. - */ - lastrdata->next = lastrdata + 1; - lastrdata++; - - lastrdata->data = NULL; - lastrdata->len = 0; - lastrdata->buffer = buf; /* backup block 1 */ - lastrdata->buffer_std = true; + XLogRegisterBufData(0, (char *) item, MAXALIGN(IndexTupleSize(item))); } /* @@ -1112,44 +1299,26 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright, * and so the item pointers can be reconstructed. See comments for * _bt_restore_page(). */ - lastrdata->next = lastrdata + 1; - lastrdata++; - - lastrdata->data = (char *) rightpage + - ((PageHeader) rightpage)->pd_upper; - lastrdata->len = ((PageHeader) rightpage)->pd_special - - ((PageHeader) rightpage)->pd_upper; - lastrdata->buffer = InvalidBuffer; - - /* Log the right sibling, because we've changed its' prev-pointer. */ - if (!P_RIGHTMOST(ropaque)) - { - lastrdata->next = lastrdata + 1; - lastrdata++; - - lastrdata->data = NULL; - lastrdata->len = 0; - lastrdata->buffer = sbuf; /* backup block 2 */ - lastrdata->buffer_std = true; - } - - lastrdata->next = NULL; + XLogRegisterBufData(1, + (char *) rightpage + ((PageHeader) rightpage)->pd_upper, + ((PageHeader) rightpage)->pd_special - ((PageHeader) rightpage)->pd_upper); if (isroot) xlinfo = newitemonleft ? XLOG_BTREE_SPLIT_L_ROOT : XLOG_BTREE_SPLIT_R_ROOT; else xlinfo = newitemonleft ? XLOG_BTREE_SPLIT_L : XLOG_BTREE_SPLIT_R; - recptr = XLogInsert(RM_BTREE_ID, xlinfo, rdata); + recptr = XLogInsert(RM_BTREE_ID, xlinfo); PageSetLSN(origpage, recptr); - PageSetTLI(origpage, ThisTimeLineID); PageSetLSN(rightpage, recptr); - PageSetTLI(rightpage, ThisTimeLineID); if (!P_RIGHTMOST(ropaque)) { PageSetLSN(spage, recptr); - PageSetTLI(spage, ThisTimeLineID); + } + if (!isleaf) + { + PageSetLSN(BufferGetPage(cbuf), recptr); } } @@ -1159,6 +1328,10 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright, if (!P_RIGHTMOST(ropaque)) _bt_relbuf(rel, sbuf); + /* release the child */ + if (!isleaf) + _bt_relbuf(rel, cbuf); + /* split's done */ return rbuf; } @@ -1185,7 +1358,7 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright, * * We return the index of the first existing tuple that should go on the * righthand page, plus a boolean indicating whether the new tuple goes on - * the left or right page. The bool is necessary to disambiguate the case + * the left or right page. The bool is necessary to disambiguate the case * where firstright == newitemoff. */ static OffsetNumber @@ -1421,7 +1594,7 @@ _bt_checksplitloc(FindSplitData *state, * * On entry, buf and rbuf are the left and right split pages, which we * still hold write locks on per the L&Y algorithm. We release the - * write locks once we have write lock on the parent page. 
(Any sooner, + * write locks once we have write lock on the parent page. (Any sooner, * and it'd be possible for some other process to try to split or delete * one of these pages, and get confused because it cannot find the downlink.) * @@ -1429,10 +1602,8 @@ _bt_checksplitloc(FindSplitData *state, * have to be efficient (concurrent ROOT split, WAL recovery) * is_root - we split the true root * is_only - we split a page alone on its level (might have been fast root) - * - * This is exported so it can be called by nbtxlog.c. */ -void +static void _bt_insert_parent(Relation rel, Buffer buf, Buffer rbuf, @@ -1444,15 +1615,14 @@ _bt_insert_parent(Relation rel, * Here we have to do something Lehman and Yao don't talk about: deal with * a root split and construction of a new root. If our stack is empty * then we have just split a node on what had been the root level when we - * descended the tree. If it was still the root then we perform a + * descended the tree. If it was still the root then we perform a * new-root construction. If it *wasn't* the root anymore, search to find * the next higher level that someone constructed meanwhile, and find the * right place to insert as for the normal case. * * If we have to search for the parent level, we do so by re-descending * from the root. This is not super-efficient, but it's rare enough not - * to matter. (This path is also taken when called from WAL recovery --- - * we have no stack in that case.) + * to matter. */ if (is_root) { @@ -1481,8 +1651,7 @@ _bt_insert_parent(Relation rel, { BTPageOpaque lpageop; - if (!InRecovery) - elog(DEBUG2, "concurrent ROOT page split"); + elog(DEBUG2, "concurrent ROOT page split"); lpageop = (BTPageOpaque) PageGetSpecialPointer(page); /* Find the leftmost page at the next level up */ pbuf = _bt_get_endpoint(rel, lpageop->btpo.level + 1, false); @@ -1511,12 +1680,13 @@ _bt_insert_parent(Relation rel, * 05/27/97 */ ItemPointerSet(&(stack->bts_btentry.t_tid), bknum, P_HIKEY); - pbuf = _bt_getstackbuf(rel, stack, BT_WRITE); - /* Now we can unlock the children */ + /* + * Now we can unlock the right child. The left child will be unlocked + * by _bt_insertonpg(). + */ _bt_relbuf(rel, rbuf); - _bt_relbuf(rel, buf); /* Check for error only after writing children */ if (pbuf == InvalidBuffer) @@ -1524,7 +1694,7 @@ _bt_insert_parent(Relation rel, RelationGetRelationName(rel), bknum, rbknum); /* Recursively update the parent */ - _bt_insertonpg(rel, pbuf, stack->bts_parent, + _bt_insertonpg(rel, pbuf, buf, stack->bts_parent, new_item, stack->bts_offset + 1, is_only); @@ -1533,6 +1703,62 @@ _bt_insert_parent(Relation rel, } } +/* + * _bt_finish_split() -- Finish an incomplete split + * + * A crash or other failure can leave a split incomplete. The insertion + * routines won't allow to insert on a page that is incompletely split. + * Before inserting on such a page, call _bt_finish_split(). + * + * On entry, 'lbuf' must be locked in write-mode. On exit, it is unlocked + * and unpinned. + */ +void +_bt_finish_split(Relation rel, Buffer lbuf, BTStack stack) +{ + Page lpage = BufferGetPage(lbuf); + BTPageOpaque lpageop = (BTPageOpaque) PageGetSpecialPointer(lpage); + Buffer rbuf; + Page rpage; + BTPageOpaque rpageop; + bool was_root; + bool was_only; + + Assert(P_INCOMPLETE_SPLIT(lpageop)); + + /* Lock right sibling, the one missing the downlink */ + rbuf = _bt_getbuf(rel, lpageop->btpo_next, BT_WRITE); + rpage = BufferGetPage(rbuf); + rpageop = (BTPageOpaque) PageGetSpecialPointer(rpage); + + /* Could this be a root split? 
*/ + if (!stack) + { + Buffer metabuf; + Page metapg; + BTMetaPageData *metad; + + /* acquire lock on the metapage */ + metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_WRITE); + metapg = BufferGetPage(metabuf); + metad = BTPageGetMeta(metapg); + + was_root = (metad->btm_root == BufferGetBlockNumber(lbuf)); + + _bt_relbuf(rel, metabuf); + } + else + was_root = false; + + /* Was this the only page on the level before split? */ + was_only = (P_LEFTMOST(lpageop) && P_RIGHTMOST(rpageop)); + + elog(DEBUG1, "finishing incomplete split of %u/%u", + BufferGetBlockNumber(lbuf), BufferGetBlockNumber(rbuf)); + + _bt_insert_parent(rel, lbuf, rbuf, stack, was_root, was_only); +} + /* * _bt_getstackbuf() -- Walk back up the tree one step, and find the item * we last looked at in the parent. @@ -1565,6 +1791,12 @@ _bt_getstackbuf(Relation rel, BTStack stack, int access) page = BufferGetPage(buf); opaque = (BTPageOpaque) PageGetSpecialPointer(page); + if (access == BT_WRITE && P_INCOMPLETE_SPLIT(opaque)) + { + _bt_finish_split(rel, buf, stack->bts_parent); + continue; + } + if (!P_IGNORE(opaque)) { OffsetNumber offnum, @@ -1594,7 +1826,7 @@ _bt_getstackbuf(Relation rel, BTStack stack, int access) /* * These loops will check every item on the page --- but in an * order that's attuned to the probability of where it actually - * is. Scan to the right first, then to the left. + * is. Scan to the right first, then to the left. */ for (offnum = start; offnum <= maxoff; @@ -1669,10 +1901,13 @@ _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf) rbkno; BlockNumber rootblknum; BTPageOpaque rootopaque; + BTPageOpaque lopaque; ItemId itemid; IndexTuple item; - Size itemsz; - IndexTuple new_item; + IndexTuple left_item; + Size left_item_sz; + IndexTuple right_item; + Size right_item_sz; Buffer metabuf; Page metapg; BTMetaPageData *metad; @@ -1680,6 +1915,7 @@ _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf) lbkno = BufferGetBlockNumber(lbuf); rbkno = BufferGetBlockNumber(rbuf); lpage = BufferGetPage(lbuf); + lopaque = (BTPageOpaque) PageGetSpecialPointer(lpage); /* get a new root page */ rootbuf = _bt_getbuf(rel, P_NEW, BT_WRITE); @@ -1691,6 +1927,26 @@ _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf) metapg = BufferGetPage(metabuf); metad = BTPageGetMeta(metapg); + /* + * Create downlink item for left page (old root). Since this will be the + * first item in a non-leaf page, it implicitly has minus-infinity key + * value, so we need not store any actual key in it. + */ + left_item_sz = sizeof(IndexTupleData); + left_item = (IndexTuple) palloc(left_item_sz); + left_item->t_info = left_item_sz; + ItemPointerSet(&(left_item->t_tid), lbkno, P_HIKEY); + + /* + * Create downlink item for right page. The key for it is obtained from + * the "high key" position in the left page. + */ + itemid = PageGetItemId(lpage, P_HIKEY); + right_item_sz = ItemIdGetLength(itemid); + item = (IndexTuple) PageGetItem(lpage, itemid); + right_item = CopyIndexTuple(item); + ItemPointerSet(&(right_item->t_tid), rbkno, P_HIKEY); + /* NO EREPORT(ERROR) from here till newroot op is logged */ START_CRIT_SECTION(); @@ -1708,16 +1964,6 @@ _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf) metad->btm_fastroot = rootblknum; metad->btm_fastlevel = rootopaque->btpo.level; - /* - * Create downlink item for left page (old root). Since this will be the - * first item in a non-leaf page, it implicitly has minus-infinity key - * value, so we need not store any actual key in it. 
- */ - itemsz = sizeof(IndexTupleData); - new_item = (IndexTuple) palloc(itemsz); - new_item->t_info = itemsz; - ItemPointerSet(&(new_item->t_tid), lbkno, P_HIKEY); - /* * Insert the left page pointer into the new root page. The root page is * the rightmost page on its level so there is no "high key" in it; the @@ -1726,79 +1972,77 @@ _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf) * Note: we *must* insert the two items in item-number order, for the * benefit of _bt_restore_page(). */ - if (PageAddItem(rootpage, (Item) new_item, itemsz, P_HIKEY, + if (PageAddItem(rootpage, (Item) left_item, left_item_sz, P_HIKEY, false, false) == InvalidOffsetNumber) elog(PANIC, "failed to add leftkey to new root page" " while splitting block %u of index \"%s\"", BufferGetBlockNumber(lbuf), RelationGetRelationName(rel)); - pfree(new_item); - - /* - * Create downlink item for right page. The key for it is obtained from - * the "high key" position in the left page. - */ - itemid = PageGetItemId(lpage, P_HIKEY); - itemsz = ItemIdGetLength(itemid); - item = (IndexTuple) PageGetItem(lpage, itemid); - new_item = CopyIndexTuple(item); - ItemPointerSet(&(new_item->t_tid), rbkno, P_HIKEY); /* * insert the right page pointer into the new root page. */ - if (PageAddItem(rootpage, (Item) new_item, itemsz, P_FIRSTKEY, + if (PageAddItem(rootpage, (Item) right_item, right_item_sz, P_FIRSTKEY, false, false) == InvalidOffsetNumber) elog(PANIC, "failed to add rightkey to new root page" " while splitting block %u of index \"%s\"", BufferGetBlockNumber(lbuf), RelationGetRelationName(rel)); - pfree(new_item); + + /* Clear the incomplete-split flag in the left child */ + Assert(P_INCOMPLETE_SPLIT(lopaque)); + lopaque->btpo_flags &= ~BTP_INCOMPLETE_SPLIT; + MarkBufferDirty(lbuf); MarkBufferDirty(rootbuf); MarkBufferDirty(metabuf); /* XLOG stuff */ - if (!rel->rd_istemp) + if (RelationNeedsWAL(rel)) { xl_btree_newroot xlrec; XLogRecPtr recptr; - XLogRecData rdata[2]; + xl_btree_metadata md; - xlrec.node = rel->rd_node; xlrec.rootblk = rootblknum; xlrec.level = metad->btm_level; - rdata[0].data = (char *) &xlrec; - rdata[0].len = SizeOfBtreeNewroot; - rdata[0].buffer = InvalidBuffer; - rdata[0].next = &(rdata[1]); + XLogBeginInsert(); + XLogRegisterData((char *) &xlrec, SizeOfBtreeNewroot); + + XLogRegisterBuffer(0, rootbuf, REGBUF_WILL_INIT); + XLogRegisterBuffer(1, lbuf, REGBUF_STANDARD); + XLogRegisterBuffer(2, metabuf, REGBUF_WILL_INIT); + + md.root = rootblknum; + md.level = metad->btm_level; + md.fastroot = rootblknum; + md.fastlevel = metad->btm_level; + + XLogRegisterBufData(2, (char *) &md, sizeof(xl_btree_metadata)); /* * Direct access to page is not good but faster - we should implement * some new func in page API. 
*/ - rdata[1].data = (char *) rootpage + ((PageHeader) rootpage)->pd_upper; - rdata[1].len = ((PageHeader) rootpage)->pd_special - - ((PageHeader) rootpage)->pd_upper; - rdata[1].buffer = InvalidBuffer; - rdata[1].next = NULL; + XLogRegisterBufData(0, + (char *) rootpage + ((PageHeader) rootpage)->pd_upper, + ((PageHeader) rootpage)->pd_special - + ((PageHeader) rootpage)->pd_upper); - recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_NEWROOT, rdata); + recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_NEWROOT); + PageSetLSN(lpage, recptr); PageSetLSN(rootpage, recptr); - PageSetTLI(rootpage, ThisTimeLineID); PageSetLSN(metapg, recptr); - PageSetTLI(metapg, ThisTimeLineID); } END_CRIT_SECTION(); - /* send out relcache inval for metapage change */ - if (!InRecovery) - CacheInvalidateRelcache(rel); - /* done with metapage */ _bt_relbuf(rel, metabuf); + pfree(left_item); + pfree(right_item); + return rootbuf; } @@ -1817,13 +2061,11 @@ _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf) * we insert the tuples in order, so that the given itup_off does * represent the final position of the tuple! */ -static void -_bt_pgaddtup(Relation rel, - Page page, +static bool +_bt_pgaddtup(Page page, Size itemsize, IndexTuple itup, - OffsetNumber itup_off, - const char *where) + OffsetNumber itup_off) { BTPageOpaque opaque = (BTPageOpaque) PageGetSpecialPointer(page); IndexTupleData trunctuple; @@ -1838,8 +2080,9 @@ _bt_pgaddtup(Relation rel, if (PageAddItem(page, (Item) itup, itemsize, itup_off, false, false) == InvalidOffsetNumber) - elog(PANIC, "failed to add item to the %s in index \"%s\"", - where, RelationGetRelationName(rel)); + return false; + + return true; } /* @@ -1875,9 +2118,10 @@ _bt_isequal(TupleDesc itupdesc, Page page, OffsetNumber offnum, if (isNull || (scankey->sk_flags & SK_ISNULL)) return false; - result = DatumGetInt32(FunctionCall2(&scankey->sk_func, - datum, - scankey->sk_argument)); + result = DatumGetInt32(FunctionCall2Coll(&scankey->sk_func, + scankey->sk_collation, + datum, + scankey->sk_argument)); if (result != 0) return false; @@ -1897,7 +2141,7 @@ _bt_isequal(TupleDesc itupdesc, Page page, OffsetNumber offnum, * super-exclusive "cleanup" lock (see nbtree/README). */ static void -_bt_vacuum_one_page(Relation rel, Buffer buffer) +_bt_vacuum_one_page(Relation rel, Buffer buffer, Relation heapRel) { OffsetNumber deletable[MaxOffsetNumber]; int ndeletable = 0; @@ -1924,7 +2168,7 @@ _bt_vacuum_one_page(Relation rel, Buffer buffer) } if (ndeletable > 0) - _bt_delitems(rel, buffer, deletable, ndeletable); + _bt_delitems_delete(rel, buffer, deletable, ndeletable, heapRel); /* * Note: if we didn't find any LP_DEAD items, then the page's
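
The four checkUnique modes threaded through _bt_doinsert and _bt_check_unique above divide up as follows: UNIQUE_CHECK_NO skips checking entirely, UNIQUE_CHECK_YES errors out on a live duplicate, UNIQUE_CHECK_PARTIAL never waits on a concurrent inserter and only reports possible non-uniqueness through the new boolean result, and UNIQUE_CHECK_EXISTING rechecks an already-inserted tuple without inserting again. A minimal standalone sketch of that dispatch, using toy types rather than the real PostgreSQL definitions (toy_doinsert and duplicate_exists are invented for illustration):

#include <stdbool.h>
#include <stdio.h>

/* Toy stand-in for the IndexUniqueCheck enum used by the patch. */
typedef enum
{
	UNIQUE_CHECK_NO,			/* no uniqueness checking */
	UNIQUE_CHECK_YES,			/* error on a live duplicate */
	UNIQUE_CHECK_PARTIAL,		/* never wait; report possible conflict */
	UNIQUE_CHECK_EXISTING		/* recheck an existing tuple, no insert */
} IndexUniqueCheck;

/* One hypothetical insertion attempt against a toy index. */
static bool
toy_doinsert(IndexUniqueCheck checkUnique, bool duplicate_exists)
{
	bool		is_unique = true;

	if (checkUnique != UNIQUE_CHECK_NO && duplicate_exists)
	{
		if (checkUnique == UNIQUE_CHECK_PARTIAL)
			is_unique = false;	/* caller must redo the check later */
		else
		{
			printf("ERROR: duplicate key value\n");
			return false;
		}
	}

	if (checkUnique != UNIQUE_CHECK_EXISTING)
		printf("inserting tuple\n");	/* the actual insertion step */

	return is_unique;
}

int
main(void)
{
	printf("-> %d\n", (int) toy_doinsert(UNIQUE_CHECK_PARTIAL, true)); /* 0 */
	printf("-> %d\n", (int) toy_doinsert(UNIQUE_CHECK_YES, false));	   /* 1 */
	return 0;
}

Note how UNIQUE_CHECK_PARTIAL still performs the insert: the tuple goes in optimistically, and only the return value tells the caller that a full recheck is required.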
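
The "get tired" heuristic in _bt_findinsertloc is worth seeing in isolation: when the new key equals the page's high key and the page is full, the code moves right with probability roughly 0.99 instead of splitting, which spreads equal-key insertions across existing pages without degrading into O(N^2) walks to the rightmost equal page. A self-contained simulation of just the stopping rule (the 0.99 constant comes from the comment above; RAND_MAX stands in for PostgreSQL's MAX_RANDOM_VALUE, and everything else is a toy):

#include <stdio.h>
#include <stdlib.h>

/* Stop moving right (and split here) with probability ~0.01. */
static int
get_tired(void)
{
	return random() <= RAND_MAX / 100;
}

int
main(void)
{
	srandom(42);
	for (int trial = 0; trial < 5; trial++)
	{
		int			pages_moved = 0;

		/* keep stepping to the right sibling while "not tired" */
		while (!get_tired())
			pages_moved++;
		printf("trial %d: moved right over %d full pages\n",
			   trial, pages_moved);
	}
	return 0;
}

The expected walk length is about 100 pages, long enough to find free space among previously split pages, short enough to bound the cost of any one insertion.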
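
All of the repetitive _bt_pgaddtup error blocks in _bt_split implement a single partitioning rule: existing items at offsets below firstright stay on the left page, the rest go right, and the new item lands at newitemoff on the side chosen by _bt_findsplitloc, with newitemonleft disambiguating the firstright == newitemoff case. Reduced to the bare loop, with invented toy values:

#include <stdio.h>

int
main(void)
{
	int			maxoff = 6;			/* existing items on the page */
	int			firstright = 4;		/* first existing item for right page */
	int			newitemoff = 4;		/* where the new item would sort */
	int			newitemonleft = 1;	/* disambiguates newitemoff == firstright */

	for (int i = 1; i <= maxoff; i++)
	{
		/* new item is spliced in before the existing item at its offset */
		if (i == newitemoff)
			printf("new    -> %s\n", newitemonleft ? "left" : "right");
		printf("item %d -> %s\n", i, (i < firstright) ? "left" : "right");
	}
	if (newitemoff > maxoff)		/* new item is last: right page */
		printf("new    -> right\n");
	return 0;
}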
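
The BTP_INCOMPLETE_SPLIT flag is the heart of the patch's new crash-recovery story: _bt_split marks the left half before the parent has a downlink for the right half, and whoever next lands on such a page (_bt_findinsertloc, _bt_getstackbuf, or _bt_finish_split after a crash) completes the split before doing anything else. The flag lifecycle modeled with plain bit operations; the bit value is assumed from nbtree.h and the struct is a toy, not the real page opaque layout:

#include <stdio.h>

#define BTP_INCOMPLETE_SPLIT	(1 << 7)	/* assumed value, per nbtree.h */

typedef struct
{
	unsigned short btpo_flags;
} ToyOpaque;

static void
toy_split(ToyOpaque *left)
{
	/* right half exists, but the parent has no downlink for it yet */
	left->btpo_flags |= BTP_INCOMPLETE_SPLIT;
}

static void
toy_insert_downlink(ToyOpaque *left)
{
	/* what _bt_insertonpg/_bt_finish_split do once the parent is updated */
	left->btpo_flags &= ~BTP_INCOMPLETE_SPLIT;
}

int
main(void)
{
	ToyOpaque	left = {0};

	toy_split(&left);
	if (left.btpo_flags & BTP_INCOMPLETE_SPLIT)
		toy_insert_downlink(&left);
	printf("flags after completion: %u\n", left.btpo_flags);	/* 0 */
	return 0;
}

Because the flag and the downlink insertion are each covered by their own WAL record, recovery never has to remember in-progress splits the way the old rm_cleanup machinery did.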
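
Most of the XLOG churn above is one mechanical translation: the old hand-built XLogRecData chains become calls to the registration API visible in the new code, i.e. XLogBeginInsert, XLogRegisterData for the fixed-size record, XLogRegisterBuffer per touched page, XLogRegisterBufData for payload tied to a page, then a single XLogInsert. The skeleton below shows only that call ordering; the function bodies are stubs invented for the sketch (the real ones live in PostgreSQL's xloginsert.c), and only the call shapes are taken from the diff:

#include <stdio.h>

static void
XLogBeginInsert(void)
{
	puts("begin record");
}

static void
XLogRegisterData(char *data, int len)
{
	(void) data;
	printf("  main data: %d bytes\n", len);
}

static void
XLogRegisterBuffer(unsigned char block_id, int buffer, unsigned char flags)
{
	(void) buffer;
	(void) flags;
	printf("  buffer %u registered\n", block_id);
}

static void
XLogRegisterBufData(unsigned char block_id, char *data, int len)
{
	(void) data;
	printf("  buffer %u payload: %d bytes\n", block_id, len);
}

static void
XLogInsert(int rmid, unsigned char info)
{
	(void) rmid;
	(void) info;
	puts("record inserted");
}

int
main(void)
{
	char		xlrec[4] = {0};		/* stand-in for xl_btree_insert */
	char		itup[16] = {0};		/* stand-in for the new index tuple */

	XLogBeginInsert();
	XLogRegisterData(xlrec, sizeof(xlrec));		/* fixed part of record */
	XLogRegisterBuffer(0, 42, 0);				/* the modified page */
	XLogRegisterBufData(0, itup, sizeof(itup)); /* tuple rides with block 0 */
	XLogInsert(0, 0);							/* e.g. XLOG_BTREE_INSERT_LEAF */
	return 0;
}

Tying per-buffer data to a registered block is what lets XLogInsert drop the payload whenever it decides to emit a full-page image instead, which the old chains had to arrange by hand.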
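
In _bt_newroot, the left downlink is built with no key bytes at all (t_info = sizeof(IndexTupleData)) because the first item on any non-leaf page acts as minus infinity: descent logic never compares against it. A toy internal-page search showing why the leftmost downlink can be keyless (all values invented):

#include <stdio.h>

int
main(void)
{
	/* sepkey[0] is conceptually minus infinity and is never examined */
	int			sepkey[] = {-1, 10, 20, 30};
	int			downlink[] = {100, 110, 120, 130};	/* toy child blocks */
	int			nkeys = 4;
	int			search = 5;
	int			i;

	/* find the last separator <= search, without touching sepkey[0] */
	for (i = nkeys - 1; i > 0; i--)
		if (sepkey[i] <= search)
			break;
	printf("key %d descends to block %d\n", search, downlink[i]);	/* 100 */
	return 0;
}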
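
Finally, _bt_vacuum_one_page (with the new heapRel argument threaded through for _bt_delitems_delete) is the "microvacuum" path: before splitting a full leaf page whose BTP_HAS_GARBAGE bit is set, items that earlier index scans marked LP_DEAD are reaped in place, often avoiding the split entirely. The collection pass is just an offset scan; here reduced to an array, with the dead[] contents invented:

#include <stdio.h>

int
main(void)
{
	/* toy line pointers: nonzero means the ItemId was marked LP_DEAD */
	int			dead[] = {0, 1, 0, 0, 1, 1, 0};
	int			maxoff = 7;
	int			deletable[7];
	int			ndeletable = 0;

	/* gather offsets of dead items, in order, exactly as the real scan does */
	for (int off = 1; off <= maxoff; off++)
		if (dead[off - 1])
			deletable[ndeletable++] = off;

	if (ndeletable > 0)
		printf("would delete %d items, first at offset %d\n",
			   ndeletable, deletable[0]);
	return 0;
}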