* nbtinsert.c
* Item insertion in Lehman and Yao btrees for Postgres.
*
- * Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.145 2006/11/01 19:43:17 tgl Exp $
+ * src/backend/access/nbtree/nbtinsert.c
*
*-------------------------------------------------------------------------
*/
#include "access/heapam.h"
#include "access/nbtree.h"
#include "access/transam.h"
+#include "access/xloginsert.h"
#include "miscadmin.h"
-#include "utils/inval.h"
+#include "storage/lmgr.h"
+#include "storage/predicate.h"
+#include "utils/tqual.h"
typedef struct
int fillfactor; /* needed when splitting rightmost page */
bool is_leaf; /* T if splitting a leaf page */
bool is_rightmost; /* T if splitting a rightmost page */
+ OffsetNumber newitemoff; /* where the new item is to be inserted */
+ int leftspace; /* space available for items on left page */
+ int rightspace; /* space available for items on right page */
+ int olddataitemstotal; /* space taken by old items */
bool have_split; /* found a valid split? */
static Buffer _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf);
static TransactionId _bt_check_unique(Relation rel, IndexTuple itup,
- Relation heapRel, Buffer buf,
- ScanKey itup_scankey);
-static void _bt_insertonpg(Relation rel, Buffer buf,
+ Relation heapRel, Buffer buf, OffsetNumber offset,
+ ScanKey itup_scankey,
+ IndexUniqueCheck checkUnique, bool *is_unique,
+ uint32 *speculativeToken);
+static void _bt_findinsertloc(Relation rel,
+ Buffer *bufptr,
+ OffsetNumber *offsetptr,
+ int keysz,
+ ScanKey scankey,
+ IndexTuple newtup,
+ BTStack stack,
+ Relation heapRel);
+static void _bt_insertonpg(Relation rel, Buffer buf, Buffer cbuf,
BTStack stack,
- int keysz, ScanKey scankey,
IndexTuple itup,
- OffsetNumber afteritem,
+ OffsetNumber newitemoff,
bool split_only_page);
-static Buffer _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
- OffsetNumber newitemoff, Size newitemsz,
+static Buffer _bt_split(Relation rel, Buffer buf, Buffer cbuf,
+ OffsetNumber firstright, OffsetNumber newitemoff, Size newitemsz,
IndexTuple newitem, bool newitemonleft);
+static void _bt_insert_parent(Relation rel, Buffer buf, Buffer rbuf,
+ BTStack stack, bool is_root, bool is_only);
static OffsetNumber _bt_findsplitloc(Relation rel, Page page,
OffsetNumber newitemoff,
Size newitemsz,
bool *newitemonleft);
-static void _bt_checksplitloc(FindSplitData *state, OffsetNumber firstright,
- int leftfree, int rightfree,
- bool newitemonleft, Size firstrightitemsz);
-static void _bt_pgaddtup(Relation rel, Page page,
- Size itemsize, IndexTuple itup,
- OffsetNumber itup_off, const char *where);
+static void _bt_checksplitloc(FindSplitData *state,
+ OffsetNumber firstoldonright, bool newitemonleft,
+ int dataitemstoleft, Size firstoldonrightsz);
+static bool _bt_pgaddtup(Page page, Size itemsize, IndexTuple itup,
+ OffsetNumber itup_off);
static bool _bt_isequal(TupleDesc itupdesc, Page page, OffsetNumber offnum,
int keysz, ScanKey scankey);
-static void _bt_vacuum_one_page(Relation rel, Buffer buffer);
+static void _bt_vacuum_one_page(Relation rel, Buffer buffer, Relation heapRel);
/*
* _bt_doinsert() -- Handle insertion of a single index tuple in the tree.
*
- * This routine is called by the public interface routines, btbuild
- * and btinsert. By here, itup is filled in, including the TID.
+ * This routine is called by the public interface routine, btinsert.
+ * By here, itup is filled in, including the TID.
+ *
+ * If checkUnique is UNIQUE_CHECK_NO or UNIQUE_CHECK_PARTIAL, this
+ * will allow duplicates. Otherwise (UNIQUE_CHECK_YES or
+ * UNIQUE_CHECK_EXISTING) it will throw error for a duplicate.
+ * For UNIQUE_CHECK_EXISTING we merely run the duplicate check, and
+ * don't actually insert.
+ *
+ * The result value is only significant for UNIQUE_CHECK_PARTIAL:
+ * it must be TRUE if the entry is known unique, else FALSE.
+ * (In the current implementation we'll also return TRUE after a
+ * successful UNIQUE_CHECK_YES or UNIQUE_CHECK_EXISTING call, but
+ * that's just a coding artifact.)
*/
-void
+bool
_bt_doinsert(Relation rel, IndexTuple itup,
- bool index_is_unique, Relation heapRel)
+ IndexUniqueCheck checkUnique, Relation heapRel)
{
+ bool is_unique = false;
int natts = rel->rd_rel->relnatts;
ScanKey itup_scankey;
BTStack stack;
Buffer buf;
+ OffsetNumber offset;
/* we need an insertion scan key to do our search, so build one */
itup_scankey = _bt_mkscankey(rel, itup);
/* find the first page containing this key */
stack = _bt_search(rel, natts, itup_scankey, false, &buf, BT_WRITE);
+ offset = InvalidOffsetNumber;
+
/* trade in our read lock for a write lock */
LockBuffer(buf, BUFFER_LOCK_UNLOCK);
LockBuffer(buf, BT_WRITE);
* If the page was split between the time that we surrendered our read
* lock and acquired our write lock, then this page may no longer be the
* right place for the key we want to insert. In this case, we need to
- * move right in the tree. See Lehman and Yao for an excruciatingly
+ * move right in the tree. See Lehman and Yao for an excruciatingly
* precise description.
*/
- buf = _bt_moveright(rel, buf, natts, itup_scankey, false, BT_WRITE);
+ buf = _bt_moveright(rel, buf, natts, itup_scankey, false,
+ true, stack, BT_WRITE);
/*
* If we're not allowing duplicates, make sure the key isn't already in
*
* If we must wait for another xact, we release the lock while waiting,
* and then must start over completely.
+ *
+ * For a partial uniqueness check, we don't wait for the other xact. Just
+ * let the tuple in and return false for possibly non-unique, or true for
+ * definitely unique.
*/
- if (index_is_unique)
+ if (checkUnique != UNIQUE_CHECK_NO)
{
TransactionId xwait;
+ uint32 speculativeToken;
- xwait = _bt_check_unique(rel, itup, heapRel, buf, itup_scankey);
+ offset = _bt_binsrch(rel, buf, natts, itup_scankey, false);
+ xwait = _bt_check_unique(rel, itup, heapRel, buf, offset, itup_scankey,
+ checkUnique, &is_unique, &speculativeToken);
if (TransactionIdIsValid(xwait))
{
/* Have to wait for the other guy ... */
_bt_relbuf(rel, buf);
- XactLockTableWait(xwait);
+
+ /*
+ * If it's a speculative insertion, wait for it to finish (ie. to
+ * go ahead with the insertion, or kill the tuple). Otherwise
+ * wait for the transaction to finish as usual.
+ */
+ if (speculativeToken)
+ SpeculativeInsertionWait(xwait, speculativeToken);
+ else
+ XactLockTableWait(xwait, rel, &itup->t_tid, XLTW_InsertIndex);
+
/* start over... */
_bt_freestack(stack);
goto top;
}
}
- /* do the insertion */
- _bt_insertonpg(rel, buf, stack, natts, itup_scankey, itup, 0, false);
+ if (checkUnique != UNIQUE_CHECK_EXISTING)
+ {
+ /*
+ * The only conflict predicate locking cares about for indexes is when
+ * an index tuple insert conflicts with an existing lock. Since the
+ * actual location of the insert is hard to predict because of the
+ * random search used to prevent O(N^2) performance when there are
+ * many duplicate entries, we can just use the "first valid" page.
+ */
+ CheckForSerializableConflictIn(rel, NULL, buf);
+ /* do the insertion */
+ _bt_findinsertloc(rel, &buf, &offset, natts, itup_scankey, itup,
+ stack, heapRel);
+ _bt_insertonpg(rel, buf, InvalidBuffer, stack, itup, offset, false);
+ }
+ else
+ {
+ /* just release the buffer */
+ _bt_relbuf(rel, buf);
+ }
/* be tidy */
_bt_freestack(stack);
_bt_freeskey(itup_scankey);
+
+ return is_unique;
}
/*
* _bt_check_unique() -- Check for violation of unique index constraint
*
+ * offset points to the first possible item that could conflict. It can
+ * also point to end-of-page, which means that the first tuple to check
+ * is the first tuple on the next page.
+ *
* Returns InvalidTransactionId if there is no conflict, else an xact ID
- * we must wait for to see if it commits a conflicting tuple. If an actual
- * conflict is detected, no return --- just ereport().
+ * we must wait for to see if it commits a conflicting tuple. If an actual
+ * conflict is detected, no return --- just ereport(). If an xact ID is
+ * returned, and the conflicting tuple still has a speculative insertion in
+ * progress, *speculativeToken is set to non-zero, and the caller can wait for
+ * the verdict on the insertion using SpeculativeInsertionWait().
+ *
+ * However, if checkUnique == UNIQUE_CHECK_PARTIAL, we always return
+ * InvalidTransactionId because we don't want to wait. In this case we
+ * set *is_unique to false if there is a potential conflict, and the
+ * core code must redo the uniqueness check later.
*/
static TransactionId
_bt_check_unique(Relation rel, IndexTuple itup, Relation heapRel,
- Buffer buf, ScanKey itup_scankey)
+ Buffer buf, OffsetNumber offset, ScanKey itup_scankey,
+ IndexUniqueCheck checkUnique, bool *is_unique,
+ uint32 *speculativeToken)
{
TupleDesc itupdesc = RelationGetDescr(rel);
int natts = rel->rd_rel->relnatts;
- OffsetNumber offset,
- maxoff;
+ SnapshotData SnapshotDirty;
+ OffsetNumber maxoff;
Page page;
BTPageOpaque opaque;
Buffer nbuf = InvalidBuffer;
+ bool found = false;
+
+ /* Assume unique until we find a duplicate */
+ *is_unique = true;
+
+ InitDirtySnapshot(SnapshotDirty);
page = BufferGetPage(buf);
opaque = (BTPageOpaque) PageGetSpecialPointer(page);
maxoff = PageGetMaxOffsetNumber(page);
- /*
- * Find first item >= proposed new item. Note we could also get a pointer
- * to end-of-page here.
- */
- offset = _bt_binsrch(rel, buf, natts, itup_scankey, false);
-
/*
* Scan over all equal tuples, looking for live conflicts.
*/
for (;;)
{
- HeapTupleData htup;
- Buffer hbuffer;
ItemId curitemid;
IndexTuple curitup;
BlockNumber nblkno;
* we can. We only apply _bt_isequal() when we get to a non-killed
* item or the end of the page.
*/
- if (!ItemIdDeleted(curitemid))
+ if (!ItemIdIsDead(curitemid))
{
+ ItemPointerData htid;
+ bool all_dead;
+
/*
* _bt_compare returns 0 for (1,NULL) and (1,NULL) - this's
* how we handling NULLs - and so we must not use _bt_compare
/* okay, we gotta fetch the heap tuple ... */
curitup = (IndexTuple) PageGetItem(page, curitemid);
- htup.t_self = curitup->t_tid;
- if (heap_fetch(heapRel, SnapshotDirty, &htup, &hbuffer,
- true, NULL))
+ htid = curitup->t_tid;
+
+ /*
+ * If we are doing a recheck, we expect to find the tuple we
+ * are rechecking. It's not a duplicate, but we have to keep
+ * scanning.
+ */
+ if (checkUnique == UNIQUE_CHECK_EXISTING &&
+ ItemPointerCompare(&htid, &itup->t_tid) == 0)
{
- /* it is a duplicate */
- TransactionId xwait =
- (TransactionIdIsValid(SnapshotDirty->xmin)) ?
- SnapshotDirty->xmin : SnapshotDirty->xmax;
+ found = true;
+ }
- ReleaseBuffer(hbuffer);
+ /*
+ * We check the whole HOT-chain to see if there is any tuple
+ * that satisfies SnapshotDirty. This is necessary because we
+ * have just a single index entry for the entire chain.
+ */
+ else if (heap_hot_search(&htid, heapRel, &SnapshotDirty,
+ &all_dead))
+ {
+ TransactionId xwait;
+
+ /*
+ * It is a duplicate. If we are only doing a partial
+ * check, then don't bother checking if the tuple is being
+ * updated in another transaction. Just return the fact
+ * that it is a potential conflict and leave the full
+ * check till later.
+ */
+ if (checkUnique == UNIQUE_CHECK_PARTIAL)
+ {
+ if (nbuf != InvalidBuffer)
+ _bt_relbuf(rel, nbuf);
+ *is_unique = false;
+ return InvalidTransactionId;
+ }
/*
* If this tuple is being updated by other transaction
* then we have to wait for its commit/abort.
*/
+ xwait = (TransactionIdIsValid(SnapshotDirty.xmin)) ?
+ SnapshotDirty.xmin : SnapshotDirty.xmax;
+
if (TransactionIdIsValid(xwait))
{
if (nbuf != InvalidBuffer)
_bt_relbuf(rel, nbuf);
/* Tell _bt_doinsert to wait... */
+ *speculativeToken = SnapshotDirty.speculativeToken;
return xwait;
}
* is itself now committed dead --- if so, don't complain.
* This is a waste of time in normal scenarios but we must
* do it to support CREATE INDEX CONCURRENTLY.
+ *
+ * We must follow HOT-chains here because during
+ * concurrent index build, we insert the root TID though
+ * the actual tuple may be somewhere in the HOT-chain.
+ * While following the chain we might not stop at the
+ * exact tuple which triggered the insert, but that's OK
+ * because if we find a live tuple anywhere in this chain,
+ * we have a unique key conflict. The other live tuple is
+ * not part of this chain because it had a different index
+ * entry.
*/
- htup.t_self = itup->t_tid;
- if (heap_fetch(heapRel, SnapshotSelf, &htup, &hbuffer,
- false, NULL))
+ htid = itup->t_tid;
+ if (heap_hot_search(&htid, heapRel, SnapshotSelf, NULL))
{
/* Normal case --- it's still live */
- ReleaseBuffer(hbuffer);
}
- else if (htup.t_data != NULL)
+ else
{
/*
* It's been deleted, so no error, and no need to
*/
break;
}
- else
+
+ /*
+ * This is a definite conflict. Break the tuple down into
+ * datums and report the error. But first, make sure we
+ * release the buffer locks we're holding ---
+ * BuildIndexValueDescription could make catalog accesses,
+ * which in the worst case might touch this same index and
+ * cause deadlocks.
+ */
+ if (nbuf != InvalidBuffer)
+ _bt_relbuf(rel, nbuf);
+ _bt_relbuf(rel, buf);
+
{
- /* couldn't find the tuple?? */
- elog(ERROR, "failed to fetch tuple being inserted");
+ Datum values[INDEX_MAX_KEYS];
+ bool isnull[INDEX_MAX_KEYS];
+ char *key_desc;
+
+ index_deform_tuple(itup, RelationGetDescr(rel),
+ values, isnull);
+
+ key_desc = BuildIndexValueDescription(rel, values,
+ isnull);
+
+ ereport(ERROR,
+ (errcode(ERRCODE_UNIQUE_VIOLATION),
+ errmsg("duplicate key value violates unique constraint \"%s\"",
+ RelationGetRelationName(rel)),
+ key_desc ? errdetail("Key %s already exists.",
+ key_desc) : 0,
+ errtableconstraint(heapRel,
+ RelationGetRelationName(rel))));
}
-
- ereport(ERROR,
- (errcode(ERRCODE_UNIQUE_VIOLATION),
- errmsg("duplicate key violates unique constraint \"%s\"",
- RelationGetRelationName(rel))));
}
- else if (htup.t_data != NULL)
+ else if (all_dead)
{
/*
- * Hmm, if we can't see the tuple, maybe it can be marked
- * killed. This logic should match index_getnext and
- * btgettuple.
+ * The conflicting tuple (or whole HOT chain) is dead to
+ * everyone, so we may as well mark the index entry
+ * killed.
*/
- LockBuffer(hbuffer, BUFFER_LOCK_SHARE);
- if (HeapTupleSatisfiesVacuum(htup.t_data, RecentGlobalXmin,
- hbuffer) == HEAPTUPLE_DEAD)
- {
- curitemid->lp_flags |= LP_DELETE;
- opaque->btpo_flags |= BTP_HAS_GARBAGE;
- /* be sure to mark the proper buffer dirty... */
- if (nbuf != InvalidBuffer)
- SetBufferCommitInfoNeedsSave(nbuf);
- else
- SetBufferCommitInfoNeedsSave(buf);
- }
- LockBuffer(hbuffer, BUFFER_LOCK_UNLOCK);
+ ItemIdMarkDead(curitemid);
+ opaque->btpo_flags |= BTP_HAS_GARBAGE;
+
+ /*
+ * Mark buffer with a dirty hint, since state is not
+ * crucial. Be sure to mark the proper buffer dirty.
+ */
+ if (nbuf != InvalidBuffer)
+ MarkBufferDirtyHint(nbuf, true);
+ else
+ MarkBufferDirtyHint(buf, true);
}
- ReleaseBuffer(hbuffer);
}
}
if (!P_IGNORE(opaque))
break;
if (P_RIGHTMOST(opaque))
- elog(ERROR, "fell off the end of \"%s\"",
+ elog(ERROR, "fell off the end of index \"%s\"",
RelationGetRelationName(rel));
}
maxoff = PageGetMaxOffsetNumber(page);
}
}
+ /*
+ * If we are doing a recheck then we should have found the tuple we are
+ * checking. Otherwise there's something very wrong --- probably, the
+ * index is on a non-immutable expression.
+ */
+ if (checkUnique == UNIQUE_CHECK_EXISTING && !found)
+ ereport(ERROR,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("failed to re-find tuple within index \"%s\"",
+ RelationGetRelationName(rel)),
+ errhint("This may be because of a non-immutable index expression."),
+ errtableconstraint(heapRel,
+ RelationGetRelationName(rel))));
+
if (nbuf != InvalidBuffer)
_bt_relbuf(rel, nbuf);
return InvalidTransactionId;
}
-/*----------
- * _bt_insertonpg() -- Insert a tuple on a particular page in the index.
- *
- * This recursive procedure does the following things:
- *
- * + finds the right place to insert the tuple.
- * + if necessary, splits the target page (making sure that the
- * split is equitable as far as post-insert free space goes).
- * + inserts the tuple.
- * + if the page was split, pops the parent stack, and finds the
- * right place to insert the new child pointer (by walking
- * right using information stored in the parent stack).
- * + invokes itself with the appropriate tuple for the right
- * child page on the parent.
- * + updates the metapage if a true root or fast root is split.
- *
- * On entry, we must have the right buffer in which to do the
- * insertion, and the buffer must be pinned and write-locked. On return,
- * we will have dropped both the pin and the lock on the buffer.
- *
- * If 'afteritem' is >0 then the new tuple must be inserted after the
- * existing item of that number, noplace else. If 'afteritem' is 0
- * then the procedure finds the exact spot to insert it by searching.
- * (keysz and scankey parameters are used ONLY if afteritem == 0.
- * The scankey must be an insertion-type scankey.)
+
+/*
+ * _bt_findinsertloc() -- Finds an insert location for a tuple
*
- * NOTE: if the new key is equal to one or more existing keys, we can
+ * If the new key is equal to one or more existing keys, we can
* legitimately place it anywhere in the series of equal keys --- in fact,
* if the new key is equal to the page's "high key" we can place it on
- * the next page. If it is equal to the high key, and there's not room
+ * the next page. If it is equal to the high key, and there's not room
* to insert the new tuple on the current page without splitting, then
* we can move right hoping to find more free space and avoid a split.
* (We should not move right indefinitely, however, since that leads to
* Once we have chosen the page to put the key on, we'll insert it before
* any existing equal keys because of the way _bt_binsrch() works.
*
- * The locking interactions in this code are critical. You should
- * grok Lehman and Yao's paper before making any changes. In addition,
- * you need to understand how we disambiguate duplicate keys in this
- * implementation, in order to be able to find our location using
- * L&Y "move right" operations. Since we may insert duplicate user
- * keys, and since these dups may propagate up the tree, we use the
- * 'afteritem' parameter to position ourselves correctly for the
- * insertion on internal pages.
- *----------
+ * If there's not enough room in the space, we try to make room by
+ * removing any LP_DEAD tuples.
+ *
+ * On entry, *bufptr and *offsetptr point to the first legal position
+ * where the new tuple could be inserted. The caller should hold an
+ * exclusive lock on *bufptr. *offsetptr can also be set to
+ * InvalidOffsetNumber, in which case the function will search for the
+ * right location within the page if needed. On exit, they point to the
+ * chosen insert location. If _bt_findinsertloc decides to move right,
+ * the lock and pin on the original page will be released and the new
+ * page returned to the caller is exclusively locked instead.
+ *
+ * newtup is the new tuple we're inserting, and scankey is an insertion
+ * type scan key for it.
*/
static void
-_bt_insertonpg(Relation rel,
- Buffer buf,
- BTStack stack,
- int keysz,
- ScanKey scankey,
- IndexTuple itup,
- OffsetNumber afteritem,
- bool split_only_page)
+_bt_findinsertloc(Relation rel,
+ Buffer *bufptr,
+ OffsetNumber *offsetptr,
+ int keysz,
+ ScanKey scankey,
+ IndexTuple newtup,
+ BTStack stack,
+ Relation heapRel)
{
- Page page;
+ Buffer buf = *bufptr;
+ Page page = BufferGetPage(buf);
+ Size itemsz;
BTPageOpaque lpageop;
+ bool movedright,
+ vacuumed;
OffsetNumber newitemoff;
- OffsetNumber firstright = InvalidOffsetNumber;
- Size itemsz;
+ OffsetNumber firstlegaloff = *offsetptr;
- page = BufferGetPage(buf);
lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
- itemsz = IndexTupleDSize(*itup);
+ itemsz = IndexTupleDSize(*newtup);
itemsz = MAXALIGN(itemsz); /* be safe, PageAddItem will do this but we
* need to be consistent */
* able to fit three items on every page, so restrict any one item to 1/3
* the per-page available space. Note that at this point, itemsz doesn't
* include the ItemId.
+ *
+ * NOTE: if you change this, see also the similar code in _bt_buildadd().
*/
if (itemsz > BTMaxItemSize(page))
ereport(ERROR,
(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
- errmsg("index row size %lu exceeds btree maximum, %lu",
- (unsigned long) itemsz,
- (unsigned long) BTMaxItemSize(page)),
+ errmsg("index row size %zu exceeds maximum %zu for index \"%s\"",
+ itemsz, BTMaxItemSize(page),
+ RelationGetRelationName(rel)),
errhint("Values larger than 1/3 of a buffer page cannot be indexed.\n"
"Consider a function index of an MD5 hash of the value, "
- "or use full text indexing.")));
-
- /*
- * Determine exactly where new item will go.
+ "or use full text indexing."),
+ errtableconstraint(heapRel,
+ RelationGetRelationName(rel))));
+
+ /*----------
+ * If we will need to split the page to put the item on this page,
+ * check whether we can put the tuple somewhere to the right,
+ * instead. Keep scanning right until we
+ * (a) find a page with enough free space,
+ * (b) reach the last page where the tuple can legally go, or
+ * (c) get tired of searching.
+ * (c) is not flippant; it is important because if there are many
+ * pages' worth of equal keys, it's better to split one of the early
+ * pages than to scan all the way to the end of the run of equal keys
+ * on every insert. We implement "get tired" as a random choice,
+ * since stopping after scanning a fixed number of pages wouldn't work
+ * well (we'd never reach the right-hand side of previously split
+ * pages). Currently the probability of moving right is set at 0.99,
+ * which may seem too high to change the behavior much, but it does an
+ * excellent job of preventing O(N^2) behavior with many equal keys.
+ *----------
*/
- if (afteritem > 0)
- newitemoff = afteritem + 1;
- else
+ movedright = false;
+ vacuumed = false;
+ while (PageGetFreeSpace(page) < itemsz)
{
- /*----------
- * If we will need to split the page to put the item here,
- * check whether we can put the tuple somewhere to the right,
- * instead. Keep scanning right until we
- * (a) find a page with enough free space,
- * (b) reach the last page where the tuple can legally go, or
- * (c) get tired of searching.
- * (c) is not flippant; it is important because if there are many
- * pages' worth of equal keys, it's better to split one of the early
- * pages than to scan all the way to the end of the run of equal keys
- * on every insert. We implement "get tired" as a random choice,
- * since stopping after scanning a fixed number of pages wouldn't work
- * well (we'd never reach the right-hand side of previously split
- * pages). Currently the probability of moving right is set at 0.99,
- * which may seem too high to change the behavior much, but it does an
- * excellent job of preventing O(N^2) behavior with many equal keys.
- *----------
+ Buffer rbuf;
+ BlockNumber rblkno;
+
+ /*
+ * before considering moving right, see if we can obtain enough space
+ * by erasing LP_DEAD items
*/
- bool movedright = false;
+ if (P_ISLEAF(lpageop) && P_HAS_GARBAGE(lpageop))
+ {
+ _bt_vacuum_one_page(rel, buf, heapRel);
- while (PageGetFreeSpace(page) < itemsz)
+ /*
+ * remember that we vacuumed this page, because that makes the
+ * hint supplied by the caller invalid
+ */
+ vacuumed = true;
+
+ if (PageGetFreeSpace(page) >= itemsz)
+ break; /* OK, now we have enough space */
+ }
+
+ /*
+ * nope, so check conditions (b) and (c) enumerated above
+ */
+ if (P_RIGHTMOST(lpageop) ||
+ _bt_compare(rel, keysz, scankey, page, P_HIKEY) != 0 ||
+ random() <= (MAX_RANDOM_VALUE / 100))
+ break;
+
+ /*
+ * step right to next non-dead page
+ *
+ * must write-lock that page before releasing write lock on current
+ * page; else someone else's _bt_check_unique scan could fail to see
+ * our insertion. write locks on intermediate dead pages won't do
+ * because we don't know when they will get de-linked from the tree.
+ */
+ rbuf = InvalidBuffer;
+
+ rblkno = lpageop->btpo_next;
+ for (;;)
{
- Buffer rbuf;
+ rbuf = _bt_relandgetbuf(rel, rbuf, rblkno, BT_WRITE);
+ page = BufferGetPage(rbuf);
+ lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
/*
- * before considering moving right, see if we can obtain enough
- * space by erasing LP_DELETE items
+ * If this page was incompletely split, finish the split now. We
+ * do this while holding a lock on the left sibling, which is not
+ * good because finishing the split could be a fairly lengthy
+ * operation. But this should happen very seldom.
*/
- if (P_ISLEAF(lpageop) && P_HAS_GARBAGE(lpageop))
+ if (P_INCOMPLETE_SPLIT(lpageop))
{
- _bt_vacuum_one_page(rel, buf);
- if (PageGetFreeSpace(page) >= itemsz)
- break; /* OK, now we have enough space */
+ _bt_finish_split(rel, rbuf, stack);
+ rbuf = InvalidBuffer;
+ continue;
}
- /*
- * nope, so check conditions (b) and (c) enumerated above
- */
- if (P_RIGHTMOST(lpageop) ||
- _bt_compare(rel, keysz, scankey, page, P_HIKEY) != 0 ||
- random() <= (MAX_RANDOM_VALUE / 100))
+ if (!P_IGNORE(lpageop))
break;
+ if (P_RIGHTMOST(lpageop))
+ elog(ERROR, "fell off the end of index \"%s\"",
+ RelationGetRelationName(rel));
- /*
- * step right to next non-dead page
- *
- * must write-lock that page before releasing write lock on
- * current page; else someone else's _bt_check_unique scan could
- * fail to see our insertion. write locks on intermediate dead
- * pages won't do because we don't know when they will get
- * de-linked from the tree.
- */
- rbuf = InvalidBuffer;
+ rblkno = lpageop->btpo_next;
+ }
+ _bt_relbuf(rel, buf);
+ buf = rbuf;
+ movedright = true;
+ vacuumed = false;
+ }
- for (;;)
- {
- BlockNumber rblkno = lpageop->btpo_next;
+ /*
+ * Now we are on the right page, so find the insert position. If we moved
+ * right at all, we know we should insert at the start of the page. If we
+ * didn't move right, we can use the firstlegaloff hint if the caller
+ * supplied one, unless we vacuumed the page which might have moved tuples
+ * around making the hint invalid. If we didn't move right or can't use
+ * the hint, find the position by searching.
+ */
+ if (movedright)
+ newitemoff = P_FIRSTDATAKEY(lpageop);
+ else if (firstlegaloff != InvalidOffsetNumber && !vacuumed)
+ newitemoff = firstlegaloff;
+ else
+ newitemoff = _bt_binsrch(rel, buf, keysz, scankey, false);
- rbuf = _bt_relandgetbuf(rel, rbuf, rblkno, BT_WRITE);
- page = BufferGetPage(rbuf);
- lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
- if (!P_IGNORE(lpageop))
- break;
- if (P_RIGHTMOST(lpageop))
- elog(ERROR, "fell off the end of \"%s\"",
- RelationGetRelationName(rel));
- }
- _bt_relbuf(rel, buf);
- buf = rbuf;
- movedright = true;
- }
+ *bufptr = buf;
+ *offsetptr = newitemoff;
+}
- /*
- * Now we are on the right page, so find the insert position. If we
- * moved right at all, we know we should insert at the start of the
- * page, else must find the position by searching.
- */
- if (movedright)
- newitemoff = P_FIRSTDATAKEY(lpageop);
- else
- newitemoff = _bt_binsrch(rel, buf, keysz, scankey, false);
- }
+/*----------
+ * _bt_insertonpg() -- Insert a tuple on a particular page in the index.
+ *
+ * This recursive procedure does the following things:
+ *
+ * + if necessary, splits the target page (making sure that the
+ * split is equitable as far as post-insert free space goes).
+ * + inserts the tuple.
+ * + if the page was split, pops the parent stack, and finds the
+ * right place to insert the new child pointer (by walking
+ * right using information stored in the parent stack).
+ * + invokes itself with the appropriate tuple for the right
+ * child page on the parent.
+ * + updates the metapage if a true root or fast root is split.
+ *
+ * On entry, we must have the correct buffer in which to do the
+ * insertion, and the buffer must be pinned and write-locked. On return,
+ * we will have dropped both the pin and the lock on the buffer.
+ *
+ * When inserting to a non-leaf page, 'cbuf' is the left-sibling of the
+ * page we're inserting the downlink for. This function will clear the
+ * INCOMPLETE_SPLIT flag on it, and release the buffer.
+ *
+ * The locking interactions in this code are critical. You should
+ * grok Lehman and Yao's paper before making any changes. In addition,
+ * you need to understand how we disambiguate duplicate keys in this
+ * implementation, in order to be able to find our location using
+ * L&Y "move right" operations. Since we may insert duplicate user
+ * keys, and since these dups may propagate up the tree, we use the
+ * 'afteritem' parameter to position ourselves correctly for the
+ * insertion on internal pages.
+ *----------
+ */
+static void
+_bt_insertonpg(Relation rel,
+ Buffer buf,
+ Buffer cbuf,
+ BTStack stack,
+ IndexTuple itup,
+ OffsetNumber newitemoff,
+ bool split_only_page)
+{
+ Page page;
+ BTPageOpaque lpageop;
+ OffsetNumber firstright = InvalidOffsetNumber;
+ Size itemsz;
+
+ page = BufferGetPage(buf);
+ lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
+
+ /* child buffer must be given iff inserting on an internal page */
+ Assert(P_ISLEAF(lpageop) == !BufferIsValid(cbuf));
+
+ /* The caller should've finished any incomplete splits already. */
+ if (P_INCOMPLETE_SPLIT(lpageop))
+ elog(ERROR, "cannot insert to incompletely split page %u",
+ BufferGetBlockNumber(buf));
+
+ itemsz = IndexTupleDSize(*itup);
+ itemsz = MAXALIGN(itemsz); /* be safe, PageAddItem will do this but we
+ * need to be consistent */
/*
* Do we need to split the page to fit the item on it?
&newitemonleft);
/* split the buffer into left and right halves */
- rbuf = _bt_split(rel, buf, firstright,
+ rbuf = _bt_split(rel, buf, cbuf, firstright,
newitemoff, itemsz, itup, newitemonleft);
+ PredicateLockPageSplit(rel,
+ BufferGetBlockNumber(buf),
+ BufferGetBlockNumber(rbuf));
/*----------
* By here,
/* Do the update. No ereport(ERROR) until changes are logged */
START_CRIT_SECTION();
- _bt_pgaddtup(rel, page, itemsz, itup, newitemoff, "page");
+ if (!_bt_pgaddtup(page, itemsz, itup, newitemoff))
+ elog(PANIC, "failed to add new item to block %u in index \"%s\"",
+ itup_blkno, RelationGetRelationName(rel));
MarkBufferDirty(buf);
MarkBufferDirty(metabuf);
}
+ /* clear INCOMPLETE_SPLIT flag on child if inserting a downlink */
+ if (BufferIsValid(cbuf))
+ {
+ Page cpage = BufferGetPage(cbuf);
+ BTPageOpaque cpageop = (BTPageOpaque) PageGetSpecialPointer(cpage);
+
+ Assert(P_INCOMPLETE_SPLIT(cpageop));
+ cpageop->btpo_flags &= ~BTP_INCOMPLETE_SPLIT;
+ MarkBufferDirty(cbuf);
+ }
+
/* XLOG stuff */
- if (!rel->rd_istemp)
+ if (RelationNeedsWAL(rel))
{
xl_btree_insert xlrec;
- BlockNumber xldownlink;
xl_btree_metadata xlmeta;
uint8 xlinfo;
XLogRecPtr recptr;
- XLogRecData rdata[4];
- XLogRecData *nextrdata;
IndexTupleData trunctuple;
- xlrec.target.node = rel->rd_node;
- ItemPointerSet(&(xlrec.target.tid), itup_blkno, itup_off);
+ xlrec.offnum = itup_off;
- rdata[0].data = (char *) &xlrec;
- rdata[0].len = SizeOfBtreeInsert;
- rdata[0].buffer = InvalidBuffer;
- rdata[0].next = nextrdata = &(rdata[1]);
+ XLogBeginInsert();
+ XLogRegisterData((char *) &xlrec, SizeOfBtreeInsert);
if (P_ISLEAF(lpageop))
xlinfo = XLOG_BTREE_INSERT_LEAF;
else
{
- xldownlink = ItemPointerGetBlockNumber(&(itup->t_tid));
- Assert(ItemPointerGetOffsetNumber(&(itup->t_tid)) == P_HIKEY);
-
- nextrdata->data = (char *) &xldownlink;
- nextrdata->len = sizeof(BlockNumber);
- nextrdata->buffer = InvalidBuffer;
- nextrdata->next = nextrdata + 1;
- nextrdata++;
+ /*
+ * Register the left child whose INCOMPLETE_SPLIT flag was
+ * cleared.
+ */
+ XLogRegisterBuffer(1, cbuf, REGBUF_STANDARD);
xlinfo = XLOG_BTREE_INSERT_UPPER;
}
xlmeta.fastroot = metad->btm_fastroot;
xlmeta.fastlevel = metad->btm_fastlevel;
- nextrdata->data = (char *) &xlmeta;
- nextrdata->len = sizeof(xl_btree_metadata);
- nextrdata->buffer = InvalidBuffer;
- nextrdata->next = nextrdata + 1;
- nextrdata++;
+ XLogRegisterBuffer(2, metabuf, REGBUF_WILL_INIT);
+ XLogRegisterBufData(2, (char *) &xlmeta, sizeof(xl_btree_metadata));
xlinfo = XLOG_BTREE_INSERT_META;
}
/* Read comments in _bt_pgaddtup */
+ XLogRegisterBuffer(0, buf, REGBUF_STANDARD);
if (!P_ISLEAF(lpageop) && newitemoff == P_FIRSTDATAKEY(lpageop))
{
trunctuple = *itup;
trunctuple.t_info = sizeof(IndexTupleData);
- nextrdata->data = (char *) &trunctuple;
- nextrdata->len = sizeof(IndexTupleData);
+ XLogRegisterBufData(0, (char *) &trunctuple,
+ sizeof(IndexTupleData));
}
else
- {
- nextrdata->data = (char *) itup;
- nextrdata->len = IndexTupleDSize(*itup);
- }
- nextrdata->buffer = buf;
- nextrdata->buffer_std = true;
- nextrdata->next = NULL;
+ XLogRegisterBufData(0, (char *) itup, IndexTupleDSize(*itup));
- recptr = XLogInsert(RM_BTREE_ID, xlinfo, rdata);
+ recptr = XLogInsert(RM_BTREE_ID, xlinfo);
if (BufferIsValid(metabuf))
{
PageSetLSN(metapg, recptr);
- PageSetTLI(metapg, ThisTimeLineID);
+ }
+ if (BufferIsValid(cbuf))
+ {
+ PageSetLSN(BufferGetPage(cbuf), recptr);
}
PageSetLSN(page, recptr);
- PageSetTLI(page, ThisTimeLineID);
}
END_CRIT_SECTION();
- /* release buffers; send out relcache inval if metapage changed */
+ /* release buffers */
if (BufferIsValid(metabuf))
- {
- CacheInvalidateRelcache(rel);
_bt_relbuf(rel, metabuf);
- }
-
+ if (BufferIsValid(cbuf))
+ _bt_relbuf(rel, cbuf);
_bt_relbuf(rel, buf);
}
}
* new right page. newitemoff etc. tell us about the new item that
* must be inserted along with the data from the old page.
*
+ * When splitting a non-leaf page, 'cbuf' is the left-sibling of the
+ * page we're inserting the downlink for. This function will clear the
+ * INCOMPLETE_SPLIT flag on it, and release the buffer.
+ *
* Returns the new right sibling of buf, pinned and write-locked.
* The pin and lock on buf are maintained.
*/
static Buffer
-_bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
+_bt_split(Relation rel, Buffer buf, Buffer cbuf, OffsetNumber firstright,
OffsetNumber newitemoff, Size newitemsz, IndexTuple newitem,
bool newitemonleft)
{
Page origpage;
Page leftpage,
rightpage;
+ BlockNumber origpagenumber,
+ rightpagenumber;
BTPageOpaque ropaque,
lopaque,
oopaque;
Buffer sbuf = InvalidBuffer;
Page spage = NULL;
BTPageOpaque sopaque = NULL;
- OffsetNumber itup_off = 0;
- BlockNumber itup_blkno = 0;
Size itemsz;
ItemId itemid;
IndexTuple item;
rightoff;
OffsetNumber maxoff;
OffsetNumber i;
+ bool isroot;
+ bool isleaf;
+ /* Acquire a new page to split into */
rbuf = _bt_getbuf(rel, P_NEW, BT_WRITE);
+
+ /*
+ * origpage is the original page to be split. leftpage is a temporary
+ * buffer that receives the left-sibling data, which will be copied back
+ * into origpage on success. rightpage is the new page that receives the
+ * right-sibling data. If we fail before reaching the critical section,
+ * origpage hasn't been modified and leftpage is only workspace. In
+ * principle we shouldn't need to worry about rightpage either, because it
+ * hasn't been linked into the btree page structure; but to avoid leaving
+ * possibly-confusing junk behind, we are careful to rewrite rightpage as
+ * zeroes before throwing any error.
+ */
origpage = BufferGetPage(buf);
- leftpage = PageGetTempPage(origpage, sizeof(BTPageOpaqueData));
+ leftpage = PageGetTempPage(origpage);
rightpage = BufferGetPage(rbuf);
+ origpagenumber = BufferGetBlockNumber(buf);
+ rightpagenumber = BufferGetBlockNumber(rbuf);
+
_bt_pageinit(leftpage, BufferGetPageSize(buf));
/* rightpage was already initialized by _bt_getbuf */
+ /*
+ * Copy the original page's LSN into leftpage, which will become the
+ * updated version of the page. We need this because XLogInsert will
+ * examine the LSN and possibly dump it in a page image.
+ */
+ PageSetLSN(leftpage, PageGetLSN(origpage));
+
/* init btree private data */
oopaque = (BTPageOpaque) PageGetSpecialPointer(origpage);
lopaque = (BTPageOpaque) PageGetSpecialPointer(leftpage);
ropaque = (BTPageOpaque) PageGetSpecialPointer(rightpage);
+ isroot = P_ISROOT(oopaque);
+ isleaf = P_ISLEAF(oopaque);
+
/* if we're splitting this page, it won't be the root when we're done */
/* also, clear the SPLIT_END and HAS_GARBAGE flags in both pages */
lopaque->btpo_flags = oopaque->btpo_flags;
lopaque->btpo_flags &= ~(BTP_ROOT | BTP_SPLIT_END | BTP_HAS_GARBAGE);
ropaque->btpo_flags = lopaque->btpo_flags;
+ /* set flag in left page indicating that the right page has no downlink */
+ lopaque->btpo_flags |= BTP_INCOMPLETE_SPLIT;
lopaque->btpo_prev = oopaque->btpo_prev;
- lopaque->btpo_next = BufferGetBlockNumber(rbuf);
- ropaque->btpo_prev = BufferGetBlockNumber(buf);
+ lopaque->btpo_next = rightpagenumber;
+ ropaque->btpo_prev = origpagenumber;
ropaque->btpo_next = oopaque->btpo_next;
lopaque->btpo.level = ropaque->btpo.level = oopaque->btpo.level;
/* Since we already have write-lock on both pages, ok to read cycleid */
itemsz = ItemIdGetLength(itemid);
item = (IndexTuple) PageGetItem(origpage, itemid);
if (PageAddItem(rightpage, (Item) item, itemsz, rightoff,
- LP_USED) == InvalidOffsetNumber)
- elog(PANIC, "failed to add hikey to the right sibling");
+ false, false) == InvalidOffsetNumber)
+ {
+ memset(rightpage, 0, BufferGetPageSize(rbuf));
+ elog(ERROR, "failed to add hikey to the right sibling"
+ " while splitting block %u of index \"%s\"",
+ origpagenumber, RelationGetRelationName(rel));
+ }
rightoff = OffsetNumberNext(rightoff);
}
item = (IndexTuple) PageGetItem(origpage, itemid);
}
if (PageAddItem(leftpage, (Item) item, itemsz, leftoff,
- LP_USED) == InvalidOffsetNumber)
- elog(PANIC, "failed to add hikey to the left sibling");
+ false, false) == InvalidOffsetNumber)
+ {
+ memset(rightpage, 0, BufferGetPageSize(rbuf));
+ elog(ERROR, "failed to add hikey to the left sibling"
+ " while splitting block %u of index \"%s\"",
+ origpagenumber, RelationGetRelationName(rel));
+ }
leftoff = OffsetNumberNext(leftoff);
/*
- * Now transfer all the data items to the appropriate page
+ * Now transfer all the data items to the appropriate page.
+ *
+ * Note: we *must* insert at least the right page's items in item-number
+ * order, for the benefit of _bt_restore_page().
*/
maxoff = PageGetMaxOffsetNumber(origpage);
{
if (newitemonleft)
{
- _bt_pgaddtup(rel, leftpage, newitemsz, newitem, leftoff,
- "left sibling");
- itup_off = leftoff;
- itup_blkno = BufferGetBlockNumber(buf);
+ if (!_bt_pgaddtup(leftpage, newitemsz, newitem, leftoff))
+ {
+ memset(rightpage, 0, BufferGetPageSize(rbuf));
+ elog(ERROR, "failed to add new item to the left sibling"
+ " while splitting block %u of index \"%s\"",
+ origpagenumber, RelationGetRelationName(rel));
+ }
leftoff = OffsetNumberNext(leftoff);
}
else
{
- _bt_pgaddtup(rel, rightpage, newitemsz, newitem, rightoff,
- "right sibling");
- itup_off = rightoff;
- itup_blkno = BufferGetBlockNumber(rbuf);
+ if (!_bt_pgaddtup(rightpage, newitemsz, newitem, rightoff))
+ {
+ memset(rightpage, 0, BufferGetPageSize(rbuf));
+ elog(ERROR, "failed to add new item to the right sibling"
+ " while splitting block %u of index \"%s\"",
+ origpagenumber, RelationGetRelationName(rel));
+ }
rightoff = OffsetNumberNext(rightoff);
}
}
/* decide which page to put it on */
if (i < firstright)
{
- _bt_pgaddtup(rel, leftpage, itemsz, item, leftoff,
- "left sibling");
+ if (!_bt_pgaddtup(leftpage, itemsz, item, leftoff))
+ {
+ memset(rightpage, 0, BufferGetPageSize(rbuf));
+ elog(ERROR, "failed to add old item to the left sibling"
+ " while splitting block %u of index \"%s\"",
+ origpagenumber, RelationGetRelationName(rel));
+ }
leftoff = OffsetNumberNext(leftoff);
}
else
{
- _bt_pgaddtup(rel, rightpage, itemsz, item, rightoff,
- "right sibling");
+ if (!_bt_pgaddtup(rightpage, itemsz, item, rightoff))
+ {
+ memset(rightpage, 0, BufferGetPageSize(rbuf));
+ elog(ERROR, "failed to add old item to the right sibling"
+ " while splitting block %u of index \"%s\"",
+ origpagenumber, RelationGetRelationName(rel));
+ }
rightoff = OffsetNumberNext(rightoff);
}
}
/* cope with possibility that newitem goes at the end */
if (i <= newitemoff)
{
- if (newitemonleft)
- {
- _bt_pgaddtup(rel, leftpage, newitemsz, newitem, leftoff,
- "left sibling");
- itup_off = leftoff;
- itup_blkno = BufferGetBlockNumber(buf);
- leftoff = OffsetNumberNext(leftoff);
- }
- else
+ /*
+ * Can't have newitemonleft here; that would imply we were told to put
+ * *everything* on the left page, which cannot fit (if it could, we'd
+ * not be splitting the page).
+ */
+ Assert(!newitemonleft);
+ if (!_bt_pgaddtup(rightpage, newitemsz, newitem, rightoff))
{
- _bt_pgaddtup(rel, rightpage, newitemsz, newitem, rightoff,
- "right sibling");
- itup_off = rightoff;
- itup_blkno = BufferGetBlockNumber(rbuf);
- rightoff = OffsetNumberNext(rightoff);
+ memset(rightpage, 0, BufferGetPageSize(rbuf));
+ elog(ERROR, "failed to add new item to the right sibling"
+ " while splitting block %u of index \"%s\"",
+ origpagenumber, RelationGetRelationName(rel));
}
+ rightoff = OffsetNumberNext(rightoff);
}
/*
* neighbors.
*/
- if (!P_RIGHTMOST(ropaque))
+ if (!P_RIGHTMOST(oopaque))
{
- sbuf = _bt_getbuf(rel, ropaque->btpo_next, BT_WRITE);
+ sbuf = _bt_getbuf(rel, oopaque->btpo_next, BT_WRITE);
spage = BufferGetPage(sbuf);
sopaque = (BTPageOpaque) PageGetSpecialPointer(spage);
- if (sopaque->btpo_prev != ropaque->btpo_prev)
- elog(PANIC, "right sibling's left-link doesn't match");
+ if (sopaque->btpo_prev != origpagenumber)
+ {
+ memset(rightpage, 0, BufferGetPageSize(rbuf));
+ elog(ERROR, "right sibling's left-link doesn't match: "
+ "block %u links to %u instead of expected %u in index \"%s\"",
+ oopaque->btpo_next, sopaque->btpo_prev, origpagenumber,
+ RelationGetRelationName(rel));
+ }
/*
* Check to see if we can set the SPLIT_END flag in the right-hand
* page. If you're confused, imagine that page A splits to A B and
* then again, yielding A C B, while vacuum is in progress. Tuples
* originally in A could now be in either B or C, hence vacuum must
- * examine both pages. But if D, our right sibling, has a different
+ * examine both pages. But if D, our right sibling, has a different
* cycleid then it could not contain any tuples that were in A when
* the vacuum started.
*/
/*
* Right sibling is locked, new siblings are prepared, but original page
- * is not updated yet. Log changes before continuing.
+ * is not updated yet.
*
* NO EREPORT(ERROR) till right sibling is updated. We can get away with
* not starting the critical section till here because we haven't been
- * scribbling on the original page yet, and we don't care about the new
- * sibling until it's linked into the btree.
+ * scribbling on the original page yet; see comments above.
*/
START_CRIT_SECTION();
+ /*
+ * By here, the original data page has been split into two new halves, and
+ * these are correct. The algorithm requires that the left page never
+ * move during a split, so we copy the new left page back on top of the
+ * original. Note that this is not a waste of time, since we also require
+ * (in the page management code) that the center of a page always be
+ * clean, and the most efficient way to guarantee this is just to compact
+ * the data by reinserting it into a new left page. (XXX the latter
+ * comment is probably obsolete; but in any case it's good to not scribble
+ * on the original page until we enter the critical section.)
+ *
+ * We need to do this before writing the WAL record, so that XLogInsert
+ * can WAL log an image of the page if necessary.
+ */
+ PageRestoreTempPage(leftpage, origpage);
+ /* leftpage, lopaque must not be used below here */
+
MarkBufferDirty(buf);
MarkBufferDirty(rbuf);
if (!P_RIGHTMOST(ropaque))
{
- sopaque->btpo_prev = BufferGetBlockNumber(rbuf);
+ sopaque->btpo_prev = rightpagenumber;
MarkBufferDirty(sbuf);
}
+ /*
+ * Clear INCOMPLETE_SPLIT flag on child if inserting the new item finishes
+ * a split.
+ */
+ if (!isleaf)
+ {
+ Page cpage = BufferGetPage(cbuf);
+ BTPageOpaque cpageop = (BTPageOpaque) PageGetSpecialPointer(cpage);
+
+ cpageop->btpo_flags &= ~BTP_INCOMPLETE_SPLIT;
+ MarkBufferDirty(cbuf);
+ }
+
/* XLOG stuff */
- if (!rel->rd_istemp)
+ if (RelationNeedsWAL(rel))
{
xl_btree_split xlrec;
uint8 xlinfo;
XLogRecPtr recptr;
- XLogRecData rdata[4];
- xlrec.target.node = rel->rd_node;
- ItemPointerSet(&(xlrec.target.tid), itup_blkno, itup_off);
+ xlrec.level = ropaque->btpo.level;
+ xlrec.firstright = firstright;
+ xlrec.newitemoff = newitemoff;
+
+ XLogBeginInsert();
+ XLogRegisterData((char *) &xlrec, SizeOfBtreeSplit);
+
+ XLogRegisterBuffer(0, buf, REGBUF_STANDARD);
+ XLogRegisterBuffer(1, rbuf, REGBUF_WILL_INIT);
+ /* Log the right sibling, because we've changed its prev-pointer. */
+ if (!P_RIGHTMOST(ropaque))
+ XLogRegisterBuffer(2, sbuf, REGBUF_STANDARD);
+ if (BufferIsValid(cbuf))
+ XLogRegisterBuffer(3, cbuf, REGBUF_STANDARD);
+
+ /*
+ * Log the new item, if it was inserted on the left page. (If it was
+ * put on the right page, we don't need to explicitly WAL log it
+ * because it's included with all the other items on the right page.)
+ * Show the new item as belonging to the left page buffer, so that it
+ * is not stored if XLogInsert decides it needs a full-page image of
+ * the left page. We store the offset anyway, though, to support
+ * archive compression of these records.
+ */
if (newitemonleft)
- xlrec.otherblk = BufferGetBlockNumber(rbuf);
- else
- xlrec.otherblk = BufferGetBlockNumber(buf);
- xlrec.leftblk = lopaque->btpo_prev;
- xlrec.rightblk = ropaque->btpo_next;
- xlrec.level = lopaque->btpo.level;
+ XLogRegisterBufData(0, (char *) newitem, MAXALIGN(newitemsz));
+
+ /* Log left page */
+ if (!isleaf)
+ {
+ /*
+ * We must also log the left page's high key, because the right
+ * page's leftmost key is suppressed on non-leaf levels. Show it
+ * as belonging to the left page buffer, so that it is not stored
+ * if XLogInsert decides it needs a full-page image of the left
+ * page.
+ */
+ itemid = PageGetItemId(origpage, P_HIKEY);
+ item = (IndexTuple) PageGetItem(origpage, itemid);
+ XLogRegisterBufData(0, (char *) item, MAXALIGN(IndexTupleSize(item)));
+ }
/*
+ * Log the contents of the right page in the format understood by
+ * _bt_restore_page(). We set lastrdata->buffer to InvalidBuffer,
+ * because we're going to recreate the whole page anyway, so it should
+ * never be stored by XLogInsert.
+ *
* Direct access to page is not good but faster - we should implement
* some new func in page API. Note we only store the tuples
- * themselves, knowing that the item pointers are in the same order
- * and can be reconstructed by scanning the tuples. See comments for
+ * themselves, knowing that they were inserted in item-number order
+ * and so the item pointers can be reconstructed. See comments for
* _bt_restore_page().
*/
- xlrec.leftlen = ((PageHeader) leftpage)->pd_special -
- ((PageHeader) leftpage)->pd_upper;
-
- rdata[0].data = (char *) &xlrec;
- rdata[0].len = SizeOfBtreeSplit;
- rdata[0].buffer = InvalidBuffer;
- rdata[0].next = &(rdata[1]);
-
- rdata[1].data = (char *) leftpage + ((PageHeader) leftpage)->pd_upper;
- rdata[1].len = xlrec.leftlen;
- rdata[1].buffer = InvalidBuffer;
- rdata[1].next = &(rdata[2]);
-
- rdata[2].data = (char *) rightpage + ((PageHeader) rightpage)->pd_upper;
- rdata[2].len = ((PageHeader) rightpage)->pd_special -
- ((PageHeader) rightpage)->pd_upper;
- rdata[2].buffer = InvalidBuffer;
- rdata[2].next = NULL;
+ XLogRegisterBufData(1,
+ (char *) rightpage + ((PageHeader) rightpage)->pd_upper,
+ ((PageHeader) rightpage)->pd_special - ((PageHeader) rightpage)->pd_upper);
- if (!P_RIGHTMOST(ropaque))
- {
- rdata[2].next = &(rdata[3]);
- rdata[3].data = NULL;
- rdata[3].len = 0;
- rdata[3].buffer = sbuf;
- rdata[3].buffer_std = true;
- rdata[3].next = NULL;
- }
-
- if (P_ISROOT(oopaque))
+ if (isroot)
xlinfo = newitemonleft ? XLOG_BTREE_SPLIT_L_ROOT : XLOG_BTREE_SPLIT_R_ROOT;
else
xlinfo = newitemonleft ? XLOG_BTREE_SPLIT_L : XLOG_BTREE_SPLIT_R;
- recptr = XLogInsert(RM_BTREE_ID, xlinfo, rdata);
+ recptr = XLogInsert(RM_BTREE_ID, xlinfo);
- PageSetLSN(leftpage, recptr);
- PageSetTLI(leftpage, ThisTimeLineID);
+ PageSetLSN(origpage, recptr);
PageSetLSN(rightpage, recptr);
- PageSetTLI(rightpage, ThisTimeLineID);
if (!P_RIGHTMOST(ropaque))
{
PageSetLSN(spage, recptr);
- PageSetTLI(spage, ThisTimeLineID);
+ }
+ if (!isleaf)
+ {
+ PageSetLSN(BufferGetPage(cbuf), recptr);
}
}
- /*
- * By here, the original data page has been split into two new halves, and
- * these are correct. The algorithm requires that the left page never
- * move during a split, so we copy the new left page back on top of the
- * original. Note that this is not a waste of time, since we also require
- * (in the page management code) that the center of a page always be
- * clean, and the most efficient way to guarantee this is just to compact
- * the data by reinserting it into a new left page. (XXX the latter
- * comment is probably obsolete.)
- *
- * It's a bit weird that we don't fill in the left page till after writing
- * the XLOG entry, but not really worth changing. Note that we use the
- * origpage data (specifically its BTP_ROOT bit) while preparing the XLOG
- * entry, so simply reshuffling the code won't do.
- */
-
- PageRestoreTempPage(leftpage, origpage);
-
END_CRIT_SECTION();
/* release the old right sibling */
if (!P_RIGHTMOST(ropaque))
_bt_relbuf(rel, sbuf);
+ /* release the child */
+ if (!isleaf)
+ _bt_relbuf(rel, cbuf);
+
/* split's done */
return rbuf;
}
*
* We return the index of the first existing tuple that should go on the
* righthand page, plus a boolean indicating whether the new tuple goes on
- * the left or right page. The bool is necessary to disambiguate the case
+ * the left or right page. The bool is necessary to disambiguate the case
* where firstright == newitemoff.
*/
static OffsetNumber
int leftspace,
rightspace,
goodenough,
- dataitemtotal,
- dataitemstoleft;
+ olddataitemstotal,
+ olddataitemstoleft;
+ bool goodenoughfound;
opaque = (BTPageOpaque) PageGetSpecialPointer(page);
/* Passed-in newitemsz is MAXALIGNED but does not include line pointer */
newitemsz += sizeof(ItemIdData);
+
+ /* Total free space available on a btree page, after fixed overhead */
+ leftspace = rightspace =
+ PageGetPageSize(page) - SizeOfPageHeaderData -
+ MAXALIGN(sizeof(BTPageOpaqueData));
+
+ /* The right page will have the same high key as the old page */
+ if (!P_RIGHTMOST(opaque))
+ {
+ itemid = PageGetItemId(page, P_HIKEY);
+ rightspace -= (int) (MAXALIGN(ItemIdGetLength(itemid)) +
+ sizeof(ItemIdData));
+ }
+
+ /* Count up total space in data items without actually scanning 'em */
+ olddataitemstotal = rightspace - (int) PageGetExactFreeSpace(page);
+
state.newitemsz = newitemsz;
state.is_leaf = P_ISLEAF(opaque);
state.is_rightmost = P_RIGHTMOST(opaque);
BTREE_DEFAULT_FILLFACTOR);
else
state.fillfactor = BTREE_NONLEAF_FILLFACTOR;
-
- /* Total free space available on a btree page, after fixed overhead */
- leftspace = rightspace =
- PageGetPageSize(page) - SizeOfPageHeaderData -
- MAXALIGN(sizeof(BTPageOpaqueData));
+ state.newitemonleft = false; /* these just to keep compiler quiet */
+ state.firstright = 0;
+ state.best_delta = 0;
+ state.leftspace = leftspace;
+ state.rightspace = rightspace;
+ state.olddataitemstotal = olddataitemstotal;
+ state.newitemoff = newitemoff;
/*
* Finding the best possible split would require checking all the possible
*/
goodenough = leftspace / 16;
- /* The right page will have the same high key as the old page */
- if (!P_RIGHTMOST(opaque))
- {
- itemid = PageGetItemId(page, P_HIKEY);
- rightspace -= (int) (MAXALIGN(ItemIdGetLength(itemid)) +
- sizeof(ItemIdData));
- }
-
- /* Count up total space in data items without actually scanning 'em */
- dataitemtotal = rightspace - (int) PageGetFreeSpace(page);
-
/*
* Scan through the data items and calculate space usage for a split at
* each possible position.
*/
- dataitemstoleft = 0;
+ olddataitemstoleft = 0;
+ goodenoughfound = false;
maxoff = PageGetMaxOffsetNumber(page);
for (offnum = P_FIRSTDATAKEY(opaque);
offnum = OffsetNumberNext(offnum))
{
Size itemsz;
- int leftfree,
- rightfree;
itemid = PageGetItemId(page, offnum);
itemsz = MAXALIGN(ItemIdGetLength(itemid)) + sizeof(ItemIdData);
- /*
- * We have to allow for the current item becoming the high key of the
- * left page; therefore it counts against left space as well as right
- * space.
- */
- leftfree = leftspace - dataitemstoleft - (int) itemsz;
- rightfree = rightspace - (dataitemtotal - dataitemstoleft);
-
/*
* Will the new item go to left or right of split?
*/
if (offnum > newitemoff)
- _bt_checksplitloc(&state, offnum, leftfree, rightfree,
- true, itemsz);
+ _bt_checksplitloc(&state, offnum, true,
+ olddataitemstoleft, itemsz);
+
else if (offnum < newitemoff)
- _bt_checksplitloc(&state, offnum, leftfree, rightfree,
- false, itemsz);
+ _bt_checksplitloc(&state, offnum, false,
+ olddataitemstoleft, itemsz);
else
{
/* need to try it both ways! */
- _bt_checksplitloc(&state, offnum, leftfree, rightfree,
- true, itemsz);
- /* here we are contemplating newitem as first on right */
- _bt_checksplitloc(&state, offnum, leftfree, rightfree,
- false, newitemsz);
+ _bt_checksplitloc(&state, offnum, true,
+ olddataitemstoleft, itemsz);
+
+ _bt_checksplitloc(&state, offnum, false,
+ olddataitemstoleft, itemsz);
}
/* Abort scan once we find a good-enough choice */
if (state.have_split && state.best_delta <= goodenough)
+ {
+ goodenoughfound = true;
break;
+ }
- dataitemstoleft += itemsz;
+ olddataitemstoleft += itemsz;
}
+ /*
+ * If the new item goes as the last item, check for splitting so that all
+ * the old items go to the left page and the new item goes to the right
+ * page.
+ */
+ if (newitemoff > maxoff && !goodenoughfound)
+ _bt_checksplitloc(&state, newitemoff, false, olddataitemstotal, 0);
+
/*
* I believe it is not possible to fail to find a feasible split, but just
* in case ...
*/
if (!state.have_split)
- elog(ERROR, "could not find a feasible split point for \"%s\"",
+ elog(ERROR, "could not find a feasible split point for index \"%s\"",
RelationGetRelationName(rel));
*newitemonleft = state.newitemonleft;
/*
* Subroutine to analyze a particular possible split choice (ie, firstright
* and newitemonleft settings), and record the best split so far in *state.
+ *
+ * firstoldonright is the offset of the first item on the original page
+ * that goes to the right page, and firstoldonrightsz is the size of that
+ * tuple. firstoldonright can be > max offset, which means that all the old
+ * items go to the left page and only the new item goes to the right page.
+ * In that case, firstoldonrightsz is not used.
+ *
+ * olddataitemstoleft is the total size of all old items to the left of
+ * firstoldonright.
*/
static void
-_bt_checksplitloc(FindSplitData *state, OffsetNumber firstright,
- int leftfree, int rightfree,
- bool newitemonleft, Size firstrightitemsz)
+_bt_checksplitloc(FindSplitData *state,
+ OffsetNumber firstoldonright,
+ bool newitemonleft,
+ int olddataitemstoleft,
+ Size firstoldonrightsz)
{
+ int leftfree,
+ rightfree;
+ Size firstrightitemsz;
+ bool newitemisfirstonright;
+
+ /* Is the new item going to be the first item on the right page? */
+ newitemisfirstonright = (firstoldonright == state->newitemoff
+ && !newitemonleft);
+
+ if (newitemisfirstonright)
+ firstrightitemsz = state->newitemsz;
+ else
+ firstrightitemsz = firstoldonrightsz;
+
+ /* Account for all the old tuples */
+ leftfree = state->leftspace - olddataitemstoleft;
+ rightfree = state->rightspace -
+ (state->olddataitemstotal - olddataitemstoleft);
+
/*
- * Account for the new item on whichever side it is to be put.
+ * The first item on the right page becomes the high key of the left page;
+ * therefore it counts against left space as well as right space.
*/
+ leftfree -= firstrightitemsz;
+
+ /* account for the new item */
if (newitemonleft)
leftfree -= (int) state->newitemsz;
else
{
state->have_split = true;
state->newitemonleft = newitemonleft;
- state->firstright = firstright;
+ state->firstright = firstoldonright;
state->best_delta = delta;
}
}
*
* On entry, buf and rbuf are the left and right split pages, which we
* still hold write locks on per the L&Y algorithm. We release the
- * write locks once we have write lock on the parent page. (Any sooner,
+ * write locks once we have write lock on the parent page. (Any sooner,
* and it'd be possible for some other process to try to split or delete
* one of these pages, and get confused because it cannot find the downlink.)
*
* have to be efficient (concurrent ROOT split, WAL recovery)
* is_root - we split the true root
* is_only - we split a page alone on its level (might have been fast root)
- *
- * This is exported so it can be called by nbtxlog.c.
*/
-void
+static void
_bt_insert_parent(Relation rel,
Buffer buf,
Buffer rbuf,
* Here we have to do something Lehman and Yao don't talk about: deal with
* a root split and construction of a new root. If our stack is empty
* then we have just split a node on what had been the root level when we
- * descended the tree. If it was still the root then we perform a
+ * descended the tree. If it was still the root then we perform a
* new-root construction. If it *wasn't* the root anymore, search to find
* the next higher level that someone constructed meanwhile, and find the
* right place to insert as for the normal case.
*
* If we have to search for the parent level, we do so by re-descending
* from the root. This is not super-efficient, but it's rare enough not
- * to matter. (This path is also taken when called from WAL recovery ---
- * we have no stack in that case.)
+ * to matter.
*/
if (is_root)
{
{
BTPageOpaque lpageop;
- if (!InRecovery)
- elog(DEBUG2, "concurrent ROOT page split");
+ elog(DEBUG2, "concurrent ROOT page split");
lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
/* Find the leftmost page at the next level up */
pbuf = _bt_get_endpoint(rel, lpageop->btpo.level + 1, false);
* 05/27/97
*/
ItemPointerSet(&(stack->bts_btentry.t_tid), bknum, P_HIKEY);
-
pbuf = _bt_getstackbuf(rel, stack, BT_WRITE);
- /* Now we can unlock the children */
+ /*
+ * Now we can unlock the right child. The left child will be unlocked
+ * by _bt_insertonpg().
+ */
_bt_relbuf(rel, rbuf);
- _bt_relbuf(rel, buf);
/* Check for error only after writing children */
if (pbuf == InvalidBuffer)
- elog(ERROR, "failed to re-find parent key in \"%s\" for split pages %u/%u",
+ elog(ERROR, "failed to re-find parent key in index \"%s\" for split pages %u/%u",
RelationGetRelationName(rel), bknum, rbknum);
/* Recursively update the parent */
- _bt_insertonpg(rel, pbuf, stack->bts_parent,
- 0, NULL, new_item, stack->bts_offset,
+ _bt_insertonpg(rel, pbuf, buf, stack->bts_parent,
+ new_item, stack->bts_offset + 1,
is_only);
/* be tidy */
}
}
+/*
+ * _bt_finish_split() -- Finish an incomplete split
+ *
+ * A crash or other failure can leave a split incomplete. The insertion
+ * routines won't allow to insert on a page that is incompletely split.
+ * Before inserting on such a page, call _bt_finish_split().
+ *
+ * On entry, 'lbuf' must be locked in write-mode. On exit, it is unlocked
+ * and unpinned.
+ */
+void
+_bt_finish_split(Relation rel, Buffer lbuf, BTStack stack)
+{
+ Page lpage = BufferGetPage(lbuf);
+ BTPageOpaque lpageop = (BTPageOpaque) PageGetSpecialPointer(lpage);
+ Buffer rbuf;
+ Page rpage;
+ BTPageOpaque rpageop;
+ bool was_root;
+ bool was_only;
+
+ Assert(P_INCOMPLETE_SPLIT(lpageop));
+
+ /* Lock right sibling, the one missing the downlink */
+ rbuf = _bt_getbuf(rel, lpageop->btpo_next, BT_WRITE);
+ rpage = BufferGetPage(rbuf);
+ rpageop = (BTPageOpaque) PageGetSpecialPointer(rpage);
+
+ /* Could this be a root split? */
+ if (!stack)
+ {
+ Buffer metabuf;
+ Page metapg;
+ BTMetaPageData *metad;
+
+ /* acquire lock on the metapage */
+ metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_WRITE);
+ metapg = BufferGetPage(metabuf);
+ metad = BTPageGetMeta(metapg);
+
+ was_root = (metad->btm_root == BufferGetBlockNumber(lbuf));
+
+ _bt_relbuf(rel, metabuf);
+ }
+ else
+ was_root = false;
+
+ /* Was this the only page on the level before split? */
+ was_only = (P_LEFTMOST(lpageop) && P_RIGHTMOST(rpageop));
+
+ elog(DEBUG1, "finishing incomplete split of %u/%u",
+ BufferGetBlockNumber(lbuf), BufferGetBlockNumber(rbuf));
+
+ _bt_insert_parent(rel, lbuf, rbuf, stack, was_root, was_only);
+}
+
/*
* _bt_getstackbuf() -- Walk back up the tree one step, and find the item
* we last looked at in the parent.
page = BufferGetPage(buf);
opaque = (BTPageOpaque) PageGetSpecialPointer(page);
+ if (access == BT_WRITE && P_INCOMPLETE_SPLIT(opaque))
+ {
+ _bt_finish_split(rel, buf, stack->bts_parent);
+ continue;
+ }
+
if (!P_IGNORE(opaque))
{
OffsetNumber offnum,
/*
* These loops will check every item on the page --- but in an
* order that's attuned to the probability of where it actually
- * is. Scan to the right first, then to the left.
+ * is. Scan to the right first, then to the left.
*/
for (offnum = start;
offnum <= maxoff;
rbkno;
BlockNumber rootblknum;
BTPageOpaque rootopaque;
+ BTPageOpaque lopaque;
ItemId itemid;
IndexTuple item;
- Size itemsz;
- IndexTuple new_item;
+ IndexTuple left_item;
+ Size left_item_sz;
+ IndexTuple right_item;
+ Size right_item_sz;
Buffer metabuf;
Page metapg;
BTMetaPageData *metad;
lbkno = BufferGetBlockNumber(lbuf);
rbkno = BufferGetBlockNumber(rbuf);
lpage = BufferGetPage(lbuf);
+ lopaque = (BTPageOpaque) PageGetSpecialPointer(lpage);
/* get a new root page */
rootbuf = _bt_getbuf(rel, P_NEW, BT_WRITE);
metapg = BufferGetPage(metabuf);
metad = BTPageGetMeta(metapg);
+ /*
+ * Create downlink item for left page (old root). Since this will be the
+ * first item in a non-leaf page, it implicitly has minus-infinity key
+ * value, so we need not store any actual key in it.
+ */
+ left_item_sz = sizeof(IndexTupleData);
+ left_item = (IndexTuple) palloc(left_item_sz);
+ left_item->t_info = left_item_sz;
+ ItemPointerSet(&(left_item->t_tid), lbkno, P_HIKEY);
+
+ /*
+ * Create downlink item for right page. The key for it is obtained from
+ * the "high key" position in the left page.
+ */
+ itemid = PageGetItemId(lpage, P_HIKEY);
+ right_item_sz = ItemIdGetLength(itemid);
+ item = (IndexTuple) PageGetItem(lpage, itemid);
+ right_item = CopyIndexTuple(item);
+ ItemPointerSet(&(right_item->t_tid), rbkno, P_HIKEY);
+
/* NO EREPORT(ERROR) from here till newroot op is logged */
START_CRIT_SECTION();
metad->btm_fastroot = rootblknum;
metad->btm_fastlevel = rootopaque->btpo.level;
- /*
- * Create downlink item for left page (old root). Since this will be the
- * first item in a non-leaf page, it implicitly has minus-infinity key
- * value, so we need not store any actual key in it.
- */
- itemsz = sizeof(IndexTupleData);
- new_item = (IndexTuple) palloc(itemsz);
- new_item->t_info = itemsz;
- ItemPointerSet(&(new_item->t_tid), lbkno, P_HIKEY);
-
/*
* Insert the left page pointer into the new root page. The root page is
* the rightmost page on its level so there is no "high key" in it; the
* Note: we *must* insert the two items in item-number order, for the
* benefit of _bt_restore_page().
*/
- if (PageAddItem(rootpage, (Item) new_item, itemsz, P_HIKEY, LP_USED) == InvalidOffsetNumber)
- elog(PANIC, "failed to add leftkey to new root page");
- pfree(new_item);
-
- /*
- * Create downlink item for right page. The key for it is obtained from
- * the "high key" position in the left page.
- */
- itemid = PageGetItemId(lpage, P_HIKEY);
- itemsz = ItemIdGetLength(itemid);
- item = (IndexTuple) PageGetItem(lpage, itemid);
- new_item = CopyIndexTuple(item);
- ItemPointerSet(&(new_item->t_tid), rbkno, P_HIKEY);
+ if (PageAddItem(rootpage, (Item) left_item, left_item_sz, P_HIKEY,
+ false, false) == InvalidOffsetNumber)
+ elog(PANIC, "failed to add leftkey to new root page"
+ " while splitting block %u of index \"%s\"",
+ BufferGetBlockNumber(lbuf), RelationGetRelationName(rel));
/*
* insert the right page pointer into the new root page.
*/
- if (PageAddItem(rootpage, (Item) new_item, itemsz, P_FIRSTKEY, LP_USED) == InvalidOffsetNumber)
- elog(PANIC, "failed to add rightkey to new root page");
- pfree(new_item);
+ if (PageAddItem(rootpage, (Item) right_item, right_item_sz, P_FIRSTKEY,
+ false, false) == InvalidOffsetNumber)
+ elog(PANIC, "failed to add rightkey to new root page"
+ " while splitting block %u of index \"%s\"",
+ BufferGetBlockNumber(lbuf), RelationGetRelationName(rel));
+
+ /* Clear the incomplete-split flag in the left child */
+ Assert(P_INCOMPLETE_SPLIT(lopaque));
+ lopaque->btpo_flags &= ~BTP_INCOMPLETE_SPLIT;
+ MarkBufferDirty(lbuf);
MarkBufferDirty(rootbuf);
MarkBufferDirty(metabuf);
/* XLOG stuff */
- if (!rel->rd_istemp)
+ if (RelationNeedsWAL(rel))
{
xl_btree_newroot xlrec;
XLogRecPtr recptr;
- XLogRecData rdata[2];
+ xl_btree_metadata md;
- xlrec.node = rel->rd_node;
xlrec.rootblk = rootblknum;
xlrec.level = metad->btm_level;
- rdata[0].data = (char *) &xlrec;
- rdata[0].len = SizeOfBtreeNewroot;
- rdata[0].buffer = InvalidBuffer;
- rdata[0].next = &(rdata[1]);
+ XLogBeginInsert();
+ XLogRegisterData((char *) &xlrec, SizeOfBtreeNewroot);
+
+ XLogRegisterBuffer(0, rootbuf, REGBUF_WILL_INIT);
+ XLogRegisterBuffer(1, lbuf, REGBUF_STANDARD);
+ XLogRegisterBuffer(2, metabuf, REGBUF_WILL_INIT);
+
+ md.root = rootblknum;
+ md.level = metad->btm_level;
+ md.fastroot = rootblknum;
+ md.fastlevel = metad->btm_level;
+
+ XLogRegisterBufData(2, (char *) &md, sizeof(xl_btree_metadata));
/*
* Direct access to page is not good but faster - we should implement
* some new func in page API.
*/
- rdata[1].data = (char *) rootpage + ((PageHeader) rootpage)->pd_upper;
- rdata[1].len = ((PageHeader) rootpage)->pd_special -
- ((PageHeader) rootpage)->pd_upper;
- rdata[1].buffer = InvalidBuffer;
- rdata[1].next = NULL;
+ XLogRegisterBufData(0,
+ (char *) rootpage + ((PageHeader) rootpage)->pd_upper,
+ ((PageHeader) rootpage)->pd_special -
+ ((PageHeader) rootpage)->pd_upper);
- recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_NEWROOT, rdata);
+ recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_NEWROOT);
+ PageSetLSN(lpage, recptr);
PageSetLSN(rootpage, recptr);
- PageSetTLI(rootpage, ThisTimeLineID);
PageSetLSN(metapg, recptr);
- PageSetTLI(metapg, ThisTimeLineID);
}
END_CRIT_SECTION();
- /* send out relcache inval for metapage change */
- CacheInvalidateRelcache(rel);
-
/* done with metapage */
_bt_relbuf(rel, metabuf);
+ pfree(left_item);
+ pfree(right_item);
+
return rootbuf;
}
* we insert the tuples in order, so that the given itup_off does
* represent the final position of the tuple!
*/
-static void
-_bt_pgaddtup(Relation rel,
- Page page,
+static bool
+_bt_pgaddtup(Page page,
Size itemsize,
IndexTuple itup,
- OffsetNumber itup_off,
- const char *where)
+ OffsetNumber itup_off)
{
BTPageOpaque opaque = (BTPageOpaque) PageGetSpecialPointer(page);
IndexTupleData trunctuple;
}
if (PageAddItem(page, (Item) itup, itemsize, itup_off,
- LP_USED) == InvalidOffsetNumber)
- elog(PANIC, "failed to add item to the %s for \"%s\"",
- where, RelationGetRelationName(rel));
+ false, false) == InvalidOffsetNumber)
+ return false;
+
+ return true;
}
/*
if (isNull || (scankey->sk_flags & SK_ISNULL))
return false;
- result = DatumGetInt32(FunctionCall2(&scankey->sk_func,
- datum,
- scankey->sk_argument));
+ result = DatumGetInt32(FunctionCall2Coll(&scankey->sk_func,
+ scankey->sk_collation,
+ datum,
+ scankey->sk_argument));
if (result != 0)
return false;
/*
* _bt_vacuum_one_page - vacuum just one index page.
*
- * Try to remove LP_DELETE items from the given page. The passed buffer
+ * Try to remove LP_DEAD items from the given page. The passed buffer
* must be exclusive-locked, but unlike a real VACUUM, we don't need a
* super-exclusive "cleanup" lock (see nbtree/README).
*/
static void
-_bt_vacuum_one_page(Relation rel, Buffer buffer)
+_bt_vacuum_one_page(Relation rel, Buffer buffer, Relation heapRel)
{
OffsetNumber deletable[MaxOffsetNumber];
int ndeletable = 0;
BTPageOpaque opaque = (BTPageOpaque) PageGetSpecialPointer(page);
/*
- * Scan over all items to see which ones need deleted according to
- * LP_DELETE flags.
+ * Scan over all items to see which ones need to be deleted according to
+ * LP_DEAD flags.
*/
minoff = P_FIRSTDATAKEY(opaque);
maxoff = PageGetMaxOffsetNumber(page);
{
ItemId itemId = PageGetItemId(page, offnum);
- if (ItemIdDeleted(itemId))
+ if (ItemIdIsDead(itemId))
deletable[ndeletable++] = offnum;
}
if (ndeletable > 0)
- _bt_delitems(rel, buffer, deletable, ndeletable);
+ _bt_delitems_delete(rel, buffer, deletable, ndeletable, heapRel);
/*
- * Note: if we didn't find any LP_DELETE items, then the page's
+ * Note: if we didn't find any LP_DEAD items, then the page's
* BTP_HAS_GARBAGE hint bit is falsely set. We do not bother expending a
* separate write to clear it, however. We will clear it when we split
* the page.