* nbtinsert.c
* Item insertion in Lehman and Yao btrees for Postgres.
*
- * Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.173 2009/08/01 20:59:17 tgl Exp $
+ * src/backend/access/nbtree/nbtinsert.c
*
*-------------------------------------------------------------------------
*/
#include "access/heapam.h"
#include "access/nbtree.h"
#include "access/transam.h"
+#include "access/xloginsert.h"
#include "miscadmin.h"
-#include "storage/bufmgr.h"
#include "storage/lmgr.h"
-#include "utils/inval.h"
+#include "storage/predicate.h"
#include "utils/tqual.h"
static TransactionId _bt_check_unique(Relation rel, IndexTuple itup,
Relation heapRel, Buffer buf, OffsetNumber offset,
ScanKey itup_scankey,
- IndexUniqueCheck checkUnique, bool *is_unique);
+ IndexUniqueCheck checkUnique, bool *is_unique,
+ uint32 *speculativeToken);
static void _bt_findinsertloc(Relation rel,
Buffer *bufptr,
OffsetNumber *offsetptr,
int keysz,
ScanKey scankey,
- IndexTuple newtup);
-static void _bt_insertonpg(Relation rel, Buffer buf,
+ IndexTuple newtup,
+ BTStack stack,
+ Relation heapRel);
+static void _bt_insertonpg(Relation rel, Buffer buf, Buffer cbuf,
BTStack stack,
IndexTuple itup,
OffsetNumber newitemoff,
bool split_only_page);
-static Buffer _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
- OffsetNumber newitemoff, Size newitemsz,
+static Buffer _bt_split(Relation rel, Buffer buf, Buffer cbuf,
+ OffsetNumber firstright, OffsetNumber newitemoff, Size newitemsz,
IndexTuple newitem, bool newitemonleft);
+static void _bt_insert_parent(Relation rel, Buffer buf, Buffer rbuf,
+ BTStack stack, bool is_root, bool is_only);
static OffsetNumber _bt_findsplitloc(Relation rel, Page page,
OffsetNumber newitemoff,
Size newitemsz,
static void _bt_checksplitloc(FindSplitData *state,
OffsetNumber firstoldonright, bool newitemonleft,
int dataitemstoleft, Size firstoldonrightsz);
-static void _bt_pgaddtup(Relation rel, Page page,
- Size itemsize, IndexTuple itup,
- OffsetNumber itup_off, const char *where);
+static bool _bt_pgaddtup(Page page, Size itemsize, IndexTuple itup,
+ OffsetNumber itup_off);
static bool _bt_isequal(TupleDesc itupdesc, Page page, OffsetNumber offnum,
int keysz, ScanKey scankey);
-static void _bt_vacuum_one_page(Relation rel, Buffer buffer);
+static void _bt_vacuum_one_page(Relation rel, Buffer buffer, Relation heapRel);
/*
* _bt_doinsert() -- Handle insertion of a single index tuple in the tree.
*
- * This routine is called by the public interface routines, btbuild
- * and btinsert. By here, itup is filled in, including the TID.
+ * This routine is called by the public interface routine, btinsert.
+ * By here, itup is filled in, including the TID.
*
* If checkUnique is UNIQUE_CHECK_NO or UNIQUE_CHECK_PARTIAL, this
* will allow duplicates. Otherwise (UNIQUE_CHECK_YES or
* If the page was split between the time that we surrendered our read
* lock and acquired our write lock, then this page may no longer be the
* right place for the key we want to insert. In this case, we need to
- * move right in the tree. See Lehman and Yao for an excruciatingly
+ * move right in the tree. See Lehman and Yao for an excruciatingly
* precise description.
*/
- buf = _bt_moveright(rel, buf, natts, itup_scankey, false, BT_WRITE);
+ buf = _bt_moveright(rel, buf, natts, itup_scankey, false,
+ true, stack, BT_WRITE);
/*
* If we're not allowing duplicates, make sure the key isn't already in
* If we must wait for another xact, we release the lock while waiting,
* and then must start over completely.
*
- * For a partial uniqueness check, we don't wait for the other xact.
- * Just let the tuple in and return false for possibly non-unique,
- * or true for definitely unique.
+ * For a partial uniqueness check, we don't wait for the other xact. Just
+ * let the tuple in and return false for possibly non-unique, or true for
+ * definitely unique.
*/
if (checkUnique != UNIQUE_CHECK_NO)
{
TransactionId xwait;
+ uint32 speculativeToken;
offset = _bt_binsrch(rel, buf, natts, itup_scankey, false);
xwait = _bt_check_unique(rel, itup, heapRel, buf, offset, itup_scankey,
- checkUnique, &is_unique);
+ checkUnique, &is_unique, &speculativeToken);
if (TransactionIdIsValid(xwait))
{
/* Have to wait for the other guy ... */
_bt_relbuf(rel, buf);
- XactLockTableWait(xwait);
+
+ /*
+ * If it's a speculative insertion, wait for it to finish (ie. to
+ * go ahead with the insertion, or kill the tuple). Otherwise
+ * wait for the transaction to finish as usual.
+ */
+ if (speculativeToken)
+ SpeculativeInsertionWait(xwait, speculativeToken);
+ else
+ XactLockTableWait(xwait, rel, &itup->t_tid, XLTW_InsertIndex);
+
/* start over... */
_bt_freestack(stack);
goto top;
if (checkUnique != UNIQUE_CHECK_EXISTING)
{
+ /*
+ * The only conflict predicate locking cares about for indexes is when
+ * an index tuple insert conflicts with an existing lock. Since the
+ * actual location of the insert is hard to predict because of the
+ * random search used to prevent O(N^2) performance when there are
+ * many duplicate entries, we can just use the "first valid" page.
+ */
+ CheckForSerializableConflictIn(rel, NULL, buf);
/* do the insertion */
- _bt_findinsertloc(rel, &buf, &offset, natts, itup_scankey, itup);
- _bt_insertonpg(rel, buf, stack, itup, offset, false);
+ _bt_findinsertloc(rel, &buf, &offset, natts, itup_scankey, itup,
+ stack, heapRel);
+ _bt_insertonpg(rel, buf, InvalidBuffer, stack, itup, offset, false);
}
else
{
* is the first tuple on the next page.
*
* Returns InvalidTransactionId if there is no conflict, else an xact ID
- * we must wait for to see if it commits a conflicting tuple. If an actual
- * conflict is detected, no return --- just ereport().
+ * we must wait for to see if it commits a conflicting tuple. If an actual
+ * conflict is detected, no return --- just ereport(). If an xact ID is
+ * returned, and the conflicting tuple still has a speculative insertion in
+ * progress, *speculativeToken is set to non-zero, and the caller can wait for
+ * the verdict on the insertion using SpeculativeInsertionWait().
*
* However, if checkUnique == UNIQUE_CHECK_PARTIAL, we always return
* InvalidTransactionId because we don't want to wait. In this case we
static TransactionId
_bt_check_unique(Relation rel, IndexTuple itup, Relation heapRel,
Buffer buf, OffsetNumber offset, ScanKey itup_scankey,
- IndexUniqueCheck checkUnique, bool *is_unique)
+ IndexUniqueCheck checkUnique, bool *is_unique,
+ uint32 *speculativeToken)
{
TupleDesc itupdesc = RelationGetDescr(rel);
int natts = rel->rd_rel->relnatts;
/*
* It is a duplicate. If we are only doing a partial
- * check, then don't bother checking if the tuple is
- * being updated in another transaction. Just return
- * the fact that it is a potential conflict and leave
- * the full check till later.
+ * check, then don't bother checking if the tuple is being
+ * updated in another transaction. Just return the fact
+ * that it is a potential conflict and leave the full
+ * check till later.
*/
if (checkUnique == UNIQUE_CHECK_PARTIAL)
{
if (nbuf != InvalidBuffer)
_bt_relbuf(rel, nbuf);
/* Tell _bt_doinsert to wait... */
+ *speculativeToken = SnapshotDirty.speculativeToken;
return xwait;
}
}
/*
- * This is a definite conflict. Break the tuple down
- * into datums and report the error. But first, make
- * sure we release the buffer locks we're holding ---
+ * This is a definite conflict. Break the tuple down into
+ * datums and report the error. But first, make sure we
+ * release the buffer locks we're holding ---
* BuildIndexValueDescription could make catalog accesses,
- * which in the worst case might touch this same index
- * and cause deadlocks.
+ * which in the worst case might touch this same index and
+ * cause deadlocks.
*/
if (nbuf != InvalidBuffer)
_bt_relbuf(rel, nbuf);
_bt_relbuf(rel, buf);
{
- Datum values[INDEX_MAX_KEYS];
- bool isnull[INDEX_MAX_KEYS];
+ Datum values[INDEX_MAX_KEYS];
+ bool isnull[INDEX_MAX_KEYS];
+ char *key_desc;
index_deform_tuple(itup, RelationGetDescr(rel),
values, isnull);
+
+ key_desc = BuildIndexValueDescription(rel, values,
+ isnull);
+
ereport(ERROR,
(errcode(ERRCODE_UNIQUE_VIOLATION),
errmsg("duplicate key value violates unique constraint \"%s\"",
RelationGetRelationName(rel)),
- errdetail("Key %s already exists.",
- BuildIndexValueDescription(rel,
- values, isnull))));
+ key_desc ? errdetail("Key %s already exists.",
+ key_desc) : 0,
+ errtableconstraint(heapRel,
+ RelationGetRelationName(rel))));
}
}
else if (all_dead)
*/
ItemIdMarkDead(curitemid);
opaque->btpo_flags |= BTP_HAS_GARBAGE;
- /* be sure to mark the proper buffer dirty... */
+
+ /*
+ * Mark buffer with a dirty hint, since state is not
+ * crucial. Be sure to mark the proper buffer dirty.
+ */
if (nbuf != InvalidBuffer)
- SetBufferCommitInfoNeedsSave(nbuf);
+ MarkBufferDirtyHint(nbuf, true);
else
- SetBufferCommitInfoNeedsSave(buf);
+ MarkBufferDirtyHint(buf, true);
}
}
}
}
/*
- * If we are doing a recheck then we should have found the tuple we
- * are checking. Otherwise there's something very wrong --- probably,
- * the index is on a non-immutable expression.
+ * If we are doing a recheck then we should have found the tuple we are
+ * checking. Otherwise there's something very wrong --- probably, the
+ * index is on a non-immutable expression.
*/
if (checkUnique == UNIQUE_CHECK_EXISTING && !found)
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg("failed to re-find tuple within index \"%s\"",
RelationGetRelationName(rel)),
- errhint("This may be because of a non-immutable index expression.")));
+ errhint("This may be because of a non-immutable index expression."),
+ errtableconstraint(heapRel,
+ RelationGetRelationName(rel))));
if (nbuf != InvalidBuffer)
_bt_relbuf(rel, nbuf);
* If the new key is equal to one or more existing keys, we can
* legitimately place it anywhere in the series of equal keys --- in fact,
* if the new key is equal to the page's "high key" we can place it on
- * the next page. If it is equal to the high key, and there's not room
+ * the next page. If it is equal to the high key, and there's not room
* to insert the new tuple on the current page without splitting, then
* we can move right hoping to find more free space and avoid a split.
* (We should not move right indefinitely, however, since that leads to
* If there's not enough room in the space, we try to make room by
* removing any LP_DEAD tuples.
*
- * On entry, *buf and *offsetptr point to the first legal position
- * where the new tuple could be inserted. The caller should hold an
- * exclusive lock on *buf. *offsetptr can also be set to
+ * On entry, *bufptr and *offsetptr point to the first legal position
+ * where the new tuple could be inserted. The caller should hold an
+ * exclusive lock on *bufptr. *offsetptr can also be set to
* InvalidOffsetNumber, in which case the function will search for the
* right location within the page if needed. On exit, they point to the
* chosen insert location. If _bt_findinsertloc decides to move right,
OffsetNumber *offsetptr,
int keysz,
ScanKey scankey,
- IndexTuple newtup)
+ IndexTuple newtup,
+ BTStack stack,
+ Relation heapRel)
{
Buffer buf = *bufptr;
Page page = BufferGetPage(buf);
* able to fit three items on every page, so restrict any one item to 1/3
* the per-page available space. Note that at this point, itemsz doesn't
* include the ItemId.
+ *
+ * NOTE: if you change this, see also the similar code in _bt_buildadd().
*/
if (itemsz > BTMaxItemSize(page))
ereport(ERROR,
(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
- errmsg("index row size %lu exceeds btree maximum, %lu",
- (unsigned long) itemsz,
- (unsigned long) BTMaxItemSize(page)),
+ errmsg("index row size %zu exceeds maximum %zu for index \"%s\"",
+ itemsz, BTMaxItemSize(page),
+ RelationGetRelationName(rel)),
errhint("Values larger than 1/3 of a buffer page cannot be indexed.\n"
"Consider a function index of an MD5 hash of the value, "
- "or use full text indexing.")));
+ "or use full text indexing."),
+ errtableconstraint(heapRel,
+ RelationGetRelationName(rel))));
/*----------
* If we will need to split the page to put the item on this page,
* on every insert. We implement "get tired" as a random choice,
* since stopping after scanning a fixed number of pages wouldn't work
* well (we'd never reach the right-hand side of previously split
- * pages). Currently the probability of moving right is set at 0.99,
+ * pages). Currently the probability of moving right is set at 0.99,
* which may seem too high to change the behavior much, but it does an
* excellent job of preventing O(N^2) behavior with many equal keys.
*----------
while (PageGetFreeSpace(page) < itemsz)
{
Buffer rbuf;
+ BlockNumber rblkno;
/*
* before considering moving right, see if we can obtain enough space
*/
if (P_ISLEAF(lpageop) && P_HAS_GARBAGE(lpageop))
{
- _bt_vacuum_one_page(rel, buf);
+ _bt_vacuum_one_page(rel, buf, heapRel);
/*
* remember that we vacuumed this page, because that makes the
*/
rbuf = InvalidBuffer;
+ rblkno = lpageop->btpo_next;
for (;;)
{
- BlockNumber rblkno = lpageop->btpo_next;
-
rbuf = _bt_relandgetbuf(rel, rbuf, rblkno, BT_WRITE);
page = BufferGetPage(rbuf);
lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
+
+ /*
+ * If this page was incompletely split, finish the split now. We
+ * do this while holding a lock on the left sibling, which is not
+ * good because finishing the split could be a fairly lengthy
+ * operation. But this should happen very seldom.
+ */
+ if (P_INCOMPLETE_SPLIT(lpageop))
+ {
+ _bt_finish_split(rel, rbuf, stack);
+ rbuf = InvalidBuffer;
+ continue;
+ }
+
if (!P_IGNORE(lpageop))
break;
if (P_RIGHTMOST(lpageop))
elog(ERROR, "fell off the end of index \"%s\"",
RelationGetRelationName(rel));
+
+ rblkno = lpageop->btpo_next;
}
_bt_relbuf(rel, buf);
buf = rbuf;
* child page on the parent.
* + updates the metapage if a true root or fast root is split.
*
- * On entry, we must have the right buffer in which to do the
- * insertion, and the buffer must be pinned and write-locked. On return,
+ * On entry, we must have the correct buffer in which to do the
+ * insertion, and the buffer must be pinned and write-locked. On return,
* we will have dropped both the pin and the lock on the buffer.
*
+ * When inserting to a non-leaf page, 'cbuf' is the left-sibling of the
+ * page we're inserting the downlink for. This function will clear the
+ * INCOMPLETE_SPLIT flag on it, and release the buffer.
+ *
* The locking interactions in this code are critical. You should
* grok Lehman and Yao's paper before making any changes. In addition,
* you need to understand how we disambiguate duplicate keys in this
static void
_bt_insertonpg(Relation rel,
Buffer buf,
+ Buffer cbuf,
BTStack stack,
IndexTuple itup,
OffsetNumber newitemoff,
page = BufferGetPage(buf);
lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
+ /* child buffer must be given iff inserting on an internal page */
+ Assert(P_ISLEAF(lpageop) == !BufferIsValid(cbuf));
+
+ /* The caller should've finished any incomplete splits already. */
+ if (P_INCOMPLETE_SPLIT(lpageop))
+ elog(ERROR, "cannot insert to incompletely split page %u",
+ BufferGetBlockNumber(buf));
+
itemsz = IndexTupleDSize(*itup);
itemsz = MAXALIGN(itemsz); /* be safe, PageAddItem will do this but we
* need to be consistent */
&newitemonleft);
/* split the buffer into left and right halves */
- rbuf = _bt_split(rel, buf, firstright,
+ rbuf = _bt_split(rel, buf, cbuf, firstright,
newitemoff, itemsz, itup, newitemonleft);
+ PredicateLockPageSplit(rel,
+ BufferGetBlockNumber(buf),
+ BufferGetBlockNumber(rbuf));
/*----------
* By here,
/* Do the update. No ereport(ERROR) until changes are logged */
START_CRIT_SECTION();
- _bt_pgaddtup(rel, page, itemsz, itup, newitemoff, "page");
+ if (!_bt_pgaddtup(page, itemsz, itup, newitemoff))
+ elog(PANIC, "failed to add new item to block %u in index \"%s\"",
+ itup_blkno, RelationGetRelationName(rel));
MarkBufferDirty(buf);
MarkBufferDirty(metabuf);
}
+ /* clear INCOMPLETE_SPLIT flag on child if inserting a downlink */
+ if (BufferIsValid(cbuf))
+ {
+ Page cpage = BufferGetPage(cbuf);
+ BTPageOpaque cpageop = (BTPageOpaque) PageGetSpecialPointer(cpage);
+
+ Assert(P_INCOMPLETE_SPLIT(cpageop));
+ cpageop->btpo_flags &= ~BTP_INCOMPLETE_SPLIT;
+ MarkBufferDirty(cbuf);
+ }
+
/* XLOG stuff */
- if (!rel->rd_istemp)
+ if (RelationNeedsWAL(rel))
{
xl_btree_insert xlrec;
- BlockNumber xldownlink;
xl_btree_metadata xlmeta;
uint8 xlinfo;
XLogRecPtr recptr;
- XLogRecData rdata[4];
- XLogRecData *nextrdata;
IndexTupleData trunctuple;
- xlrec.target.node = rel->rd_node;
- ItemPointerSet(&(xlrec.target.tid), itup_blkno, itup_off);
+ xlrec.offnum = itup_off;
- rdata[0].data = (char *) &xlrec;
- rdata[0].len = SizeOfBtreeInsert;
- rdata[0].buffer = InvalidBuffer;
- rdata[0].next = nextrdata = &(rdata[1]);
+ XLogBeginInsert();
+ XLogRegisterData((char *) &xlrec, SizeOfBtreeInsert);
if (P_ISLEAF(lpageop))
xlinfo = XLOG_BTREE_INSERT_LEAF;
else
{
- xldownlink = ItemPointerGetBlockNumber(&(itup->t_tid));
- Assert(ItemPointerGetOffsetNumber(&(itup->t_tid)) == P_HIKEY);
-
- nextrdata->data = (char *) &xldownlink;
- nextrdata->len = sizeof(BlockNumber);
- nextrdata->buffer = InvalidBuffer;
- nextrdata->next = nextrdata + 1;
- nextrdata++;
+ /*
+ * Register the left child whose INCOMPLETE_SPLIT flag was
+ * cleared.
+ */
+ XLogRegisterBuffer(1, cbuf, REGBUF_STANDARD);
xlinfo = XLOG_BTREE_INSERT_UPPER;
}
xlmeta.fastroot = metad->btm_fastroot;
xlmeta.fastlevel = metad->btm_fastlevel;
- nextrdata->data = (char *) &xlmeta;
- nextrdata->len = sizeof(xl_btree_metadata);
- nextrdata->buffer = InvalidBuffer;
- nextrdata->next = nextrdata + 1;
- nextrdata++;
+ XLogRegisterBuffer(2, metabuf, REGBUF_WILL_INIT);
+ XLogRegisterBufData(2, (char *) &xlmeta, sizeof(xl_btree_metadata));
xlinfo = XLOG_BTREE_INSERT_META;
}
/* Read comments in _bt_pgaddtup */
+ XLogRegisterBuffer(0, buf, REGBUF_STANDARD);
if (!P_ISLEAF(lpageop) && newitemoff == P_FIRSTDATAKEY(lpageop))
{
trunctuple = *itup;
trunctuple.t_info = sizeof(IndexTupleData);
- nextrdata->data = (char *) &trunctuple;
- nextrdata->len = sizeof(IndexTupleData);
+ XLogRegisterBufData(0, (char *) &trunctuple,
+ sizeof(IndexTupleData));
}
else
- {
- nextrdata->data = (char *) itup;
- nextrdata->len = IndexTupleDSize(*itup);
- }
- nextrdata->buffer = buf;
- nextrdata->buffer_std = true;
- nextrdata->next = NULL;
+ XLogRegisterBufData(0, (char *) itup, IndexTupleDSize(*itup));
- recptr = XLogInsert(RM_BTREE_ID, xlinfo, rdata);
+ recptr = XLogInsert(RM_BTREE_ID, xlinfo);
if (BufferIsValid(metabuf))
{
PageSetLSN(metapg, recptr);
- PageSetTLI(metapg, ThisTimeLineID);
+ }
+ if (BufferIsValid(cbuf))
+ {
+ PageSetLSN(BufferGetPage(cbuf), recptr);
}
PageSetLSN(page, recptr);
- PageSetTLI(page, ThisTimeLineID);
}
END_CRIT_SECTION();
- /* release buffers; send out relcache inval if metapage changed */
+ /* release buffers */
if (BufferIsValid(metabuf))
- {
- if (!InRecovery)
- CacheInvalidateRelcache(rel);
_bt_relbuf(rel, metabuf);
- }
-
+ if (BufferIsValid(cbuf))
+ _bt_relbuf(rel, cbuf);
_bt_relbuf(rel, buf);
}
}
* new right page. newitemoff etc. tell us about the new item that
* must be inserted along with the data from the old page.
*
+ * When splitting a non-leaf page, 'cbuf' is the left-sibling of the
+ * page we're inserting the downlink for. This function will clear the
+ * INCOMPLETE_SPLIT flag on it, and release the buffer.
+ *
* Returns the new right sibling of buf, pinned and write-locked.
* The pin and lock on buf are maintained.
*/
static Buffer
-_bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
+_bt_split(Relation rel, Buffer buf, Buffer cbuf, OffsetNumber firstright,
OffsetNumber newitemoff, Size newitemsz, IndexTuple newitem,
bool newitemonleft)
{
Page origpage;
Page leftpage,
rightpage;
+ BlockNumber origpagenumber,
+ rightpagenumber;
BTPageOpaque ropaque,
lopaque,
oopaque;
OffsetNumber maxoff;
OffsetNumber i;
bool isroot;
+ bool isleaf;
+ /* Acquire a new page to split into */
rbuf = _bt_getbuf(rel, P_NEW, BT_WRITE);
+
+ /*
+ * origpage is the original page to be split. leftpage is a temporary
+ * buffer that receives the left-sibling data, which will be copied back
+ * into origpage on success. rightpage is the new page that receives the
+ * right-sibling data. If we fail before reaching the critical section,
+ * origpage hasn't been modified and leftpage is only workspace. In
+ * principle we shouldn't need to worry about rightpage either, because it
+ * hasn't been linked into the btree page structure; but to avoid leaving
+ * possibly-confusing junk behind, we are careful to rewrite rightpage as
+ * zeroes before throwing any error.
+ */
origpage = BufferGetPage(buf);
leftpage = PageGetTempPage(origpage);
rightpage = BufferGetPage(rbuf);
+ origpagenumber = BufferGetBlockNumber(buf);
+ rightpagenumber = BufferGetBlockNumber(rbuf);
+
_bt_pageinit(leftpage, BufferGetPageSize(buf));
/* rightpage was already initialized by _bt_getbuf */
/*
- * Copy the original page's LSN and TLI into leftpage, which will become
- * the updated version of the page. We need this because XLogInsert will
- * examine these fields and possibly dump them in a page image.
+ * Copy the original page's LSN into leftpage, which will become the
+ * updated version of the page. We need this because XLogInsert will
+ * examine the LSN and possibly dump it in a page image.
*/
PageSetLSN(leftpage, PageGetLSN(origpage));
- PageSetTLI(leftpage, PageGetTLI(origpage));
/* init btree private data */
oopaque = (BTPageOpaque) PageGetSpecialPointer(origpage);
ropaque = (BTPageOpaque) PageGetSpecialPointer(rightpage);
isroot = P_ISROOT(oopaque);
+ isleaf = P_ISLEAF(oopaque);
/* if we're splitting this page, it won't be the root when we're done */
/* also, clear the SPLIT_END and HAS_GARBAGE flags in both pages */
lopaque->btpo_flags = oopaque->btpo_flags;
lopaque->btpo_flags &= ~(BTP_ROOT | BTP_SPLIT_END | BTP_HAS_GARBAGE);
ropaque->btpo_flags = lopaque->btpo_flags;
+ /* set flag in left page indicating that the right page has no downlink */
+ lopaque->btpo_flags |= BTP_INCOMPLETE_SPLIT;
lopaque->btpo_prev = oopaque->btpo_prev;
- lopaque->btpo_next = BufferGetBlockNumber(rbuf);
- ropaque->btpo_prev = BufferGetBlockNumber(buf);
+ lopaque->btpo_next = rightpagenumber;
+ ropaque->btpo_prev = origpagenumber;
ropaque->btpo_next = oopaque->btpo_next;
lopaque->btpo.level = ropaque->btpo.level = oopaque->btpo.level;
/* Since we already have write-lock on both pages, ok to read cycleid */
item = (IndexTuple) PageGetItem(origpage, itemid);
if (PageAddItem(rightpage, (Item) item, itemsz, rightoff,
false, false) == InvalidOffsetNumber)
- elog(PANIC, "failed to add hikey to the right sibling"
+ {
+ memset(rightpage, 0, BufferGetPageSize(rbuf));
+ elog(ERROR, "failed to add hikey to the right sibling"
" while splitting block %u of index \"%s\"",
- BufferGetBlockNumber(buf), RelationGetRelationName(rel));
+ origpagenumber, RelationGetRelationName(rel));
+ }
rightoff = OffsetNumberNext(rightoff);
}
}
if (PageAddItem(leftpage, (Item) item, itemsz, leftoff,
false, false) == InvalidOffsetNumber)
- elog(PANIC, "failed to add hikey to the left sibling"
+ {
+ memset(rightpage, 0, BufferGetPageSize(rbuf));
+ elog(ERROR, "failed to add hikey to the left sibling"
" while splitting block %u of index \"%s\"",
- BufferGetBlockNumber(buf), RelationGetRelationName(rel));
+ origpagenumber, RelationGetRelationName(rel));
+ }
leftoff = OffsetNumberNext(leftoff);
/*
{
if (newitemonleft)
{
- _bt_pgaddtup(rel, leftpage, newitemsz, newitem, leftoff,
- "left sibling");
+ if (!_bt_pgaddtup(leftpage, newitemsz, newitem, leftoff))
+ {
+ memset(rightpage, 0, BufferGetPageSize(rbuf));
+ elog(ERROR, "failed to add new item to the left sibling"
+ " while splitting block %u of index \"%s\"",
+ origpagenumber, RelationGetRelationName(rel));
+ }
leftoff = OffsetNumberNext(leftoff);
}
else
{
- _bt_pgaddtup(rel, rightpage, newitemsz, newitem, rightoff,
- "right sibling");
+ if (!_bt_pgaddtup(rightpage, newitemsz, newitem, rightoff))
+ {
+ memset(rightpage, 0, BufferGetPageSize(rbuf));
+ elog(ERROR, "failed to add new item to the right sibling"
+ " while splitting block %u of index \"%s\"",
+ origpagenumber, RelationGetRelationName(rel));
+ }
rightoff = OffsetNumberNext(rightoff);
}
}
/* decide which page to put it on */
if (i < firstright)
{
- _bt_pgaddtup(rel, leftpage, itemsz, item, leftoff,
- "left sibling");
+ if (!_bt_pgaddtup(leftpage, itemsz, item, leftoff))
+ {
+ memset(rightpage, 0, BufferGetPageSize(rbuf));
+ elog(ERROR, "failed to add old item to the left sibling"
+ " while splitting block %u of index \"%s\"",
+ origpagenumber, RelationGetRelationName(rel));
+ }
leftoff = OffsetNumberNext(leftoff);
}
else
{
- _bt_pgaddtup(rel, rightpage, itemsz, item, rightoff,
- "right sibling");
+ if (!_bt_pgaddtup(rightpage, itemsz, item, rightoff))
+ {
+ memset(rightpage, 0, BufferGetPageSize(rbuf));
+ elog(ERROR, "failed to add old item to the right sibling"
+ " while splitting block %u of index \"%s\"",
+ origpagenumber, RelationGetRelationName(rel));
+ }
rightoff = OffsetNumberNext(rightoff);
}
}
* not be splitting the page).
*/
Assert(!newitemonleft);
- _bt_pgaddtup(rel, rightpage, newitemsz, newitem, rightoff,
- "right sibling");
+ if (!_bt_pgaddtup(rightpage, newitemsz, newitem, rightoff))
+ {
+ memset(rightpage, 0, BufferGetPageSize(rbuf));
+ elog(ERROR, "failed to add new item to the right sibling"
+ " while splitting block %u of index \"%s\"",
+ origpagenumber, RelationGetRelationName(rel));
+ }
rightoff = OffsetNumberNext(rightoff);
}
* neighbors.
*/
- if (!P_RIGHTMOST(ropaque))
+ if (!P_RIGHTMOST(oopaque))
{
- sbuf = _bt_getbuf(rel, ropaque->btpo_next, BT_WRITE);
+ sbuf = _bt_getbuf(rel, oopaque->btpo_next, BT_WRITE);
spage = BufferGetPage(sbuf);
sopaque = (BTPageOpaque) PageGetSpecialPointer(spage);
- if (sopaque->btpo_prev != ropaque->btpo_prev)
- elog(PANIC, "right sibling's left-link doesn't match: "
+ if (sopaque->btpo_prev != origpagenumber)
+ {
+ memset(rightpage, 0, BufferGetPageSize(rbuf));
+ elog(ERROR, "right sibling's left-link doesn't match: "
"block %u links to %u instead of expected %u in index \"%s\"",
- ropaque->btpo_next, sopaque->btpo_prev, ropaque->btpo_prev,
+ oopaque->btpo_next, sopaque->btpo_prev, origpagenumber,
RelationGetRelationName(rel));
+ }
/*
* Check to see if we can set the SPLIT_END flag in the right-hand
* page. If you're confused, imagine that page A splits to A B and
* then again, yielding A C B, while vacuum is in progress. Tuples
* originally in A could now be in either B or C, hence vacuum must
- * examine both pages. But if D, our right sibling, has a different
+ * examine both pages. But if D, our right sibling, has a different
* cycleid then it could not contain any tuples that were in A when
* the vacuum started.
*/
*
* NO EREPORT(ERROR) till right sibling is updated. We can get away with
* not starting the critical section till here because we haven't been
- * scribbling on the original page yet, and we don't care about the new
- * sibling until it's linked into the btree.
+ * scribbling on the original page yet; see comments above.
*/
START_CRIT_SECTION();
* (in the page management code) that the center of a page always be
* clean, and the most efficient way to guarantee this is just to compact
* the data by reinserting it into a new left page. (XXX the latter
- * comment is probably obsolete.)
+ * comment is probably obsolete; but in any case it's good to not scribble
+ * on the original page until we enter the critical section.)
*
* We need to do this before writing the WAL record, so that XLogInsert
* can WAL log an image of the page if necessary.
*/
PageRestoreTempPage(leftpage, origpage);
+ /* leftpage, lopaque must not be used below here */
MarkBufferDirty(buf);
MarkBufferDirty(rbuf);
if (!P_RIGHTMOST(ropaque))
{
- sopaque->btpo_prev = BufferGetBlockNumber(rbuf);
+ sopaque->btpo_prev = rightpagenumber;
MarkBufferDirty(sbuf);
}
+ /*
+ * Clear INCOMPLETE_SPLIT flag on child if inserting the new item finishes
+ * a split.
+ */
+ if (!isleaf)
+ {
+ Page cpage = BufferGetPage(cbuf);
+ BTPageOpaque cpageop = (BTPageOpaque) PageGetSpecialPointer(cpage);
+
+ cpageop->btpo_flags &= ~BTP_INCOMPLETE_SPLIT;
+ MarkBufferDirty(cbuf);
+ }
+
/* XLOG stuff */
- if (!rel->rd_istemp)
+ if (RelationNeedsWAL(rel))
{
xl_btree_split xlrec;
uint8 xlinfo;
XLogRecPtr recptr;
- XLogRecData rdata[7];
- XLogRecData *lastrdata;
- xlrec.node = rel->rd_node;
- xlrec.leftsib = BufferGetBlockNumber(buf);
- xlrec.rightsib = BufferGetBlockNumber(rbuf);
- xlrec.rnext = ropaque->btpo_next;
xlrec.level = ropaque->btpo.level;
xlrec.firstright = firstright;
+ xlrec.newitemoff = newitemoff;
- rdata[0].data = (char *) &xlrec;
- rdata[0].len = SizeOfBtreeSplit;
- rdata[0].buffer = InvalidBuffer;
+ XLogBeginInsert();
+ XLogRegisterData((char *) &xlrec, SizeOfBtreeSplit);
- lastrdata = &rdata[0];
-
- if (ropaque->btpo.level > 0)
- {
- /* Log downlink on non-leaf pages */
- lastrdata->next = lastrdata + 1;
- lastrdata++;
+ XLogRegisterBuffer(0, buf, REGBUF_STANDARD);
+ XLogRegisterBuffer(1, rbuf, REGBUF_WILL_INIT);
+ /* Log the right sibling, because we've changed its prev-pointer. */
+ if (!P_RIGHTMOST(ropaque))
+ XLogRegisterBuffer(2, sbuf, REGBUF_STANDARD);
+ if (BufferIsValid(cbuf))
+ XLogRegisterBuffer(3, cbuf, REGBUF_STANDARD);
- lastrdata->data = (char *) &newitem->t_tid.ip_blkid;
- lastrdata->len = sizeof(BlockIdData);
- lastrdata->buffer = InvalidBuffer;
+ /*
+ * Log the new item, if it was inserted on the left page. (If it was
+ * put on the right page, we don't need to explicitly WAL log it
+ * because it's included with all the other items on the right page.)
+ * Show the new item as belonging to the left page buffer, so that it
+ * is not stored if XLogInsert decides it needs a full-page image of
+ * the left page. We store the offset anyway, though, to support
+ * archive compression of these records.
+ */
+ if (newitemonleft)
+ XLogRegisterBufData(0, (char *) newitem, MAXALIGN(newitemsz));
+ /* Log left page */
+ if (!isleaf)
+ {
/*
* We must also log the left page's high key, because the right
* page's leftmost key is suppressed on non-leaf levels. Show it
* if XLogInsert decides it needs a full-page image of the left
* page.
*/
- lastrdata->next = lastrdata + 1;
- lastrdata++;
-
itemid = PageGetItemId(origpage, P_HIKEY);
item = (IndexTuple) PageGetItem(origpage, itemid);
- lastrdata->data = (char *) item;
- lastrdata->len = MAXALIGN(IndexTupleSize(item));
- lastrdata->buffer = buf; /* backup block 1 */
- lastrdata->buffer_std = true;
- }
-
- /*
- * Log the new item and its offset, if it was inserted on the left
- * page. (If it was put on the right page, we don't need to explicitly
- * WAL log it because it's included with all the other items on the
- * right page.) Show the new item as belonging to the left page
- * buffer, so that it is not stored if XLogInsert decides it needs a
- * full-page image of the left page. We store the offset anyway,
- * though, to support archive compression of these records.
- */
- if (newitemonleft)
- {
- lastrdata->next = lastrdata + 1;
- lastrdata++;
-
- lastrdata->data = (char *) &newitemoff;
- lastrdata->len = sizeof(OffsetNumber);
- lastrdata->buffer = InvalidBuffer;
-
- lastrdata->next = lastrdata + 1;
- lastrdata++;
-
- lastrdata->data = (char *) newitem;
- lastrdata->len = MAXALIGN(newitemsz);
- lastrdata->buffer = buf; /* backup block 1 */
- lastrdata->buffer_std = true;
- }
- else if (ropaque->btpo.level == 0)
- {
- /*
- * Although we don't need to WAL-log the new item, we still need
- * XLogInsert to consider storing a full-page image of the left
- * page, so make an empty entry referencing that buffer. This also
- * ensures that the left page is always backup block 1.
- */
- lastrdata->next = lastrdata + 1;
- lastrdata++;
-
- lastrdata->data = NULL;
- lastrdata->len = 0;
- lastrdata->buffer = buf; /* backup block 1 */
- lastrdata->buffer_std = true;
+ XLogRegisterBufData(0, (char *) item, MAXALIGN(IndexTupleSize(item)));
}
/*
* and so the item pointers can be reconstructed. See comments for
* _bt_restore_page().
*/
- lastrdata->next = lastrdata + 1;
- lastrdata++;
-
- lastrdata->data = (char *) rightpage +
- ((PageHeader) rightpage)->pd_upper;
- lastrdata->len = ((PageHeader) rightpage)->pd_special -
- ((PageHeader) rightpage)->pd_upper;
- lastrdata->buffer = InvalidBuffer;
-
- /* Log the right sibling, because we've changed its' prev-pointer. */
- if (!P_RIGHTMOST(ropaque))
- {
- lastrdata->next = lastrdata + 1;
- lastrdata++;
-
- lastrdata->data = NULL;
- lastrdata->len = 0;
- lastrdata->buffer = sbuf; /* backup block 2 */
- lastrdata->buffer_std = true;
- }
-
- lastrdata->next = NULL;
+ XLogRegisterBufData(1,
+ (char *) rightpage + ((PageHeader) rightpage)->pd_upper,
+ ((PageHeader) rightpage)->pd_special - ((PageHeader) rightpage)->pd_upper);
if (isroot)
xlinfo = newitemonleft ? XLOG_BTREE_SPLIT_L_ROOT : XLOG_BTREE_SPLIT_R_ROOT;
else
xlinfo = newitemonleft ? XLOG_BTREE_SPLIT_L : XLOG_BTREE_SPLIT_R;
- recptr = XLogInsert(RM_BTREE_ID, xlinfo, rdata);
+ recptr = XLogInsert(RM_BTREE_ID, xlinfo);
PageSetLSN(origpage, recptr);
- PageSetTLI(origpage, ThisTimeLineID);
PageSetLSN(rightpage, recptr);
- PageSetTLI(rightpage, ThisTimeLineID);
if (!P_RIGHTMOST(ropaque))
{
PageSetLSN(spage, recptr);
- PageSetTLI(spage, ThisTimeLineID);
+ }
+ if (!isleaf)
+ {
+ PageSetLSN(BufferGetPage(cbuf), recptr);
}
}
if (!P_RIGHTMOST(ropaque))
_bt_relbuf(rel, sbuf);
+ /* release the child */
+ if (!isleaf)
+ _bt_relbuf(rel, cbuf);
+
/* split's done */
return rbuf;
}
*
* We return the index of the first existing tuple that should go on the
* righthand page, plus a boolean indicating whether the new tuple goes on
- * the left or right page. The bool is necessary to disambiguate the case
+ * the left or right page. The bool is necessary to disambiguate the case
* where firstright == newitemoff.
*/
static OffsetNumber
*
* On entry, buf and rbuf are the left and right split pages, which we
* still hold write locks on per the L&Y algorithm. We release the
- * write locks once we have write lock on the parent page. (Any sooner,
+ * write locks once we have write lock on the parent page. (Any sooner,
* and it'd be possible for some other process to try to split or delete
* one of these pages, and get confused because it cannot find the downlink.)
*
* have to be efficient (concurrent ROOT split, WAL recovery)
* is_root - we split the true root
* is_only - we split a page alone on its level (might have been fast root)
- *
- * This is exported so it can be called by nbtxlog.c.
*/
-void
+static void
_bt_insert_parent(Relation rel,
Buffer buf,
Buffer rbuf,
* Here we have to do something Lehman and Yao don't talk about: deal with
* a root split and construction of a new root. If our stack is empty
* then we have just split a node on what had been the root level when we
- * descended the tree. If it was still the root then we perform a
+ * descended the tree. If it was still the root then we perform a
* new-root construction. If it *wasn't* the root anymore, search to find
* the next higher level that someone constructed meanwhile, and find the
* right place to insert as for the normal case.
*
* If we have to search for the parent level, we do so by re-descending
* from the root. This is not super-efficient, but it's rare enough not
- * to matter. (This path is also taken when called from WAL recovery ---
- * we have no stack in that case.)
+ * to matter.
*/
if (is_root)
{
{
BTPageOpaque lpageop;
- if (!InRecovery)
- elog(DEBUG2, "concurrent ROOT page split");
+ elog(DEBUG2, "concurrent ROOT page split");
lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
/* Find the leftmost page at the next level up */
pbuf = _bt_get_endpoint(rel, lpageop->btpo.level + 1, false);
* 05/27/97
*/
ItemPointerSet(&(stack->bts_btentry.t_tid), bknum, P_HIKEY);
-
pbuf = _bt_getstackbuf(rel, stack, BT_WRITE);
- /* Now we can unlock the children */
+ /*
+ * Now we can unlock the right child. The left child will be unlocked
+ * by _bt_insertonpg().
+ */
_bt_relbuf(rel, rbuf);
- _bt_relbuf(rel, buf);
/* Check for error only after writing children */
if (pbuf == InvalidBuffer)
RelationGetRelationName(rel), bknum, rbknum);
/* Recursively update the parent */
- _bt_insertonpg(rel, pbuf, stack->bts_parent,
+ _bt_insertonpg(rel, pbuf, buf, stack->bts_parent,
new_item, stack->bts_offset + 1,
is_only);
}
}
+/*
+ * _bt_finish_split() -- Finish an incomplete split
+ *
+ * A crash or other failure can leave a split incomplete. The insertion
+ * routines won't allow to insert on a page that is incompletely split.
+ * Before inserting on such a page, call _bt_finish_split().
+ *
+ * On entry, 'lbuf' must be locked in write-mode. On exit, it is unlocked
+ * and unpinned.
+ */
+void
+_bt_finish_split(Relation rel, Buffer lbuf, BTStack stack)
+{
+ Page lpage = BufferGetPage(lbuf);
+ BTPageOpaque lpageop = (BTPageOpaque) PageGetSpecialPointer(lpage);
+ Buffer rbuf;
+ Page rpage;
+ BTPageOpaque rpageop;
+ bool was_root;
+ bool was_only;
+
+ Assert(P_INCOMPLETE_SPLIT(lpageop));
+
+ /* Lock right sibling, the one missing the downlink */
+ rbuf = _bt_getbuf(rel, lpageop->btpo_next, BT_WRITE);
+ rpage = BufferGetPage(rbuf);
+ rpageop = (BTPageOpaque) PageGetSpecialPointer(rpage);
+
+ /* Could this be a root split? */
+ if (!stack)
+ {
+ Buffer metabuf;
+ Page metapg;
+ BTMetaPageData *metad;
+
+ /* acquire lock on the metapage */
+ metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_WRITE);
+ metapg = BufferGetPage(metabuf);
+ metad = BTPageGetMeta(metapg);
+
+ was_root = (metad->btm_root == BufferGetBlockNumber(lbuf));
+
+ _bt_relbuf(rel, metabuf);
+ }
+ else
+ was_root = false;
+
+ /* Was this the only page on the level before split? */
+ was_only = (P_LEFTMOST(lpageop) && P_RIGHTMOST(rpageop));
+
+ elog(DEBUG1, "finishing incomplete split of %u/%u",
+ BufferGetBlockNumber(lbuf), BufferGetBlockNumber(rbuf));
+
+ _bt_insert_parent(rel, lbuf, rbuf, stack, was_root, was_only);
+}
+
/*
* _bt_getstackbuf() -- Walk back up the tree one step, and find the item
* we last looked at in the parent.
page = BufferGetPage(buf);
opaque = (BTPageOpaque) PageGetSpecialPointer(page);
+ if (access == BT_WRITE && P_INCOMPLETE_SPLIT(opaque))
+ {
+ _bt_finish_split(rel, buf, stack->bts_parent);
+ continue;
+ }
+
if (!P_IGNORE(opaque))
{
OffsetNumber offnum,
/*
* These loops will check every item on the page --- but in an
* order that's attuned to the probability of where it actually
- * is. Scan to the right first, then to the left.
+ * is. Scan to the right first, then to the left.
*/
for (offnum = start;
offnum <= maxoff;
rbkno;
BlockNumber rootblknum;
BTPageOpaque rootopaque;
+ BTPageOpaque lopaque;
ItemId itemid;
IndexTuple item;
- Size itemsz;
- IndexTuple new_item;
+ IndexTuple left_item;
+ Size left_item_sz;
+ IndexTuple right_item;
+ Size right_item_sz;
Buffer metabuf;
Page metapg;
BTMetaPageData *metad;
lbkno = BufferGetBlockNumber(lbuf);
rbkno = BufferGetBlockNumber(rbuf);
lpage = BufferGetPage(lbuf);
+ lopaque = (BTPageOpaque) PageGetSpecialPointer(lpage);
/* get a new root page */
rootbuf = _bt_getbuf(rel, P_NEW, BT_WRITE);
metapg = BufferGetPage(metabuf);
metad = BTPageGetMeta(metapg);
+ /*
+ * Create downlink item for left page (old root). Since this will be the
+ * first item in a non-leaf page, it implicitly has minus-infinity key
+ * value, so we need not store any actual key in it.
+ */
+ left_item_sz = sizeof(IndexTupleData);
+ left_item = (IndexTuple) palloc(left_item_sz);
+ left_item->t_info = left_item_sz;
+ ItemPointerSet(&(left_item->t_tid), lbkno, P_HIKEY);
+
+ /*
+ * Create downlink item for right page. The key for it is obtained from
+ * the "high key" position in the left page.
+ */
+ itemid = PageGetItemId(lpage, P_HIKEY);
+ right_item_sz = ItemIdGetLength(itemid);
+ item = (IndexTuple) PageGetItem(lpage, itemid);
+ right_item = CopyIndexTuple(item);
+ ItemPointerSet(&(right_item->t_tid), rbkno, P_HIKEY);
+
/* NO EREPORT(ERROR) from here till newroot op is logged */
START_CRIT_SECTION();
metad->btm_fastroot = rootblknum;
metad->btm_fastlevel = rootopaque->btpo.level;
- /*
- * Create downlink item for left page (old root). Since this will be the
- * first item in a non-leaf page, it implicitly has minus-infinity key
- * value, so we need not store any actual key in it.
- */
- itemsz = sizeof(IndexTupleData);
- new_item = (IndexTuple) palloc(itemsz);
- new_item->t_info = itemsz;
- ItemPointerSet(&(new_item->t_tid), lbkno, P_HIKEY);
-
/*
* Insert the left page pointer into the new root page. The root page is
* the rightmost page on its level so there is no "high key" in it; the
* Note: we *must* insert the two items in item-number order, for the
* benefit of _bt_restore_page().
*/
- if (PageAddItem(rootpage, (Item) new_item, itemsz, P_HIKEY,
+ if (PageAddItem(rootpage, (Item) left_item, left_item_sz, P_HIKEY,
false, false) == InvalidOffsetNumber)
elog(PANIC, "failed to add leftkey to new root page"
" while splitting block %u of index \"%s\"",
BufferGetBlockNumber(lbuf), RelationGetRelationName(rel));
- pfree(new_item);
-
- /*
- * Create downlink item for right page. The key for it is obtained from
- * the "high key" position in the left page.
- */
- itemid = PageGetItemId(lpage, P_HIKEY);
- itemsz = ItemIdGetLength(itemid);
- item = (IndexTuple) PageGetItem(lpage, itemid);
- new_item = CopyIndexTuple(item);
- ItemPointerSet(&(new_item->t_tid), rbkno, P_HIKEY);
/*
* insert the right page pointer into the new root page.
*/
- if (PageAddItem(rootpage, (Item) new_item, itemsz, P_FIRSTKEY,
+ if (PageAddItem(rootpage, (Item) right_item, right_item_sz, P_FIRSTKEY,
false, false) == InvalidOffsetNumber)
elog(PANIC, "failed to add rightkey to new root page"
" while splitting block %u of index \"%s\"",
BufferGetBlockNumber(lbuf), RelationGetRelationName(rel));
- pfree(new_item);
+
+ /* Clear the incomplete-split flag in the left child */
+ Assert(P_INCOMPLETE_SPLIT(lopaque));
+ lopaque->btpo_flags &= ~BTP_INCOMPLETE_SPLIT;
+ MarkBufferDirty(lbuf);
MarkBufferDirty(rootbuf);
MarkBufferDirty(metabuf);
/* XLOG stuff */
- if (!rel->rd_istemp)
+ if (RelationNeedsWAL(rel))
{
xl_btree_newroot xlrec;
XLogRecPtr recptr;
- XLogRecData rdata[2];
+ xl_btree_metadata md;
- xlrec.node = rel->rd_node;
xlrec.rootblk = rootblknum;
xlrec.level = metad->btm_level;
- rdata[0].data = (char *) &xlrec;
- rdata[0].len = SizeOfBtreeNewroot;
- rdata[0].buffer = InvalidBuffer;
- rdata[0].next = &(rdata[1]);
+ XLogBeginInsert();
+ XLogRegisterData((char *) &xlrec, SizeOfBtreeNewroot);
+
+ XLogRegisterBuffer(0, rootbuf, REGBUF_WILL_INIT);
+ XLogRegisterBuffer(1, lbuf, REGBUF_STANDARD);
+ XLogRegisterBuffer(2, metabuf, REGBUF_WILL_INIT);
+
+ md.root = rootblknum;
+ md.level = metad->btm_level;
+ md.fastroot = rootblknum;
+ md.fastlevel = metad->btm_level;
+
+ XLogRegisterBufData(2, (char *) &md, sizeof(xl_btree_metadata));
/*
* Direct access to page is not good but faster - we should implement
* some new func in page API.
*/
- rdata[1].data = (char *) rootpage + ((PageHeader) rootpage)->pd_upper;
- rdata[1].len = ((PageHeader) rootpage)->pd_special -
- ((PageHeader) rootpage)->pd_upper;
- rdata[1].buffer = InvalidBuffer;
- rdata[1].next = NULL;
+ XLogRegisterBufData(0,
+ (char *) rootpage + ((PageHeader) rootpage)->pd_upper,
+ ((PageHeader) rootpage)->pd_special -
+ ((PageHeader) rootpage)->pd_upper);
- recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_NEWROOT, rdata);
+ recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_NEWROOT);
+ PageSetLSN(lpage, recptr);
PageSetLSN(rootpage, recptr);
- PageSetTLI(rootpage, ThisTimeLineID);
PageSetLSN(metapg, recptr);
- PageSetTLI(metapg, ThisTimeLineID);
}
END_CRIT_SECTION();
- /* send out relcache inval for metapage change */
- if (!InRecovery)
- CacheInvalidateRelcache(rel);
-
/* done with metapage */
_bt_relbuf(rel, metabuf);
+ pfree(left_item);
+ pfree(right_item);
+
return rootbuf;
}
* we insert the tuples in order, so that the given itup_off does
* represent the final position of the tuple!
*/
-static void
-_bt_pgaddtup(Relation rel,
- Page page,
+static bool
+_bt_pgaddtup(Page page,
Size itemsize,
IndexTuple itup,
- OffsetNumber itup_off,
- const char *where)
+ OffsetNumber itup_off)
{
BTPageOpaque opaque = (BTPageOpaque) PageGetSpecialPointer(page);
IndexTupleData trunctuple;
if (PageAddItem(page, (Item) itup, itemsize, itup_off,
false, false) == InvalidOffsetNumber)
- elog(PANIC, "failed to add item to the %s in index \"%s\"",
- where, RelationGetRelationName(rel));
+ return false;
+
+ return true;
}
/*
if (isNull || (scankey->sk_flags & SK_ISNULL))
return false;
- result = DatumGetInt32(FunctionCall2(&scankey->sk_func,
- datum,
- scankey->sk_argument));
+ result = DatumGetInt32(FunctionCall2Coll(&scankey->sk_func,
+ scankey->sk_collation,
+ datum,
+ scankey->sk_argument));
if (result != 0)
return false;
* super-exclusive "cleanup" lock (see nbtree/README).
*/
static void
-_bt_vacuum_one_page(Relation rel, Buffer buffer)
+_bt_vacuum_one_page(Relation rel, Buffer buffer, Relation heapRel)
{
OffsetNumber deletable[MaxOffsetNumber];
int ndeletable = 0;
}
if (ndeletable > 0)
- _bt_delitems(rel, buffer, deletable, ndeletable);
+ _bt_delitems_delete(rel, buffer, deletable, ndeletable, heapRel);
/*
* Note: if we didn't find any LP_DEAD items, then the page's