* nbtinsert.c
* Item insertion in Lehman and Yao btrees for Postgres.
*
- * Portions Copyright (c) 1996-2004, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.118 2004/10/26 16:05:02 tgl Exp $
+ * src/backend/access/nbtree/nbtinsert.c
*
*-------------------------------------------------------------------------
*/
#include "access/heapam.h"
#include "access/nbtree.h"
+#include "access/transam.h"
+#include "access/xloginsert.h"
#include "miscadmin.h"
+#include "storage/lmgr.h"
+#include "storage/predicate.h"
+#include "utils/tqual.h"
typedef struct
{
/* context data for _bt_checksplitloc */
Size newitemsz; /* size of new item to be inserted */
+ int fillfactor; /* needed when splitting rightmost page */
bool is_leaf; /* T if splitting a leaf page */
bool is_rightmost; /* T if splitting a rightmost page */
+ OffsetNumber newitemoff; /* where the new item is to be inserted */
+ int leftspace; /* space available for items on left page */
+ int rightspace; /* space available for items on right page */
+ int olddataitemstotal; /* space taken by old items */
bool have_split; /* found a valid split? */
static Buffer _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf);
-static TransactionId _bt_check_unique(Relation rel, BTItem btitem,
- Relation heapRel, Buffer buf,
- ScanKey itup_scankey);
-static InsertIndexResult _bt_insertonpg(Relation rel, Buffer buf,
+static TransactionId _bt_check_unique(Relation rel, IndexTuple itup,
+ Relation heapRel, Buffer buf, OffsetNumber offset,
+ ScanKey itup_scankey,
+ IndexUniqueCheck checkUnique, bool *is_unique,
+ uint32 *speculativeToken);
+static void _bt_findinsertloc(Relation rel,
+ Buffer *bufptr,
+ OffsetNumber *offsetptr,
+ int keysz,
+ ScanKey scankey,
+ IndexTuple newtup,
+ BTStack stack,
+ Relation heapRel);
+static void _bt_insertonpg(Relation rel, Buffer buf, Buffer cbuf,
BTStack stack,
- int keysz, ScanKey scankey,
- BTItem btitem,
- OffsetNumber afteritem,
+ IndexTuple itup,
+ OffsetNumber newitemoff,
bool split_only_page);
-static Buffer _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
- OffsetNumber newitemoff, Size newitemsz,
- BTItem newitem, bool newitemonleft,
- OffsetNumber *itup_off, BlockNumber *itup_blkno);
+static Buffer _bt_split(Relation rel, Buffer buf, Buffer cbuf,
+ OffsetNumber firstright, OffsetNumber newitemoff, Size newitemsz,
+ IndexTuple newitem, bool newitemonleft);
+static void _bt_insert_parent(Relation rel, Buffer buf, Buffer rbuf,
+ BTStack stack, bool is_root, bool is_only);
static OffsetNumber _bt_findsplitloc(Relation rel, Page page,
OffsetNumber newitemoff,
Size newitemsz,
bool *newitemonleft);
-static void _bt_checksplitloc(FindSplitData *state, OffsetNumber firstright,
- int leftfree, int rightfree,
- bool newitemonleft, Size firstrightitemsz);
-static void _bt_pgaddtup(Relation rel, Page page,
- Size itemsize, BTItem btitem,
- OffsetNumber itup_off, const char *where);
+static void _bt_checksplitloc(FindSplitData *state,
+ OffsetNumber firstoldonright, bool newitemonleft,
+ int dataitemstoleft, Size firstoldonrightsz);
+static bool _bt_pgaddtup(Page page, Size itemsize, IndexTuple itup,
+ OffsetNumber itup_off);
static bool _bt_isequal(TupleDesc itupdesc, Page page, OffsetNumber offnum,
int keysz, ScanKey scankey);
+static void _bt_vacuum_one_page(Relation rel, Buffer buffer, Relation heapRel);
/*
- * _bt_doinsert() -- Handle insertion of a single btitem in the tree.
+ * _bt_doinsert() -- Handle insertion of a single index tuple in the tree.
+ *
+ * This routine is called by the public interface routine, btinsert.
+ * By here, itup is filled in, including the TID.
*
- * This routine is called by the public interface routines, btbuild
- * and btinsert. By here, btitem is filled in, including the TID.
+ * If checkUnique is UNIQUE_CHECK_NO or UNIQUE_CHECK_PARTIAL, this
+ * will allow duplicates. Otherwise (UNIQUE_CHECK_YES or
+ * UNIQUE_CHECK_EXISTING) it will throw error for a duplicate.
+ * For UNIQUE_CHECK_EXISTING we merely run the duplicate check, and
+ * don't actually insert.
+ *
+ * The result value is only significant for UNIQUE_CHECK_PARTIAL:
+ * it must be TRUE if the entry is known unique, else FALSE.
+ * (In the current implementation we'll also return TRUE after a
+ * successful UNIQUE_CHECK_YES or UNIQUE_CHECK_EXISTING call, but
+ * that's just a coding artifact.)
*/
-InsertIndexResult
-_bt_doinsert(Relation rel, BTItem btitem,
- bool index_is_unique, Relation heapRel)
+bool
+_bt_doinsert(Relation rel, IndexTuple itup,
+ IndexUniqueCheck checkUnique, Relation heapRel)
{
- IndexTuple itup = &(btitem->bti_itup);
+ bool is_unique = false;
int natts = rel->rd_rel->relnatts;
ScanKey itup_scankey;
BTStack stack;
Buffer buf;
- InsertIndexResult res;
+ OffsetNumber offset;
- /* we need a scan key to do our search, so build one */
+ /* we need an insertion scan key to do our search, so build one */
itup_scankey = _bt_mkscankey(rel, itup);
top:
/* find the first page containing this key */
stack = _bt_search(rel, natts, itup_scankey, false, &buf, BT_WRITE);
+ offset = InvalidOffsetNumber;
+
/* trade in our read lock for a write lock */
LockBuffer(buf, BUFFER_LOCK_UNLOCK);
LockBuffer(buf, BT_WRITE);
/*
* If the page was split between the time that we surrendered our read
- * lock and acquired our write lock, then this page may no longer be
- * the right place for the key we want to insert. In this case, we
- * need to move right in the tree. See Lehman and Yao for an
- * excruciatingly precise description.
+ * lock and acquired our write lock, then this page may no longer be the
+ * right place for the key we want to insert. In this case, we need to
+ * move right in the tree. See Lehman and Yao for an excruciatingly
+ * precise description.
*/
- buf = _bt_moveright(rel, buf, natts, itup_scankey, false, BT_WRITE);
+ buf = _bt_moveright(rel, buf, natts, itup_scankey, false,
+ true, stack, BT_WRITE);
/*
- * If we're not allowing duplicates, make sure the key isn't already
- * in the index.
+ * If we're not allowing duplicates, make sure the key isn't already in
+ * the index.
*
- * NOTE: obviously, _bt_check_unique can only detect keys that are
- * already in the index; so it cannot defend against concurrent
- * insertions of the same key. We protect against that by means of
- * holding a write lock on the target page. Any other would-be
- * inserter of the same key must acquire a write lock on the same
- * target page, so only one would-be inserter can be making the check
- * at one time. Furthermore, once we are past the check we hold write
- * locks continuously until we have performed our insertion, so no
- * later inserter can fail to see our insertion. (This requires some
- * care in _bt_insertonpg.)
+ * NOTE: obviously, _bt_check_unique can only detect keys that are already
+ * in the index; so it cannot defend against concurrent insertions of the
+ * same key. We protect against that by means of holding a write lock on
+ * the target page. Any other would-be inserter of the same key must
+ * acquire a write lock on the same target page, so only one would-be
+ * inserter can be making the check at one time. Furthermore, once we are
+ * past the check we hold write locks continuously until we have performed
+ * our insertion, so no later inserter can fail to see our insertion.
+ * (This requires some care in _bt_insertonpg.)
*
* If we must wait for another xact, we release the lock while waiting,
* and then must start over completely.
+ *
+ * For a partial uniqueness check, we don't wait for the other xact. Just
+ * let the tuple in and return false for possibly non-unique, or true for
+ * definitely unique.
*/
- if (index_is_unique)
+ if (checkUnique != UNIQUE_CHECK_NO)
{
TransactionId xwait;
+ uint32 speculativeToken;
- xwait = _bt_check_unique(rel, btitem, heapRel, buf, itup_scankey);
+ offset = _bt_binsrch(rel, buf, natts, itup_scankey, false);
+ xwait = _bt_check_unique(rel, itup, heapRel, buf, offset, itup_scankey,
+ checkUnique, &is_unique, &speculativeToken);
if (TransactionIdIsValid(xwait))
{
/* Have to wait for the other guy ... */
_bt_relbuf(rel, buf);
- XactLockTableWait(xwait);
+
+ /*
+ * If it's a speculative insertion, wait for it to finish (ie. to
+ * go ahead with the insertion, or kill the tuple). Otherwise
+ * wait for the transaction to finish as usual.
+ */
+ if (speculativeToken)
+ SpeculativeInsertionWait(xwait, speculativeToken);
+ else
+ XactLockTableWait(xwait, rel, &itup->t_tid, XLTW_InsertIndex);
+
/* start over... */
_bt_freestack(stack);
goto top;
}
}
- /* do the insertion */
- res = _bt_insertonpg(rel, buf, stack, natts, itup_scankey, btitem,
- 0, false);
+ if (checkUnique != UNIQUE_CHECK_EXISTING)
+ {
+ /*
+ * The only conflict predicate locking cares about for indexes is when
+ * an index tuple insert conflicts with an existing lock. Since the
+ * actual location of the insert is hard to predict because of the
+ * random search used to prevent O(N^2) performance when there are
+ * many duplicate entries, we can just use the "first valid" page.
+ */
+ CheckForSerializableConflictIn(rel, NULL, buf);
+ /* do the insertion */
+ _bt_findinsertloc(rel, &buf, &offset, natts, itup_scankey, itup,
+ stack, heapRel);
+ _bt_insertonpg(rel, buf, InvalidBuffer, stack, itup, offset, false);
+ }
+ else
+ {
+ /* just release the buffer */
+ _bt_relbuf(rel, buf);
+ }
/* be tidy */
_bt_freestack(stack);
_bt_freeskey(itup_scankey);
- return res;
+ return is_unique;
}
/*
* _bt_check_unique() -- Check for violation of unique index constraint
*
+ * offset points to the first possible item that could conflict. It can
+ * also point to end-of-page, which means that the first tuple to check
+ * is the first tuple on the next page.
+ *
* Returns InvalidTransactionId if there is no conflict, else an xact ID
- * we must wait for to see if it commits a conflicting tuple. If an actual
- * conflict is detected, no return --- just ereport().
+ * we must wait for to see if it commits a conflicting tuple. If an actual
+ * conflict is detected, no return --- just ereport(). If an xact ID is
+ * returned, and the conflicting tuple still has a speculative insertion in
+ * progress, *speculativeToken is set to non-zero, and the caller can wait for
+ * the verdict on the insertion using SpeculativeInsertionWait().
+ *
+ * However, if checkUnique == UNIQUE_CHECK_PARTIAL, we always return
+ * InvalidTransactionId because we don't want to wait. In this case we
+ * set *is_unique to false if there is a potential conflict, and the
+ * core code must redo the uniqueness check later.
*/
static TransactionId
-_bt_check_unique(Relation rel, BTItem btitem, Relation heapRel,
- Buffer buf, ScanKey itup_scankey)
+_bt_check_unique(Relation rel, IndexTuple itup, Relation heapRel,
+ Buffer buf, OffsetNumber offset, ScanKey itup_scankey,
+ IndexUniqueCheck checkUnique, bool *is_unique,
+ uint32 *speculativeToken)
{
TupleDesc itupdesc = RelationGetDescr(rel);
int natts = rel->rd_rel->relnatts;
- OffsetNumber offset,
- maxoff;
+ SnapshotData SnapshotDirty;
+ OffsetNumber maxoff;
Page page;
BTPageOpaque opaque;
Buffer nbuf = InvalidBuffer;
+ bool found = false;
+
+ /* Assume unique until we find a duplicate */
+ *is_unique = true;
+
+ InitDirtySnapshot(SnapshotDirty);
page = BufferGetPage(buf);
opaque = (BTPageOpaque) PageGetSpecialPointer(page);
maxoff = PageGetMaxOffsetNumber(page);
- /*
- * Find first item >= proposed new item. Note we could also get a
- * pointer to end-of-page here.
- */
- offset = _bt_binsrch(rel, buf, natts, itup_scankey, false);
-
/*
* Scan over all equal tuples, looking for live conflicts.
*/
for (;;)
{
- HeapTupleData htup;
- Buffer hbuffer;
ItemId curitemid;
- BTItem cbti;
+ IndexTuple curitup;
BlockNumber nblkno;
/*
* We can skip items that are marked killed.
*
* Formerly, we applied _bt_isequal() before checking the kill
- * flag, so as to fall out of the item loop as soon as
- * possible. However, in the presence of heavy update activity
- * an index may contain many killed items with the same key;
- * running _bt_isequal() on each killed item gets expensive.
- * Furthermore it is likely that the non-killed version of
- * each key appears first, so that we didn't actually get to
- * exit any sooner anyway. So now we just advance over killed
- * items as quickly as we can. We only apply _bt_isequal()
- * when we get to a non-killed item or the end of the page.
+ * flag, so as to fall out of the item loop as soon as possible.
+ * However, in the presence of heavy update activity an index may
+ * contain many killed items with the same key; running
+ * _bt_isequal() on each killed item gets expensive. Furthermore
+ * it is likely that the non-killed version of each key appears
+ * first, so that we didn't actually get to exit any sooner
+ * anyway. So now we just advance over killed items as quickly as
+ * we can. We only apply _bt_isequal() when we get to a non-killed
+ * item or the end of the page.
*/
- if (!ItemIdDeleted(curitemid))
+ if (!ItemIdIsDead(curitemid))
{
+ ItemPointerData htid;
+ bool all_dead;
+
/*
- * _bt_compare returns 0 for (1,NULL) and (1,NULL) -
- * this's how we handling NULLs - and so we must not use
- * _bt_compare in real comparison, but only for
- * ordering/finding items on pages. - vadim 03/24/97
+			 * _bt_compare returns 0 for (1,NULL) and (1,NULL) - this
+			 * is how we handle NULLs - and so we must not use
+			 * _bt_compare in real comparison, but only for
+			 * ordering/finding items on pages. - vadim 03/24/97
*/
if (!_bt_isequal(itupdesc, page, offset, natts, itup_scankey))
break; /* we're past all the equal tuples */
/* okay, we gotta fetch the heap tuple ... */
- cbti = (BTItem) PageGetItem(page, curitemid);
- htup.t_self = cbti->bti_itup.t_tid;
- if (heap_fetch(heapRel, SnapshotDirty, &htup, &hbuffer,
- true, NULL))
+ curitup = (IndexTuple) PageGetItem(page, curitemid);
+ htid = curitup->t_tid;
+
+ /*
+ * If we are doing a recheck, we expect to find the tuple we
+ * are rechecking. It's not a duplicate, but we have to keep
+ * scanning.
+ */
+ if (checkUnique == UNIQUE_CHECK_EXISTING &&
+ ItemPointerCompare(&htid, &itup->t_tid) == 0)
{
- /* it is a duplicate */
- TransactionId xwait =
- (TransactionIdIsValid(SnapshotDirty->xmin)) ?
- SnapshotDirty->xmin : SnapshotDirty->xmax;
+ found = true;
+ }
- ReleaseBuffer(hbuffer);
+ /*
+ * We check the whole HOT-chain to see if there is any tuple
+ * that satisfies SnapshotDirty. This is necessary because we
+ * have just a single index entry for the entire chain.
+ */
+ else if (heap_hot_search(&htid, heapRel, &SnapshotDirty,
+ &all_dead))
+ {
+ TransactionId xwait;
+
+ /*
+ * It is a duplicate. If we are only doing a partial
+ * check, then don't bother checking if the tuple is being
+ * updated in another transaction. Just return the fact
+ * that it is a potential conflict and leave the full
+ * check till later.
+ */
+ if (checkUnique == UNIQUE_CHECK_PARTIAL)
+ {
+ if (nbuf != InvalidBuffer)
+ _bt_relbuf(rel, nbuf);
+ *is_unique = false;
+ return InvalidTransactionId;
+ }
/*
* If this tuple is being updated by other transaction
* then we have to wait for its commit/abort.
*/
+ xwait = (TransactionIdIsValid(SnapshotDirty.xmin)) ?
+ SnapshotDirty.xmin : SnapshotDirty.xmax;
+
if (TransactionIdIsValid(xwait))
{
if (nbuf != InvalidBuffer)
_bt_relbuf(rel, nbuf);
/* Tell _bt_doinsert to wait... */
+ *speculativeToken = SnapshotDirty.speculativeToken;
return xwait;
}
/*
- * Otherwise we have a definite conflict.
+ * Otherwise we have a definite conflict. But before
+ * complaining, look to see if the tuple we want to insert
+ * is itself now committed dead --- if so, don't complain.
+ * This is a waste of time in normal scenarios but we must
+ * do it to support CREATE INDEX CONCURRENTLY.
+ *
+ * We must follow HOT-chains here because during
+ * concurrent index build, we insert the root TID though
+ * the actual tuple may be somewhere in the HOT-chain.
+ * While following the chain we might not stop at the
+ * exact tuple which triggered the insert, but that's OK
+ * because if we find a live tuple anywhere in this chain,
+ * we have a unique key conflict. The other live tuple is
+ * not part of this chain because it had a different index
+ * entry.
*/
- ereport(ERROR,
- (errcode(ERRCODE_UNIQUE_VIOLATION),
- errmsg("duplicate key violates unique constraint \"%s\"",
- RelationGetRelationName(rel))));
- }
- else if (htup.t_data != NULL)
- {
+ htid = itup->t_tid;
+ if (heap_hot_search(&htid, heapRel, SnapshotSelf, NULL))
+ {
+ /* Normal case --- it's still live */
+ }
+ else
+ {
+ /*
+ * It's been deleted, so no error, and no need to
+ * continue searching
+ */
+ break;
+ }
+
/*
- * Hmm, if we can't see the tuple, maybe it can be
- * marked killed. This logic should match
- * index_getnext and btgettuple.
+ * This is a definite conflict. Break the tuple down into
+ * datums and report the error. But first, make sure we
+ * release the buffer locks we're holding ---
+ * BuildIndexValueDescription could make catalog accesses,
+ * which in the worst case might touch this same index and
+ * cause deadlocks.
*/
- LockBuffer(hbuffer, BUFFER_LOCK_SHARE);
- if (HeapTupleSatisfiesVacuum(htup.t_data, RecentGlobalXmin,
- hbuffer) == HEAPTUPLE_DEAD)
+ if (nbuf != InvalidBuffer)
+ _bt_relbuf(rel, nbuf);
+ _bt_relbuf(rel, buf);
+
{
- curitemid->lp_flags |= LP_DELETE;
- SetBufferCommitInfoNeedsSave(buf);
+ Datum values[INDEX_MAX_KEYS];
+ bool isnull[INDEX_MAX_KEYS];
+ char *key_desc;
+
+ index_deform_tuple(itup, RelationGetDescr(rel),
+ values, isnull);
+
+ key_desc = BuildIndexValueDescription(rel, values,
+ isnull);
+
+ ereport(ERROR,
+ (errcode(ERRCODE_UNIQUE_VIOLATION),
+ errmsg("duplicate key value violates unique constraint \"%s\"",
+ RelationGetRelationName(rel)),
+ key_desc ? errdetail("Key %s already exists.",
+ key_desc) : 0,
+ errtableconstraint(heapRel,
+ RelationGetRelationName(rel))));
}
- LockBuffer(hbuffer, BUFFER_LOCK_UNLOCK);
}
- ReleaseBuffer(hbuffer);
+ else if (all_dead)
+ {
+ /*
+ * The conflicting tuple (or whole HOT chain) is dead to
+ * everyone, so we may as well mark the index entry
+ * killed.
+ */
+ ItemIdMarkDead(curitemid);
+ opaque->btpo_flags |= BTP_HAS_GARBAGE;
+
+ /*
+ * Mark buffer with a dirty hint, since state is not
+ * crucial. Be sure to mark the proper buffer dirty.
+ */
+ if (nbuf != InvalidBuffer)
+ MarkBufferDirtyHint(nbuf, true);
+ else
+ MarkBufferDirtyHint(buf, true);
+ }
}
}
if (!P_IGNORE(opaque))
break;
if (P_RIGHTMOST(opaque))
- elog(ERROR, "fell off the end of \"%s\"",
+ elog(ERROR, "fell off the end of index \"%s\"",
RelationGetRelationName(rel));
}
maxoff = PageGetMaxOffsetNumber(page);
}
}
+ /*
+ * If we are doing a recheck then we should have found the tuple we are
+ * checking. Otherwise there's something very wrong --- probably, the
+ * index is on a non-immutable expression.
+ */
+ if (checkUnique == UNIQUE_CHECK_EXISTING && !found)
+ ereport(ERROR,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("failed to re-find tuple within index \"%s\"",
+ RelationGetRelationName(rel)),
+ errhint("This may be because of a non-immutable index expression."),
+ errtableconstraint(heapRel,
+ RelationGetRelationName(rel))));
+
if (nbuf != InvalidBuffer)
_bt_relbuf(rel, nbuf);
return InvalidTransactionId;
}
+
+/*
+ * _bt_findinsertloc() -- Finds an insert location for a tuple
+ *
+ * If the new key is equal to one or more existing keys, we can
+ * legitimately place it anywhere in the series of equal keys --- in fact,
+ * if the new key is equal to the page's "high key" we can place it on
+ * the next page. If it is equal to the high key, and there's not room
+ * to insert the new tuple on the current page without splitting, then
+ * we can move right hoping to find more free space and avoid a split.
+ * (We should not move right indefinitely, however, since that leads to
+ * O(N^2) insertion behavior in the presence of many equal keys.)
+ * Once we have chosen the page to put the key on, we'll insert it before
+ * any existing equal keys because of the way _bt_binsrch() works.
+ *
+ * If there's not enough room in the space, we try to make room by
+ * removing any LP_DEAD tuples.
+ *
+ * On entry, *bufptr and *offsetptr point to the first legal position
+ * where the new tuple could be inserted. The caller should hold an
+ * exclusive lock on *bufptr. *offsetptr can also be set to
+ * InvalidOffsetNumber, in which case the function will search for the
+ * right location within the page if needed. On exit, they point to the
+ * chosen insert location. If _bt_findinsertloc decides to move right,
+ * the lock and pin on the original page will be released and the new
+ * page returned to the caller is exclusively locked instead.
+ *
+ * newtup is the new tuple we're inserting, and scankey is an insertion
+ * type scan key for it.
+ */
+static void
+_bt_findinsertloc(Relation rel,
+ Buffer *bufptr,
+ OffsetNumber *offsetptr,
+ int keysz,
+ ScanKey scankey,
+ IndexTuple newtup,
+ BTStack stack,
+ Relation heapRel)
+{
+ Buffer buf = *bufptr;
+ Page page = BufferGetPage(buf);
+ Size itemsz;
+ BTPageOpaque lpageop;
+ bool movedright,
+ vacuumed;
+ OffsetNumber newitemoff;
+ OffsetNumber firstlegaloff = *offsetptr;
+
+ lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
+
+ itemsz = IndexTupleDSize(*newtup);
+ itemsz = MAXALIGN(itemsz); /* be safe, PageAddItem will do this but we
+ * need to be consistent */
+
+ /*
+ * Check whether the item can fit on a btree page at all. (Eventually, we
+ * ought to try to apply TOAST methods if not.) We actually need to be
+ * able to fit three items on every page, so restrict any one item to 1/3
+ * the per-page available space. Note that at this point, itemsz doesn't
+ * include the ItemId.
+ *
+ * NOTE: if you change this, see also the similar code in _bt_buildadd().
+ */
+ if (itemsz > BTMaxItemSize(page))
+ ereport(ERROR,
+ (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
+ errmsg("index row size %zu exceeds maximum %zu for index \"%s\"",
+ itemsz, BTMaxItemSize(page),
+ RelationGetRelationName(rel)),
+ errhint("Values larger than 1/3 of a buffer page cannot be indexed.\n"
+ "Consider a function index of an MD5 hash of the value, "
+ "or use full text indexing."),
+ errtableconstraint(heapRel,
+ RelationGetRelationName(rel))));
+
+ /*----------
+ * If we will need to split the page to put the item on this page,
+ * check whether we can put the tuple somewhere to the right,
+ * instead. Keep scanning right until we
+ * (a) find a page with enough free space,
+ * (b) reach the last page where the tuple can legally go, or
+ * (c) get tired of searching.
+ * (c) is not flippant; it is important because if there are many
+ * pages' worth of equal keys, it's better to split one of the early
+ * pages than to scan all the way to the end of the run of equal keys
+ * on every insert. We implement "get tired" as a random choice,
+ * since stopping after scanning a fixed number of pages wouldn't work
+ * well (we'd never reach the right-hand side of previously split
+ * pages). Currently the probability of moving right is set at 0.99,
+ * which may seem too high to change the behavior much, but it does an
+ * excellent job of preventing O(N^2) behavior with many equal keys.
+ *----------
+ */
+ movedright = false;
+ vacuumed = false;
+ while (PageGetFreeSpace(page) < itemsz)
+ {
+ Buffer rbuf;
+ BlockNumber rblkno;
+
+ /*
+ * before considering moving right, see if we can obtain enough space
+ * by erasing LP_DEAD items
+ */
+ if (P_ISLEAF(lpageop) && P_HAS_GARBAGE(lpageop))
+ {
+ _bt_vacuum_one_page(rel, buf, heapRel);
+
+ /*
+ * remember that we vacuumed this page, because that makes the
+ * hint supplied by the caller invalid
+ */
+ vacuumed = true;
+
+ if (PageGetFreeSpace(page) >= itemsz)
+ break; /* OK, now we have enough space */
+ }
+
+ /*
+ * nope, so check conditions (b) and (c) enumerated above
+ */
+ if (P_RIGHTMOST(lpageop) ||
+ _bt_compare(rel, keysz, scankey, page, P_HIKEY) != 0 ||
+ random() <= (MAX_RANDOM_VALUE / 100))
+ break;
+
+ /*
+ * step right to next non-dead page
+ *
+ * must write-lock that page before releasing write lock on current
+ * page; else someone else's _bt_check_unique scan could fail to see
+ * our insertion. write locks on intermediate dead pages won't do
+ * because we don't know when they will get de-linked from the tree.
+ */
+ rbuf = InvalidBuffer;
+
+ rblkno = lpageop->btpo_next;
+ for (;;)
+ {
+ rbuf = _bt_relandgetbuf(rel, rbuf, rblkno, BT_WRITE);
+ page = BufferGetPage(rbuf);
+ lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
+
+ /*
+ * If this page was incompletely split, finish the split now. We
+ * do this while holding a lock on the left sibling, which is not
+ * good because finishing the split could be a fairly lengthy
+ * operation. But this should happen very seldom.
+ */
+ if (P_INCOMPLETE_SPLIT(lpageop))
+ {
+ _bt_finish_split(rel, rbuf, stack);
+ rbuf = InvalidBuffer;
+ continue;
+ }
+
+ if (!P_IGNORE(lpageop))
+ break;
+ if (P_RIGHTMOST(lpageop))
+ elog(ERROR, "fell off the end of index \"%s\"",
+ RelationGetRelationName(rel));
+
+ rblkno = lpageop->btpo_next;
+ }
+ _bt_relbuf(rel, buf);
+ buf = rbuf;
+ movedright = true;
+ vacuumed = false;
+ }
+
+ /*
+ * Now we are on the right page, so find the insert position. If we moved
+ * right at all, we know we should insert at the start of the page. If we
+ * didn't move right, we can use the firstlegaloff hint if the caller
+ * supplied one, unless we vacuumed the page which might have moved tuples
+ * around making the hint invalid. If we didn't move right or can't use
+ * the hint, find the position by searching.
+ */
+ if (movedright)
+ newitemoff = P_FIRSTDATAKEY(lpageop);
+ else if (firstlegaloff != InvalidOffsetNumber && !vacuumed)
+ newitemoff = firstlegaloff;
+ else
+ newitemoff = _bt_binsrch(rel, buf, keysz, scankey, false);
+
+ *bufptr = buf;
+ *offsetptr = newitemoff;
+}
+
/*----------
* _bt_insertonpg() -- Insert a tuple on a particular page in the index.
*
* This recursive procedure does the following things:
*
- * + finds the right place to insert the tuple.
* + if necessary, splits the target page (making sure that the
* split is equitable as far as post-insert free space goes).
* + inserts the tuple.
* child page on the parent.
* + updates the metapage if a true root or fast root is split.
*
- * On entry, we must have the right buffer on which to do the
- * insertion, and the buffer must be pinned and locked. On return,
- * we will have dropped both the pin and the write lock on the buffer.
+ * On entry, we must have the correct buffer in which to do the
+ * insertion, and the buffer must be pinned and write-locked. On return,
+ * we will have dropped both the pin and the lock on the buffer.
*
- * If 'afteritem' is >0 then the new tuple must be inserted after the
- * existing item of that number, noplace else. If 'afteritem' is 0
- * then the procedure finds the exact spot to insert it by searching.
- * (keysz and scankey parameters are used ONLY if afteritem == 0.)
- *
- * NOTE: if the new key is equal to one or more existing keys, we can
- * legitimately place it anywhere in the series of equal keys --- in fact,
- * if the new key is equal to the page's "high key" we can place it on
- * the next page. If it is equal to the high key, and there's not room
- * to insert the new tuple on the current page without splitting, then
- * we can move right hoping to find more free space and avoid a split.
- * (We should not move right indefinitely, however, since that leads to
- * O(N^2) insertion behavior in the presence of many equal keys.)
- * Once we have chosen the page to put the key on, we'll insert it before
- * any existing equal keys because of the way _bt_binsrch() works.
+ * When inserting to a non-leaf page, 'cbuf' is the left-sibling of the
+ * page we're inserting the downlink for. This function will clear the
+ * INCOMPLETE_SPLIT flag on it, and release the buffer.
*
* The locking interactions in this code are critical. You should
* grok Lehman and Yao's paper before making any changes. In addition,
* insertion on internal pages.
*----------
*/
-static InsertIndexResult
+static void
_bt_insertonpg(Relation rel,
Buffer buf,
+ Buffer cbuf,
BTStack stack,
- int keysz,
- ScanKey scankey,
- BTItem btitem,
- OffsetNumber afteritem,
+ IndexTuple itup,
+ OffsetNumber newitemoff,
bool split_only_page)
{
- InsertIndexResult res;
Page page;
BTPageOpaque lpageop;
- OffsetNumber itup_off;
- BlockNumber itup_blkno;
- OffsetNumber newitemoff;
OffsetNumber firstright = InvalidOffsetNumber;
Size itemsz;
page = BufferGetPage(buf);
lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
- itemsz = IndexTupleDSize(btitem->bti_itup)
- + (sizeof(BTItemData) - sizeof(IndexTupleData));
+ /* child buffer must be given iff inserting on an internal page */
+ Assert(P_ISLEAF(lpageop) == !BufferIsValid(cbuf));
- itemsz = MAXALIGN(itemsz); /* be safe, PageAddItem will do this but
- * we need to be consistent */
+ /* The caller should've finished any incomplete splits already. */
+ if (P_INCOMPLETE_SPLIT(lpageop))
+ elog(ERROR, "cannot insert to incompletely split page %u",
+ BufferGetBlockNumber(buf));
- /*
- * Check whether the item can fit on a btree page at all. (Eventually,
- * we ought to try to apply TOAST methods if not.) We actually need to
- * be able to fit three items on every page, so restrict any one item
- * to 1/3 the per-page available space. Note that at this point,
- * itemsz doesn't include the ItemId.
- */
- if (itemsz > BTMaxItemSize(page))
- ereport(ERROR,
- (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
- errmsg("index row size %lu exceeds btree maximum, %lu",
- (unsigned long) itemsz,
- (unsigned long) BTMaxItemSize(page))));
-
- /*
- * Determine exactly where new item will go.
- */
- if (afteritem > 0)
- newitemoff = afteritem + 1;
- else
- {
- /*----------
- * If we will need to split the page to put the item here,
- * check whether we can put the tuple somewhere to the right,
- * instead. Keep scanning right until we
- * (a) find a page with enough free space,
- * (b) reach the last page where the tuple can legally go, or
- * (c) get tired of searching.
- * (c) is not flippant; it is important because if there are many
- * pages' worth of equal keys, it's better to split one of the early
- * pages than to scan all the way to the end of the run of equal keys
- * on every insert. We implement "get tired" as a random choice,
- * since stopping after scanning a fixed number of pages wouldn't work
- * well (we'd never reach the right-hand side of previously split
- * pages). Currently the probability of moving right is set at 0.99,
- * which may seem too high to change the behavior much, but it does an
- * excellent job of preventing O(N^2) behavior with many equal keys.
- *----------
- */
- bool movedright = false;
-
- while (PageGetFreeSpace(page) < itemsz &&
- !P_RIGHTMOST(lpageop) &&
- _bt_compare(rel, keysz, scankey, page, P_HIKEY) == 0 &&
- random() > (MAX_RANDOM_VALUE / 100))
- {
- /*
- * step right to next non-dead page
- *
- * must write-lock that page before releasing write lock on
- * current page; else someone else's _bt_check_unique scan
- * could fail to see our insertion. write locks on
- * intermediate dead pages won't do because we don't know when
- * they will get de-linked from the tree.
- */
- Buffer rbuf = InvalidBuffer;
-
- for (;;)
- {
- BlockNumber rblkno = lpageop->btpo_next;
-
- rbuf = _bt_relandgetbuf(rel, rbuf, rblkno, BT_WRITE);
- page = BufferGetPage(rbuf);
- lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
- if (!P_IGNORE(lpageop))
- break;
- if (P_RIGHTMOST(lpageop))
- elog(ERROR, "fell off the end of \"%s\"",
- RelationGetRelationName(rel));
- }
- _bt_relbuf(rel, buf);
- buf = rbuf;
- movedright = true;
- }
-
- /*
- * Now we are on the right page, so find the insert position. If
- * we moved right at all, we know we should insert at the start of
- * the page, else must find the position by searching.
- */
- if (movedright)
- newitemoff = P_FIRSTDATAKEY(lpageop);
- else
- newitemoff = _bt_binsrch(rel, buf, keysz, scankey, false);
- }
+ itemsz = IndexTupleDSize(*itup);
+ itemsz = MAXALIGN(itemsz); /* be safe, PageAddItem will do this but we
+ * need to be consistent */
/*
* Do we need to split the page to fit the item on it?
*
* Note: PageGetFreeSpace() subtracts sizeof(ItemIdData) from its result,
- * so this comparison is correct even though we appear to be
- * accounting only for the item and not for its line pointer.
+ * so this comparison is correct even though we appear to be accounting
+ * only for the item and not for its line pointer.
*/
if (PageGetFreeSpace(page) < itemsz)
{
&newitemonleft);
/* split the buffer into left and right halves */
- rbuf = _bt_split(rel, buf, firstright,
- newitemoff, itemsz, btitem, newitemonleft,
- &itup_off, &itup_blkno);
+ rbuf = _bt_split(rel, buf, cbuf, firstright,
+ newitemoff, itemsz, itup, newitemonleft);
+ PredicateLockPageSplit(rel,
+ BufferGetBlockNumber(buf),
+ BufferGetBlockNumber(rbuf));
/*----------
* By here,
Buffer metabuf = InvalidBuffer;
Page metapg = NULL;
BTMetaPageData *metad = NULL;
+ OffsetNumber itup_off;
+ BlockNumber itup_blkno;
itup_off = newitemoff;
itup_blkno = BufferGetBlockNumber(buf);
/*
- * If we are doing this insert because we split a page that was
- * the only one on its tree level, but was not the root, it may
- * have been the "fast root". We need to ensure that the fast
- * root link points at or above the current page. We can safely
- * acquire a lock on the metapage here --- see comments for
- * _bt_newroot().
+ * If we are doing this insert because we split a page that was the
+ * only one on its tree level, but was not the root, it may have been
+ * the "fast root". We need to ensure that the fast root link points
+ * at or above the current page. We can safely acquire a lock on the
+ * metapage here --- see comments for _bt_newroot().
*/
if (split_only_page)
{
+ Assert(!P_ISLEAF(lpageop));
+
metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_WRITE);
metapg = BufferGetPage(metabuf);
metad = BTPageGetMeta(metapg);
/* Do the update. No ereport(ERROR) until changes are logged */
START_CRIT_SECTION();
- _bt_pgaddtup(rel, page, itemsz, btitem, newitemoff, "page");
+ if (!_bt_pgaddtup(page, itemsz, itup, newitemoff))
+ elog(PANIC, "failed to add new item to block %u in index \"%s\"",
+ itup_blkno, RelationGetRelationName(rel));
+
+ MarkBufferDirty(buf);
if (BufferIsValid(metabuf))
{
metad->btm_fastroot = itup_blkno;
metad->btm_fastlevel = lpageop->btpo.level;
+ MarkBufferDirty(metabuf);
+ }
+
+ /* clear INCOMPLETE_SPLIT flag on child if inserting a downlink */
+ if (BufferIsValid(cbuf))
+ {
+ Page cpage = BufferGetPage(cbuf);
+ BTPageOpaque cpageop = (BTPageOpaque) PageGetSpecialPointer(cpage);
+
+ Assert(P_INCOMPLETE_SPLIT(cpageop));
+ cpageop->btpo_flags &= ~BTP_INCOMPLETE_SPLIT;
+ MarkBufferDirty(cbuf);
}
/* XLOG stuff */
- if (!rel->rd_istemp)
+ if (RelationNeedsWAL(rel))
{
xl_btree_insert xlrec;
xl_btree_metadata xlmeta;
uint8 xlinfo;
XLogRecPtr recptr;
- XLogRecData rdata[3];
- XLogRecData *nextrdata;
- BTItemData truncitem;
+ IndexTupleData trunctuple;
- xlrec.target.node = rel->rd_node;
- ItemPointerSet(&(xlrec.target.tid), itup_blkno, itup_off);
+ xlrec.offnum = itup_off;
- rdata[0].buffer = InvalidBuffer;
- rdata[0].data = (char *) &xlrec;
- rdata[0].len = SizeOfBtreeInsert;
- rdata[0].next = nextrdata = &(rdata[1]);
+ XLogBeginInsert();
+ XLogRegisterData((char *) &xlrec, SizeOfBtreeInsert);
+
+ if (P_ISLEAF(lpageop))
+ xlinfo = XLOG_BTREE_INSERT_LEAF;
+ else
+ {
+ /*
+ * Register the left child whose INCOMPLETE_SPLIT flag was
+ * cleared.
+ */
+ XLogRegisterBuffer(1, cbuf, REGBUF_STANDARD);
+
+ xlinfo = XLOG_BTREE_INSERT_UPPER;
+ }
if (BufferIsValid(metabuf))
{
xlmeta.fastroot = metad->btm_fastroot;
xlmeta.fastlevel = metad->btm_fastlevel;
- nextrdata->buffer = InvalidBuffer;
- nextrdata->data = (char *) &xlmeta;
- nextrdata->len = sizeof(xl_btree_metadata);
- nextrdata->next = nextrdata + 1;
- nextrdata++;
+ XLogRegisterBuffer(2, metabuf, REGBUF_WILL_INIT);
+ XLogRegisterBufData(2, (char *) &xlmeta, sizeof(xl_btree_metadata));
+
xlinfo = XLOG_BTREE_INSERT_META;
}
- else if (P_ISLEAF(lpageop))
- xlinfo = XLOG_BTREE_INSERT_LEAF;
- else
- xlinfo = XLOG_BTREE_INSERT_UPPER;
/* Read comments in _bt_pgaddtup */
+ XLogRegisterBuffer(0, buf, REGBUF_STANDARD);
if (!P_ISLEAF(lpageop) && newitemoff == P_FIRSTDATAKEY(lpageop))
{
- truncitem = *btitem;
- truncitem.bti_itup.t_info = sizeof(BTItemData);
- nextrdata->data = (char *) &truncitem;
- nextrdata->len = sizeof(BTItemData);
+ trunctuple = *itup;
+ trunctuple.t_info = sizeof(IndexTupleData);
+ XLogRegisterBufData(0, (char *) &trunctuple,
+ sizeof(IndexTupleData));
}
else
- {
- nextrdata->data = (char *) btitem;
- nextrdata->len = IndexTupleDSize(btitem->bti_itup) +
- (sizeof(BTItemData) - sizeof(IndexTupleData));
- }
- nextrdata->buffer = buf;
- nextrdata->next = NULL;
+ XLogRegisterBufData(0, (char *) itup, IndexTupleDSize(*itup));
- recptr = XLogInsert(RM_BTREE_ID, xlinfo, rdata);
+ recptr = XLogInsert(RM_BTREE_ID, xlinfo);
if (BufferIsValid(metabuf))
{
PageSetLSN(metapg, recptr);
- PageSetTLI(metapg, ThisTimeLineID);
+ }
+ if (BufferIsValid(cbuf))
+ {
+ PageSetLSN(BufferGetPage(cbuf), recptr);
}
PageSetLSN(page, recptr);
- PageSetTLI(page, ThisTimeLineID);
}
END_CRIT_SECTION();
- /* Write out the updated page and release pin/lock */
+ /* release buffers */
if (BufferIsValid(metabuf))
- _bt_wrtbuf(rel, metabuf);
-
- _bt_wrtbuf(rel, buf);
+ _bt_relbuf(rel, metabuf);
+ if (BufferIsValid(cbuf))
+ _bt_relbuf(rel, cbuf);
+ _bt_relbuf(rel, buf);
}
-
- /* by here, the new tuple is inserted at itup_blkno/itup_off */
- res = (InsertIndexResult) palloc(sizeof(InsertIndexResultData));
- ItemPointerSet(&(res->pointerData), itup_blkno, itup_off);
-
- return res;
}
/*
* _bt_split() -- split a page in the btree.
*
- * On entry, buf is the page to split, and is write-locked and pinned.
+ * On entry, buf is the page to split, and is pinned and write-locked.
* firstright is the item index of the first item to be moved to the
* new right page. newitemoff etc. tell us about the new item that
* must be inserted along with the data from the old page.
*
+ * When splitting a non-leaf page, 'cbuf' is the left-sibling of the
+ * page we're inserting the downlink for. This function will clear the
+ * INCOMPLETE_SPLIT flag on it, and release the buffer.
+ *
* Returns the new right sibling of buf, pinned and write-locked.
- * The pin and lock on buf are maintained. *itup_off and *itup_blkno
- * are set to the exact location where newitem was inserted.
+ * The pin and lock on buf are maintained.
*/
static Buffer
-_bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
- OffsetNumber newitemoff, Size newitemsz, BTItem newitem,
- bool newitemonleft,
- OffsetNumber *itup_off, BlockNumber *itup_blkno)
+_bt_split(Relation rel, Buffer buf, Buffer cbuf, OffsetNumber firstright,
+ OffsetNumber newitemoff, Size newitemsz, IndexTuple newitem,
+ bool newitemonleft)
{
Buffer rbuf;
Page origpage;
Page leftpage,
rightpage;
+ BlockNumber origpagenumber,
+ rightpagenumber;
BTPageOpaque ropaque,
lopaque,
oopaque;
BTPageOpaque sopaque = NULL;
Size itemsz;
ItemId itemid;
- BTItem item;
+ IndexTuple item;
OffsetNumber leftoff,
rightoff;
OffsetNumber maxoff;
OffsetNumber i;
+ bool isroot;
+ bool isleaf;
+ /* Acquire a new page to split into */
rbuf = _bt_getbuf(rel, P_NEW, BT_WRITE);
+
+ /*
+ * origpage is the original page to be split. leftpage is a temporary
+ * buffer that receives the left-sibling data, which will be copied back
+ * into origpage on success. rightpage is the new page that receives the
+ * right-sibling data. If we fail before reaching the critical section,
+ * origpage hasn't been modified and leftpage is only workspace. In
+ * principle we shouldn't need to worry about rightpage either, because it
+ * hasn't been linked into the btree page structure; but to avoid leaving
+ * possibly-confusing junk behind, we are careful to rewrite rightpage as
+ * zeroes before throwing any error.
+ */
origpage = BufferGetPage(buf);
- leftpage = PageGetTempPage(origpage, sizeof(BTPageOpaqueData));
+ leftpage = PageGetTempPage(origpage);
rightpage = BufferGetPage(rbuf);
+ origpagenumber = BufferGetBlockNumber(buf);
+ rightpagenumber = BufferGetBlockNumber(rbuf);
+
_bt_pageinit(leftpage, BufferGetPageSize(buf));
- _bt_pageinit(rightpage, BufferGetPageSize(rbuf));
+ /* rightpage was already initialized by _bt_getbuf */
+
+ /*
+ * Copy the original page's LSN into leftpage, which will become the
+ * updated version of the page. We need this because XLogInsert will
+ * examine the LSN and possibly dump it in a page image.
+ */
+ PageSetLSN(leftpage, PageGetLSN(origpage));
/* init btree private data */
oopaque = (BTPageOpaque) PageGetSpecialPointer(origpage);
lopaque = (BTPageOpaque) PageGetSpecialPointer(leftpage);
ropaque = (BTPageOpaque) PageGetSpecialPointer(rightpage);
+ isroot = P_ISROOT(oopaque);
+ isleaf = P_ISLEAF(oopaque);
+
/* if we're splitting this page, it won't be the root when we're done */
+ /* also, clear the SPLIT_END and HAS_GARBAGE flags in both pages */
lopaque->btpo_flags = oopaque->btpo_flags;
- lopaque->btpo_flags &= ~BTP_ROOT;
+ lopaque->btpo_flags &= ~(BTP_ROOT | BTP_SPLIT_END | BTP_HAS_GARBAGE);
ropaque->btpo_flags = lopaque->btpo_flags;
+ /* set flag in left page indicating that the right page has no downlink */
+ lopaque->btpo_flags |= BTP_INCOMPLETE_SPLIT;
lopaque->btpo_prev = oopaque->btpo_prev;
- lopaque->btpo_next = BufferGetBlockNumber(rbuf);
- ropaque->btpo_prev = BufferGetBlockNumber(buf);
+ lopaque->btpo_next = rightpagenumber;
+ ropaque->btpo_prev = origpagenumber;
ropaque->btpo_next = oopaque->btpo_next;
lopaque->btpo.level = ropaque->btpo.level = oopaque->btpo.level;
+ /* Since we already have write-lock on both pages, ok to read cycleid */
+ lopaque->btpo_cycleid = _bt_vacuum_cycleid(rel);
+ ropaque->btpo_cycleid = lopaque->btpo_cycleid;
/*
- * If the page we're splitting is not the rightmost page at its level
- * in the tree, then the first entry on the page is the high key for
- * the page. We need to copy that to the right half. Otherwise
- * (meaning the rightmost page case), all the items on the right half
- * will be user data.
+ * If the page we're splitting is not the rightmost page at its level in
+ * the tree, then the first entry on the page is the high key for the
+ * page. We need to copy that to the right half. Otherwise (meaning the
+ * rightmost page case), all the items on the right half will be user
+ * data.
*/
rightoff = P_HIKEY;
{
itemid = PageGetItemId(origpage, P_HIKEY);
itemsz = ItemIdGetLength(itemid);
- item = (BTItem) PageGetItem(origpage, itemid);
+ item = (IndexTuple) PageGetItem(origpage, itemid);
if (PageAddItem(rightpage, (Item) item, itemsz, rightoff,
- LP_USED) == InvalidOffsetNumber)
- elog(PANIC, "failed to add hikey to the right sibling");
+ false, false) == InvalidOffsetNumber)
+ {
+ memset(rightpage, 0, BufferGetPageSize(rbuf));
+ elog(ERROR, "failed to add hikey to the right sibling"
+ " while splitting block %u of index \"%s\"",
+ origpagenumber, RelationGetRelationName(rel));
+ }
rightoff = OffsetNumberNext(rightoff);
}
/*
- * The "high key" for the new left page will be the first key that's
- * going to go into the new right page. This might be either the
- * existing data item at position firstright, or the incoming tuple.
+ * The "high key" for the new left page will be the first key that's going
+ * to go into the new right page. This might be either the existing data
+ * item at position firstright, or the incoming tuple.
*/
leftoff = P_HIKEY;
if (!newitemonleft && newitemoff == firstright)
/* existing item at firstright will become first on right page */
itemid = PageGetItemId(origpage, firstright);
itemsz = ItemIdGetLength(itemid);
- item = (BTItem) PageGetItem(origpage, itemid);
+ item = (IndexTuple) PageGetItem(origpage, itemid);
}
if (PageAddItem(leftpage, (Item) item, itemsz, leftoff,
- LP_USED) == InvalidOffsetNumber)
- elog(PANIC, "failed to add hikey to the left sibling");
+ false, false) == InvalidOffsetNumber)
+ {
+ memset(rightpage, 0, BufferGetPageSize(rbuf));
+ elog(ERROR, "failed to add hikey to the left sibling"
+ " while splitting block %u of index \"%s\"",
+ origpagenumber, RelationGetRelationName(rel));
+ }
leftoff = OffsetNumberNext(leftoff);
/*
- * Now transfer all the data items to the appropriate page
+ * Now transfer all the data items to the appropriate page.
+ *
+ * Note: we *must* insert at least the right page's items in item-number
+ * order, for the benefit of _bt_restore_page().
*/
maxoff = PageGetMaxOffsetNumber(origpage);
{
itemid = PageGetItemId(origpage, i);
itemsz = ItemIdGetLength(itemid);
- item = (BTItem) PageGetItem(origpage, itemid);
+ item = (IndexTuple) PageGetItem(origpage, itemid);
/* does new item belong before this one? */
if (i == newitemoff)
{
if (newitemonleft)
{
- _bt_pgaddtup(rel, leftpage, newitemsz, newitem, leftoff,
- "left sibling");
- *itup_off = leftoff;
- *itup_blkno = BufferGetBlockNumber(buf);
+ if (!_bt_pgaddtup(leftpage, newitemsz, newitem, leftoff))
+ {
+ memset(rightpage, 0, BufferGetPageSize(rbuf));
+ elog(ERROR, "failed to add new item to the left sibling"
+ " while splitting block %u of index \"%s\"",
+ origpagenumber, RelationGetRelationName(rel));
+ }
leftoff = OffsetNumberNext(leftoff);
}
else
{
- _bt_pgaddtup(rel, rightpage, newitemsz, newitem, rightoff,
- "right sibling");
- *itup_off = rightoff;
- *itup_blkno = BufferGetBlockNumber(rbuf);
+ if (!_bt_pgaddtup(rightpage, newitemsz, newitem, rightoff))
+ {
+ memset(rightpage, 0, BufferGetPageSize(rbuf));
+ elog(ERROR, "failed to add new item to the right sibling"
+ " while splitting block %u of index \"%s\"",
+ origpagenumber, RelationGetRelationName(rel));
+ }
rightoff = OffsetNumberNext(rightoff);
}
}
/* decide which page to put it on */
if (i < firstright)
{
- _bt_pgaddtup(rel, leftpage, itemsz, item, leftoff,
- "left sibling");
+ if (!_bt_pgaddtup(leftpage, itemsz, item, leftoff))
+ {
+ memset(rightpage, 0, BufferGetPageSize(rbuf));
+ elog(ERROR, "failed to add old item to the left sibling"
+ " while splitting block %u of index \"%s\"",
+ origpagenumber, RelationGetRelationName(rel));
+ }
leftoff = OffsetNumberNext(leftoff);
}
else
{
- _bt_pgaddtup(rel, rightpage, itemsz, item, rightoff,
- "right sibling");
+ if (!_bt_pgaddtup(rightpage, itemsz, item, rightoff))
+ {
+ memset(rightpage, 0, BufferGetPageSize(rbuf));
+ elog(ERROR, "failed to add old item to the right sibling"
+ " while splitting block %u of index \"%s\"",
+ origpagenumber, RelationGetRelationName(rel));
+ }
rightoff = OffsetNumberNext(rightoff);
}
}
/* cope with possibility that newitem goes at the end */
if (i <= newitemoff)
{
- if (newitemonleft)
- {
- _bt_pgaddtup(rel, leftpage, newitemsz, newitem, leftoff,
- "left sibling");
- *itup_off = leftoff;
- *itup_blkno = BufferGetBlockNumber(buf);
- leftoff = OffsetNumberNext(leftoff);
- }
- else
+ /*
+ * Can't have newitemonleft here; that would imply we were told to put
+ * *everything* on the left page, which cannot fit (if it could, we'd
+ * not be splitting the page).
+ */
+ Assert(!newitemonleft);
+ if (!_bt_pgaddtup(rightpage, newitemsz, newitem, rightoff))
{
- _bt_pgaddtup(rel, rightpage, newitemsz, newitem, rightoff,
- "right sibling");
- *itup_off = rightoff;
- *itup_blkno = BufferGetBlockNumber(rbuf);
- rightoff = OffsetNumberNext(rightoff);
+ memset(rightpage, 0, BufferGetPageSize(rbuf));
+ elog(ERROR, "failed to add new item to the right sibling"
+ " while splitting block %u of index \"%s\"",
+ origpagenumber, RelationGetRelationName(rel));
}
+ rightoff = OffsetNumberNext(rightoff);
}
/*
* We have to grab the right sibling (if any) and fix the prev pointer
* there. We are guaranteed that this is deadlock-free since no other
- * writer will be holding a lock on that page and trying to move left,
- * and all readers release locks on a page before trying to fetch its
+ * writer will be holding a lock on that page and trying to move left, and
+ * all readers release locks on a page before trying to fetch its
* neighbors.
*/
- if (!P_RIGHTMOST(ropaque))
+ if (!P_RIGHTMOST(oopaque))
{
- sbuf = _bt_getbuf(rel, ropaque->btpo_next, BT_WRITE);
+ sbuf = _bt_getbuf(rel, oopaque->btpo_next, BT_WRITE);
spage = BufferGetPage(sbuf);
sopaque = (BTPageOpaque) PageGetSpecialPointer(spage);
- if (sopaque->btpo_prev != ropaque->btpo_prev)
- elog(PANIC, "right sibling's left-link doesn't match");
+ if (sopaque->btpo_prev != origpagenumber)
+ {
+ memset(rightpage, 0, BufferGetPageSize(rbuf));
+ elog(ERROR, "right sibling's left-link doesn't match: "
+ "block %u links to %u instead of expected %u in index \"%s\"",
+ oopaque->btpo_next, sopaque->btpo_prev, origpagenumber,
+ RelationGetRelationName(rel));
+ }
+
+ /*
+ * Check to see if we can set the SPLIT_END flag in the right-hand
+ * split page; this can save some I/O for vacuum since it need not
+ * proceed to the right sibling. We can set the flag if the right
+ * sibling has a different cycleid: that means it could not be part of
+ * a group of pages that were all split off from the same ancestor
+ * page. If you're confused, imagine that page A splits to A B and
+ * then again, yielding A C B, while vacuum is in progress. Tuples
+ * originally in A could now be in either B or C, hence vacuum must
+ * examine both pages. But if D, our right sibling, has a different
+ * cycleid then it could not contain any tuples that were in A when
+ * the vacuum started.
+ */
+ if (sopaque->btpo_cycleid != ropaque->btpo_cycleid)
+ ropaque->btpo_flags |= BTP_SPLIT_END;
}
/*
- * Right sibling is locked, new siblings are prepared, but original
- * page is not updated yet. Log changes before continuing.
+ * Right sibling is locked, new siblings are prepared, but original page
+ * is not updated yet.
*
- * NO EREPORT(ERROR) till right sibling is updated.
+ * NO EREPORT(ERROR) till right sibling is updated. We can get away with
+ * not starting the critical section till here because we haven't been
+ * scribbling on the original page yet; see comments above.
*/
START_CRIT_SECTION();
+ /*
+ * By here, the original data page has been split into two new halves, and
+ * these are correct. The algorithm requires that the left page never
+ * move during a split, so we copy the new left page back on top of the
+ * original. Note that this is not a waste of time, since we also require
+ * (in the page management code) that the center of a page always be
+ * clean, and the most efficient way to guarantee this is just to compact
+ * the data by reinserting it into a new left page. (XXX the latter
+ * comment is probably obsolete; but in any case it's good to not scribble
+ * on the original page until we enter the critical section.)
+ *
+ * We need to do this before writing the WAL record, so that XLogInsert
+ * can WAL log an image of the page if necessary.
+ */
+ PageRestoreTempPage(leftpage, origpage);
+ /* leftpage, lopaque must not be used below here */
+
+ MarkBufferDirty(buf);
+ MarkBufferDirty(rbuf);
+
if (!P_RIGHTMOST(ropaque))
- sopaque->btpo_prev = BufferGetBlockNumber(rbuf);
+ {
+ sopaque->btpo_prev = rightpagenumber;
+ MarkBufferDirty(sbuf);
+ }
+
+ /*
+ * Clear INCOMPLETE_SPLIT flag on child if inserting the new item finishes
+ * a split.
+ */
+ if (!isleaf)
+ {
+ Page cpage = BufferGetPage(cbuf);
+ BTPageOpaque cpageop = (BTPageOpaque) PageGetSpecialPointer(cpage);
+
+ cpageop->btpo_flags &= ~BTP_INCOMPLETE_SPLIT;
+ MarkBufferDirty(cbuf);
+ }
/* XLOG stuff */
- if (!rel->rd_istemp)
+ if (RelationNeedsWAL(rel))
{
xl_btree_split xlrec;
uint8 xlinfo;
XLogRecPtr recptr;
- XLogRecData rdata[4];
-
- xlrec.target.node = rel->rd_node;
- ItemPointerSet(&(xlrec.target.tid), *itup_blkno, *itup_off);
- if (newitemonleft)
- xlrec.otherblk = BufferGetBlockNumber(rbuf);
- else
- xlrec.otherblk = BufferGetBlockNumber(buf);
- xlrec.leftblk = lopaque->btpo_prev;
- xlrec.rightblk = ropaque->btpo_next;
- xlrec.level = lopaque->btpo.level;
- /*
- * Direct access to page is not good but faster - we should
- * implement some new func in page API. Note we only store the
- * tuples themselves, knowing that the item pointers are in the
- * same order and can be reconstructed by scanning the tuples.
- */
- xlrec.leftlen = ((PageHeader) leftpage)->pd_special -
- ((PageHeader) leftpage)->pd_upper;
+ xlrec.level = ropaque->btpo.level;
+ xlrec.firstright = firstright;
+ xlrec.newitemoff = newitemoff;
- rdata[0].buffer = InvalidBuffer;
- rdata[0].data = (char *) &xlrec;
- rdata[0].len = SizeOfBtreeSplit;
- rdata[0].next = &(rdata[1]);
+ XLogBeginInsert();
+ XLogRegisterData((char *) &xlrec, SizeOfBtreeSplit);
- rdata[1].buffer = InvalidBuffer;
- rdata[1].data = (char *) leftpage + ((PageHeader) leftpage)->pd_upper;
- rdata[1].len = xlrec.leftlen;
- rdata[1].next = &(rdata[2]);
+ XLogRegisterBuffer(0, buf, REGBUF_STANDARD);
+ XLogRegisterBuffer(1, rbuf, REGBUF_WILL_INIT);
+ /* Log the right sibling, because we've changed its prev-pointer. */
+ if (!P_RIGHTMOST(ropaque))
+ XLogRegisterBuffer(2, sbuf, REGBUF_STANDARD);
+ if (BufferIsValid(cbuf))
+ XLogRegisterBuffer(3, cbuf, REGBUF_STANDARD);
- rdata[2].buffer = InvalidBuffer;
- rdata[2].data = (char *) rightpage + ((PageHeader) rightpage)->pd_upper;
- rdata[2].len = ((PageHeader) rightpage)->pd_special -
- ((PageHeader) rightpage)->pd_upper;
- rdata[2].next = NULL;
+ /*
+ * Log the new item, if it was inserted on the left page. (If it was
+ * put on the right page, we don't need to explicitly WAL log it
+ * because it's included with all the other items on the right page.)
+ * Show the new item as belonging to the left page buffer, so that it
+ * is not stored if XLogInsert decides it needs a full-page image of
+ * the left page. We store the offset anyway, though, to support
+ * archive compression of these records.
+ */
+ if (newitemonleft)
+ XLogRegisterBufData(0, (char *) newitem, MAXALIGN(newitemsz));
- if (!P_RIGHTMOST(ropaque))
+ /* Log left page */
+ if (!isleaf)
{
- rdata[2].next = &(rdata[3]);
- rdata[3].buffer = sbuf;
- rdata[3].data = NULL;
- rdata[3].len = 0;
- rdata[3].next = NULL;
+ /*
+ * We must also log the left page's high key, because the right
+ * page's leftmost key is suppressed on non-leaf levels. Show it
+ * as belonging to the left page buffer, so that it is not stored
+ * if XLogInsert decides it needs a full-page image of the left
+ * page.
+ */
+ itemid = PageGetItemId(origpage, P_HIKEY);
+ item = (IndexTuple) PageGetItem(origpage, itemid);
+ XLogRegisterBufData(0, (char *) item, MAXALIGN(IndexTupleSize(item)));
}
- if (P_ISROOT(oopaque))
+ /*
+ * Log the contents of the right page in the format understood by
+ * _bt_restore_page(). The whole right page is recreated from this
+ * data at replay; since the buffer was registered with
+ * REGBUF_WILL_INIT, no full-page image of it is ever taken.
+ *
+ * Direct access to page is not good but faster - we should implement
+ * some new func in page API. Note we only store the tuples
+ * themselves, knowing that they were inserted in item-number order
+ * and so the item pointers can be reconstructed. See comments for
+ * _bt_restore_page().
+ */
+ XLogRegisterBufData(1,
+ (char *) rightpage + ((PageHeader) rightpage)->pd_upper,
+ ((PageHeader) rightpage)->pd_special - ((PageHeader) rightpage)->pd_upper);
+
+ if (isroot)
xlinfo = newitemonleft ? XLOG_BTREE_SPLIT_L_ROOT : XLOG_BTREE_SPLIT_R_ROOT;
else
xlinfo = newitemonleft ? XLOG_BTREE_SPLIT_L : XLOG_BTREE_SPLIT_R;
- recptr = XLogInsert(RM_BTREE_ID, xlinfo, rdata);
+ recptr = XLogInsert(RM_BTREE_ID, xlinfo);
- PageSetLSN(leftpage, recptr);
- PageSetTLI(leftpage, ThisTimeLineID);
+ PageSetLSN(origpage, recptr);
PageSetLSN(rightpage, recptr);
- PageSetTLI(rightpage, ThisTimeLineID);
if (!P_RIGHTMOST(ropaque))
{
PageSetLSN(spage, recptr);
- PageSetTLI(spage, ThisTimeLineID);
+ }
+ if (!isleaf)
+ {
+ PageSetLSN(BufferGetPage(cbuf), recptr);
}
}
- /*
- * By here, the original data page has been split into two new halves,
- * and these are correct. The algorithm requires that the left page
- * never move during a split, so we copy the new left page back on top
- * of the original. Note that this is not a waste of time, since we
- * also require (in the page management code) that the center of a
- * page always be clean, and the most efficient way to guarantee this
- * is just to compact the data by reinserting it into a new left page.
- */
-
- PageRestoreTempPage(leftpage, origpage);
-
END_CRIT_SECTION();
- /* write and release the old right sibling */
+ /* release the old right sibling */
if (!P_RIGHTMOST(ropaque))
- _bt_wrtbuf(rel, sbuf);
+ _bt_relbuf(rel, sbuf);
+
+ /* release the child */
+ if (!isleaf)
+ _bt_relbuf(rel, cbuf);
/* split's done */
return rbuf;
* it needs to go into!)
*
* If the page is the rightmost page on its level, we instead try to arrange
- * for twice as much free space on the right as on the left. In this way,
- * when we are inserting successively increasing keys (consider sequences,
- * timestamps, etc) we will end up with a tree whose pages are about 67% full,
+ * to leave the left split page fillfactor% full. In this way, when we are
+ * inserting successively increasing keys (consider sequences, timestamps,
+ * etc) we will end up with a tree whose pages are about fillfactor% full,
* instead of the 50% full result that we'd get without this special case.
- * (We could bias it even further to make the initially-loaded tree more full.
- * But since the steady-state load for a btree is about 70%, we'd likely just
- * be making more page-splitting work for ourselves later on, when we start
- * seeing updates to existing tuples.)
+ * This is the same as nbtsort.c produces for a newly-created tree. Note
+ * that leaf and nonleaf pages use different fillfactors.
*
* We are passed the intended insert position of the new tuple, expressed as
* the offsetnumber of the tuple it must go in front of. (This could be
*
* We return the index of the first existing tuple that should go on the
* righthand page, plus a boolean indicating whether the new tuple goes on
- * the left or right page. The bool is necessary to disambiguate the case
+ * the left or right page. The bool is necessary to disambiguate the case
* where firstright == newitemoff.
*/
static OffsetNumber
int leftspace,
rightspace,
goodenough,
- dataitemtotal,
- dataitemstoleft;
+ olddataitemstotal,
+ olddataitemstoleft;
+ bool goodenoughfound;
opaque = (BTPageOpaque) PageGetSpecialPointer(page);
/* Passed-in newitemsz is MAXALIGNED but does not include line pointer */
newitemsz += sizeof(ItemIdData);
- state.newitemsz = newitemsz;
- state.is_leaf = P_ISLEAF(opaque);
- state.is_rightmost = P_RIGHTMOST(opaque);
- state.have_split = false;
/* Total free space available on a btree page, after fixed overhead */
leftspace = rightspace =
PageGetPageSize(page) - SizeOfPageHeaderData -
MAXALIGN(sizeof(BTPageOpaqueData));
- /*
- * Finding the best possible split would require checking all the
- * possible split points, because of the high-key and left-key special
- * cases. That's probably more work than it's worth; instead, stop as
- * soon as we find a "good-enough" split, where good-enough is defined
- * as an imbalance in free space of no more than pagesize/16
- * (arbitrary...) This should let us stop near the middle on most
- * pages, instead of plowing to the end.
- */
- goodenough = leftspace / 16;
-
/* The right page will have the same high key as the old page */
if (!P_RIGHTMOST(opaque))
{
}
/* Count up total space in data items without actually scanning 'em */
- dataitemtotal = rightspace - (int) PageGetFreeSpace(page);
+ olddataitemstotal = rightspace - (int) PageGetExactFreeSpace(page);
+
+ state.newitemsz = newitemsz;
+ state.is_leaf = P_ISLEAF(opaque);
+ state.is_rightmost = P_RIGHTMOST(opaque);
+ state.have_split = false;
+ if (state.is_leaf)
+ state.fillfactor = RelationGetFillFactor(rel,
+ BTREE_DEFAULT_FILLFACTOR);
+ else
+ state.fillfactor = BTREE_NONLEAF_FILLFACTOR;
+ state.newitemonleft = false; /* these just to keep compiler quiet */
+ state.firstright = 0;
+ state.best_delta = 0;
+ state.leftspace = leftspace;
+ state.rightspace = rightspace;
+ state.olddataitemstotal = olddataitemstotal;
+ state.newitemoff = newitemoff;
+
+ /*
+ * Finding the best possible split would require checking all the possible
+ * split points, because of the high-key and left-key special cases.
+ * That's probably more work than it's worth; instead, stop as soon as we
+ * find a "good-enough" split, where good-enough is defined as an
+ * imbalance in free space of no more than pagesize/16 (arbitrary...) This
+ * should let us stop near the middle on most pages, instead of plowing to
+ * the end.
+ */
+ goodenough = leftspace / 16;
/*
- * Scan through the data items and calculate space usage for a split
- * at each possible position.
+ * Scan through the data items and calculate space usage for a split at
+ * each possible position.
*/
- dataitemstoleft = 0;
+ olddataitemstoleft = 0;
+ goodenoughfound = false;
maxoff = PageGetMaxOffsetNumber(page);
for (offnum = P_FIRSTDATAKEY(opaque);
offnum = OffsetNumberNext(offnum))
{
Size itemsz;
- int leftfree,
- rightfree;
itemid = PageGetItemId(page, offnum);
itemsz = MAXALIGN(ItemIdGetLength(itemid)) + sizeof(ItemIdData);
- /*
- * We have to allow for the current item becoming the high key of
- * the left page; therefore it counts against left space as well
- * as right space.
- */
- leftfree = leftspace - dataitemstoleft - (int) itemsz;
- rightfree = rightspace - (dataitemtotal - dataitemstoleft);
-
/*
* Will the new item go to left or right of split?
*/
if (offnum > newitemoff)
- _bt_checksplitloc(&state, offnum, leftfree, rightfree,
- true, itemsz);
+ _bt_checksplitloc(&state, offnum, true,
+ olddataitemstoleft, itemsz);
+
else if (offnum < newitemoff)
- _bt_checksplitloc(&state, offnum, leftfree, rightfree,
- false, itemsz);
+ _bt_checksplitloc(&state, offnum, false,
+ olddataitemstoleft, itemsz);
else
{
/* need to try it both ways! */
- _bt_checksplitloc(&state, offnum, leftfree, rightfree,
- true, itemsz);
- /* here we are contemplating newitem as first on right */
- _bt_checksplitloc(&state, offnum, leftfree, rightfree,
- false, newitemsz);
+ _bt_checksplitloc(&state, offnum, true,
+ olddataitemstoleft, itemsz);
+
+ _bt_checksplitloc(&state, offnum, false,
+ olddataitemstoleft, itemsz);
}
/* Abort scan once we find a good-enough choice */
if (state.have_split && state.best_delta <= goodenough)
+ {
+ goodenoughfound = true;
break;
+ }
- dataitemstoleft += itemsz;
+ olddataitemstoleft += itemsz;
}
/*
- * I believe it is not possible to fail to find a feasible split, but
- * just in case ...
+ * If the new item goes as the last item, check for splitting so that all
+ * the old items go to the left page and the new item goes to the right
+ * page.
+ */
+ if (newitemoff > maxoff && !goodenoughfound)
+ _bt_checksplitloc(&state, newitemoff, false, olddataitemstotal, 0);
+
+ /*
+ * I believe it is not possible to fail to find a feasible split, but just
+ * in case ...
*/
if (!state.have_split)
- elog(ERROR, "could not find a feasible split point for \"%s\"",
+ elog(ERROR, "could not find a feasible split point for index \"%s\"",
RelationGetRelationName(rel));
*newitemonleft = state.newitemonleft;
/*
* Subroutine to analyze a particular possible split choice (ie, firstright
* and newitemonleft settings), and record the best split so far in *state.
+ *
+ * firstoldonright is the offset of the first item on the original page
+ * that goes to the right page, and firstoldonrightsz is the size of that
+ * tuple. firstoldonright can be > max offset, which means that all the old
+ * items go to the left page and only the new item goes to the right page.
+ * In that case, firstoldonrightsz is not used.
+ *
+ * olddataitemstoleft is the total size of all old items to the left of
+ * firstoldonright.
*/
static void
-_bt_checksplitloc(FindSplitData *state, OffsetNumber firstright,
- int leftfree, int rightfree,
- bool newitemonleft, Size firstrightitemsz)
+_bt_checksplitloc(FindSplitData *state,
+ OffsetNumber firstoldonright,
+ bool newitemonleft,
+ int olddataitemstoleft,
+ Size firstoldonrightsz)
{
+ int leftfree,
+ rightfree;
+ Size firstrightitemsz;
+ bool newitemisfirstonright;
+
+ /* Is the new item going to be the first item on the right page? */
+ newitemisfirstonright = (firstoldonright == state->newitemoff
+ && !newitemonleft);
+
+ if (newitemisfirstonright)
+ firstrightitemsz = state->newitemsz;
+ else
+ firstrightitemsz = firstoldonrightsz;
+
+ /* Account for all the old tuples */
+ leftfree = state->leftspace - olddataitemstoleft;
+ rightfree = state->rightspace -
+ (state->olddataitemstotal - olddataitemstoleft);
+
/*
- * Account for the new item on whichever side it is to be put.
+ * The first item on the right page becomes the high key of the left page;
+ * therefore it counts against left space as well as right space.
*/
+ leftfree -= firstrightitemsz;
+
+ /* account for the new item */
if (newitemonleft)
leftfree -= (int) state->newitemsz;
else
*/
if (!state->is_leaf)
rightfree += (int) firstrightitemsz -
- (int) (MAXALIGN(sizeof(BTItemData)) + sizeof(ItemIdData));
+ (int) (MAXALIGN(sizeof(IndexTupleData)) + sizeof(ItemIdData));
/*
* If feasible split point, remember best delta.
if (state->is_rightmost)
{
/*
- * On a rightmost page, try to equalize right free space with
- * twice the left free space. See comments for
- * _bt_findsplitloc.
+ * If splitting a rightmost page, try to put (100-fillfactor)% of
+ * free space on left page. See comments for _bt_findsplitloc.
*/
- delta = (2 * leftfree) - rightfree;
+ delta = (state->fillfactor * leftfree)
+ - ((100 - state->fillfactor) * rightfree);
}
else
{
{
state->have_split = true;
state->newitemonleft = newitemonleft;
- state->firstright = firstright;
+ state->firstright = firstoldonright;
state->best_delta = delta;
}
}
*
* On entry, buf and rbuf are the left and right split pages, which we
* still hold write locks on per the L&Y algorithm. We release the
- * write locks once we have write lock on the parent page. (Any sooner,
+ * write locks once we have write lock on the parent page. (Any sooner,
* and it'd be possible for some other process to try to split or delete
* one of these pages, and get confused because it cannot find the downlink.)
*
* have to be efficient (concurrent ROOT split, WAL recovery)
* is_root - we split the true root
* is_only - we split a page alone on its level (might have been fast root)
- *
- * This is exported so it can be called by nbtxlog.c.
*/
-void
+static void
_bt_insert_parent(Relation rel,
Buffer buf,
Buffer rbuf,
bool is_only)
{
/*
- * Here we have to do something Lehman and Yao don't talk about: deal
- * with a root split and construction of a new root. If our stack is
- * empty then we have just split a node on what had been the root
- * level when we descended the tree. If it was still the root then we
- * perform a new-root construction. If it *wasn't* the root anymore,
- * search to find the next higher level that someone constructed
- * meanwhile, and find the right place to insert as for the normal
- * case.
+ * Here we have to do something Lehman and Yao don't talk about: deal with
+ * a root split and construction of a new root. If our stack is empty
+ * then we have just split a node on what had been the root level when we
+ * descended the tree. If it was still the root then we perform a
+ * new-root construction. If it *wasn't* the root anymore, search to find
+ * the next higher level that someone constructed meanwhile, and find the
+ * right place to insert as for the normal case.
*
* If we have to search for the parent level, we do so by re-descending
- * from the root. This is not super-efficient, but it's rare enough
- * not to matter. (This path is also taken when called from WAL
- * recovery --- we have no stack in that case.)
+ * from the root. This is not super-efficient, but it's rare enough not
+ * to matter.
*/
if (is_root)
{
/* create a new root node and update the metapage */
rootbuf = _bt_newroot(rel, buf, rbuf);
/* release the split buffers */
- _bt_wrtbuf(rel, rootbuf);
- _bt_wrtbuf(rel, rbuf);
- _bt_wrtbuf(rel, buf);
+ _bt_relbuf(rel, rootbuf);
+ _bt_relbuf(rel, rbuf);
+ _bt_relbuf(rel, buf);
}
else
{
BlockNumber bknum = BufferGetBlockNumber(buf);
BlockNumber rbknum = BufferGetBlockNumber(rbuf);
Page page = BufferGetPage(buf);
- InsertIndexResult newres;
- BTItem new_item;
+ IndexTuple new_item;
BTStackData fakestack;
- BTItem ritem;
+ IndexTuple ritem;
Buffer pbuf;
if (stack == NULL)
{
BTPageOpaque lpageop;
- if (!InRecovery)
- elog(DEBUG2, "concurrent ROOT page split");
+ elog(DEBUG2, "concurrent ROOT page split");
lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
/* Find the leftmost page at the next level up */
pbuf = _bt_get_endpoint(rel, lpageop->btpo.level + 1, false);
stack = &fakestack;
stack->bts_blkno = BufferGetBlockNumber(pbuf);
stack->bts_offset = InvalidOffsetNumber;
- /* bts_btitem will be initialized below */
+ /* bts_btentry will be initialized below */
stack->bts_parent = NULL;
_bt_relbuf(rel, pbuf);
}
/* get high key from left page == lowest key on new right page */
- ritem = (BTItem) PageGetItem(page,
- PageGetItemId(page, P_HIKEY));
+ ritem = (IndexTuple) PageGetItem(page,
+ PageGetItemId(page, P_HIKEY));
/* form an index tuple that points at the new right page */
- new_item = _bt_formitem(&(ritem->bti_itup));
- ItemPointerSet(&(new_item->bti_itup.t_tid), rbknum, P_HIKEY);
+ new_item = CopyIndexTuple(ritem);
+ ItemPointerSet(&(new_item->t_tid), rbknum, P_HIKEY);
/*
* Find the parent buffer and get the parent page.
*
- * Oops - if we were moved right then we need to change stack item!
- * We want to find parent pointing to where we are, right ? -
- * vadim 05/27/97
+ * Oops - if we were moved right then we need to change stack item! We
+ * want to find parent pointing to where we are, right ? - vadim
+ * 05/27/97
*/
- ItemPointerSet(&(stack->bts_btitem.bti_itup.t_tid),
- bknum, P_HIKEY);
-
+ ItemPointerSet(&(stack->bts_btentry.t_tid), bknum, P_HIKEY);
pbuf = _bt_getstackbuf(rel, stack, BT_WRITE);
- /* Now we can write and unlock the children */
- _bt_wrtbuf(rel, rbuf);
- _bt_wrtbuf(rel, buf);
+ /*
+ * Now we can unlock the right child. The left child will be unlocked
+ * by _bt_insertonpg().
+ */
+ _bt_relbuf(rel, rbuf);
/* Check for error only after writing children */
if (pbuf == InvalidBuffer)
- elog(ERROR, "failed to re-find parent key in \"%s\"",
- RelationGetRelationName(rel));
+ elog(ERROR, "failed to re-find parent key in index \"%s\" for split pages %u/%u",
+ RelationGetRelationName(rel), bknum, rbknum);
/* Recursively update the parent */
- newres = _bt_insertonpg(rel, pbuf, stack->bts_parent,
- 0, NULL, new_item, stack->bts_offset,
- is_only);
+ _bt_insertonpg(rel, pbuf, buf, stack->bts_parent,
+ new_item, stack->bts_offset + 1,
+ is_only);
/* be tidy */
- pfree(newres);
pfree(new_item);
}
}
+/*
+ * _bt_finish_split() -- Finish an incomplete split
+ *
+ * A crash or other failure can leave a split incomplete: the new right
+ * sibling exists, but the parent has no downlink for it yet. The insertion
+ * routines won't allow insertions on a page that is incompletely split.
+ * Before inserting on such a page, call _bt_finish_split() to insert the
+ * missing downlink into the parent (possibly creating a new root).
+ *
+ * 'stack' is the path down from the parent level, or NULL if unknown
+ * (e.g. when the caller descended via the fast root).
+ *
+ * On entry, 'lbuf' must be locked in write-mode. On exit, it is unlocked
+ * and unpinned (released by _bt_insert_parent / _bt_insertonpg).
+ */
+void
+_bt_finish_split(Relation rel, Buffer lbuf, BTStack stack)
+{
+ Page lpage = BufferGetPage(lbuf);
+ BTPageOpaque lpageop = (BTPageOpaque) PageGetSpecialPointer(lpage);
+ Buffer rbuf;
+ Page rpage;
+ BTPageOpaque rpageop;
+ bool was_root;
+ bool was_only;
+
+ Assert(P_INCOMPLETE_SPLIT(lpageop));
+
+ /* Lock right sibling, the one missing the downlink */
+ rbuf = _bt_getbuf(rel, lpageop->btpo_next, BT_WRITE);
+ rpage = BufferGetPage(rbuf);
+ rpageop = (BTPageOpaque) PageGetSpecialPointer(rpage);
+
+ /* Could this be a root split? */
+ if (!stack)
+ {
+ Buffer metabuf;
+ Page metapg;
+ BTMetaPageData *metad;
+
+ /* acquire lock on the metapage */
+ metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_WRITE);
+ metapg = BufferGetPage(metabuf);
+ metad = BTPageGetMeta(metapg);
+
+ /* it was a root split iff the left page is still the true root */
+ was_root = (metad->btm_root == BufferGetBlockNumber(lbuf));
+
+ _bt_relbuf(rel, metabuf);
+ }
+ else
+ was_root = false;
+
+ /* Was this the only page on the level before split? */
+ was_only = (P_LEFTMOST(lpageop) && P_RIGHTMOST(rpageop));
+
+ elog(DEBUG1, "finishing incomplete split of %u/%u",
+ BufferGetBlockNumber(lbuf), BufferGetBlockNumber(rbuf));
+
+ /* insert the downlink; this also releases both child buffers */
+ _bt_insert_parent(rel, lbuf, rbuf, stack, was_root, was_only);
+}
+
/*
* _bt_getstackbuf() -- Walk back up the tree one step, and find the item
* we last looked at in the parent.
page = BufferGetPage(buf);
opaque = (BTPageOpaque) PageGetSpecialPointer(page);
+ if (access == BT_WRITE && P_INCOMPLETE_SPLIT(opaque))
+ {
+ _bt_finish_split(rel, buf, stack->bts_parent);
+ continue;
+ }
+
if (!P_IGNORE(opaque))
{
OffsetNumber offnum,
minoff,
maxoff;
ItemId itemid;
- BTItem item;
+ IndexTuple item;
minoff = P_FIRSTDATAKEY(opaque);
maxoff = PageGetMaxOffsetNumber(page);
/*
- * start = InvalidOffsetNumber means "search the whole page".
- * We need this test anyway due to possibility that page has a
- * high key now when it didn't before.
+ * start = InvalidOffsetNumber means "search the whole page". We
+ * need this test anyway due to possibility that page has a high
+ * key now when it didn't before.
*/
if (start < minoff)
start = minoff;
/*
* These loops will check every item on the page --- but in an
- * order that's attuned to the probability of where it
- * actually is. Scan to the right first, then to the left.
+ * order that's attuned to the probability of where it actually
+ * is. Scan to the right first, then to the left.
*/
for (offnum = start;
offnum <= maxoff;
offnum = OffsetNumberNext(offnum))
{
itemid = PageGetItemId(page, offnum);
- item = (BTItem) PageGetItem(page, itemid);
- if (BTItemSame(item, &stack->bts_btitem))
+ item = (IndexTuple) PageGetItem(page, itemid);
+ if (BTEntrySame(item, &stack->bts_btentry))
{
/* Return accurate pointer to where link is now */
stack->bts_blkno = blkno;
offnum = OffsetNumberPrev(offnum))
{
itemid = PageGetItemId(page, offnum);
- item = (BTItem) PageGetItem(page, itemid);
- if (BTItemSame(item, &stack->bts_btitem))
+ item = (IndexTuple) PageGetItem(page, itemid);
+ if (BTEntrySame(item, &stack->bts_btentry))
{
/* Return accurate pointer to where link is now */
stack->bts_blkno = blkno;
{
Buffer rootbuf;
Page lpage,
- rpage,
rootpage;
BlockNumber lbkno,
rbkno;
BlockNumber rootblknum;
BTPageOpaque rootopaque;
+ BTPageOpaque lopaque;
ItemId itemid;
- BTItem item;
- Size itemsz;
- BTItem new_item;
+ IndexTuple item;
+ IndexTuple left_item;
+ Size left_item_sz;
+ IndexTuple right_item;
+ Size right_item_sz;
Buffer metabuf;
Page metapg;
BTMetaPageData *metad;
lbkno = BufferGetBlockNumber(lbuf);
rbkno = BufferGetBlockNumber(rbuf);
lpage = BufferGetPage(lbuf);
- rpage = BufferGetPage(rbuf);
+ lopaque = (BTPageOpaque) PageGetSpecialPointer(lpage);
/* get a new root page */
rootbuf = _bt_getbuf(rel, P_NEW, BT_WRITE);
metapg = BufferGetPage(metabuf);
metad = BTPageGetMeta(metapg);
+ /*
+ * Create downlink item for left page (old root). Since this will be the
+ * first item in a non-leaf page, it implicitly has minus-infinity key
+ * value, so we need not store any actual key in it.
+ */
+ left_item_sz = sizeof(IndexTupleData);
+ left_item = (IndexTuple) palloc(left_item_sz);
+ left_item->t_info = left_item_sz;
+ ItemPointerSet(&(left_item->t_tid), lbkno, P_HIKEY);
+
+ /*
+ * Create downlink item for right page. The key for it is obtained from
+ * the "high key" position in the left page.
+ */
+ itemid = PageGetItemId(lpage, P_HIKEY);
+ right_item_sz = ItemIdGetLength(itemid);
+ item = (IndexTuple) PageGetItem(lpage, itemid);
+ right_item = CopyIndexTuple(item);
+ ItemPointerSet(&(right_item->t_tid), rbkno, P_HIKEY);
+
/* NO EREPORT(ERROR) from here till newroot op is logged */
START_CRIT_SECTION();
rootopaque->btpo_flags = BTP_ROOT;
rootopaque->btpo.level =
((BTPageOpaque) PageGetSpecialPointer(lpage))->btpo.level + 1;
+ rootopaque->btpo_cycleid = 0;
/* update metapage data */
metad->btm_root = rootblknum;
metad->btm_fastlevel = rootopaque->btpo.level;
/*
- * Create downlink item for left page (old root). Since this will be
- * the first item in a non-leaf page, it implicitly has minus-infinity
- * key value, so we need not store any actual key in it.
+ * Insert the left page pointer into the new root page. The root page is
+ * the rightmost page on its level so there is no "high key" in it; the
+ * two items will go into positions P_HIKEY and P_FIRSTKEY.
+ *
+ * Note: we *must* insert the two items in item-number order, for the
+ * benefit of _bt_restore_page().
*/
- itemsz = sizeof(BTItemData);
- new_item = (BTItem) palloc(itemsz);
- new_item->bti_itup.t_info = itemsz;
- ItemPointerSet(&(new_item->bti_itup.t_tid), lbkno, P_HIKEY);
+ if (PageAddItem(rootpage, (Item) left_item, left_item_sz, P_HIKEY,
+ false, false) == InvalidOffsetNumber)
+ elog(PANIC, "failed to add leftkey to new root page"
+ " while splitting block %u of index \"%s\"",
+ BufferGetBlockNumber(lbuf), RelationGetRelationName(rel));
/*
- * Insert the left page pointer into the new root page. The root page
- * is the rightmost page on its level so there is no "high key" in it;
- * the two items will go into positions P_HIKEY and P_FIRSTKEY.
+ * insert the right page pointer into the new root page.
*/
- if (PageAddItem(rootpage, (Item) new_item, itemsz, P_HIKEY, LP_USED) == InvalidOffsetNumber)
- elog(PANIC, "failed to add leftkey to new root page");
- pfree(new_item);
+ if (PageAddItem(rootpage, (Item) right_item, right_item_sz, P_FIRSTKEY,
+ false, false) == InvalidOffsetNumber)
+ elog(PANIC, "failed to add rightkey to new root page"
+ " while splitting block %u of index \"%s\"",
+ BufferGetBlockNumber(lbuf), RelationGetRelationName(rel));
- /*
- * Create downlink item for right page. The key for it is obtained
- * from the "high key" position in the left page.
- */
- itemid = PageGetItemId(lpage, P_HIKEY);
- itemsz = ItemIdGetLength(itemid);
- item = (BTItem) PageGetItem(lpage, itemid);
- new_item = _bt_formitem(&(item->bti_itup));
- ItemPointerSet(&(new_item->bti_itup.t_tid), rbkno, P_HIKEY);
+ /* Clear the incomplete-split flag in the left child */
+ Assert(P_INCOMPLETE_SPLIT(lopaque));
+ lopaque->btpo_flags &= ~BTP_INCOMPLETE_SPLIT;
+ MarkBufferDirty(lbuf);
- /*
- * insert the right page pointer into the new root page.
- */
- if (PageAddItem(rootpage, (Item) new_item, itemsz, P_FIRSTKEY, LP_USED) == InvalidOffsetNumber)
- elog(PANIC, "failed to add rightkey to new root page");
- pfree(new_item);
+ MarkBufferDirty(rootbuf);
+ MarkBufferDirty(metabuf);
/* XLOG stuff */
- if (!rel->rd_istemp)
+ if (RelationNeedsWAL(rel))
{
xl_btree_newroot xlrec;
XLogRecPtr recptr;
- XLogRecData rdata[2];
+ xl_btree_metadata md;
- xlrec.node = rel->rd_node;
xlrec.rootblk = rootblknum;
xlrec.level = metad->btm_level;
- rdata[0].buffer = InvalidBuffer;
- rdata[0].data = (char *) &xlrec;
- rdata[0].len = SizeOfBtreeNewroot;
- rdata[0].next = &(rdata[1]);
+ XLogBeginInsert();
+ XLogRegisterData((char *) &xlrec, SizeOfBtreeNewroot);
+
+ XLogRegisterBuffer(0, rootbuf, REGBUF_WILL_INIT);
+ XLogRegisterBuffer(1, lbuf, REGBUF_STANDARD);
+ XLogRegisterBuffer(2, metabuf, REGBUF_WILL_INIT);
+
+ md.root = rootblknum;
+ md.level = metad->btm_level;
+ md.fastroot = rootblknum;
+ md.fastlevel = metad->btm_level;
+
+ XLogRegisterBufData(2, (char *) &md, sizeof(xl_btree_metadata));
/*
- * Direct access to page is not good but faster - we should
- * implement some new func in page API.
+ * Direct access to page is not good but faster - we should implement
+ * some new func in page API.
*/
- rdata[1].buffer = InvalidBuffer;
- rdata[1].data = (char *) rootpage + ((PageHeader) rootpage)->pd_upper;
- rdata[1].len = ((PageHeader) rootpage)->pd_special -
- ((PageHeader) rootpage)->pd_upper;
- rdata[1].next = NULL;
+ XLogRegisterBufData(0,
+ (char *) rootpage + ((PageHeader) rootpage)->pd_upper,
+ ((PageHeader) rootpage)->pd_special -
+ ((PageHeader) rootpage)->pd_upper);
- recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_NEWROOT, rdata);
+ recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_NEWROOT);
+ PageSetLSN(lpage, recptr);
PageSetLSN(rootpage, recptr);
- PageSetTLI(rootpage, ThisTimeLineID);
PageSetLSN(metapg, recptr);
- PageSetTLI(metapg, ThisTimeLineID);
- PageSetLSN(lpage, recptr);
- PageSetTLI(lpage, ThisTimeLineID);
- PageSetLSN(rpage, recptr);
- PageSetTLI(rpage, ThisTimeLineID);
}
END_CRIT_SECTION();
- /* write and let go of metapage buffer */
- _bt_wrtbuf(rel, metabuf);
+ /* done with metapage */
+ _bt_relbuf(rel, metabuf);
+
+ pfree(left_item);
+ pfree(right_item);
- return (rootbuf);
+ return rootbuf;
}
/*
* the buffer afterwards, either.
*
* The main difference between this routine and a bare PageAddItem call
- * is that this code knows that the leftmost data item on a non-leaf
+ * is that this code knows that the leftmost index tuple on a non-leaf
* btree page doesn't need to have a key. Therefore, it strips such
- * items down to just the item header. CAUTION: this works ONLY if
- * we insert the items in order, so that the given itup_off does
- * represent the final position of the item!
+ * tuples down to just the tuple header. CAUTION: this works ONLY if
+ * we insert the tuples in order, so that the given itup_off does
+ * represent the final position of the tuple!
*/
-static void
-_bt_pgaddtup(Relation rel,
- Page page,
+static bool
+_bt_pgaddtup(Page page,
 Size itemsize,
- BTItem btitem,
- OffsetNumber itup_off,
- const char *where)
+ IndexTuple itup,
+ OffsetNumber itup_off)
{
 BTPageOpaque opaque = (BTPageOpaque) PageGetSpecialPointer(page);
- BTItemData truncitem;
+ IndexTupleData trunctuple;
 if (!P_ISLEAF(opaque) && itup_off == P_FIRSTDATAKEY(opaque))
 {
- memcpy(&truncitem, btitem, sizeof(BTItemData));
- truncitem.bti_itup.t_info = sizeof(BTItemData);
- btitem = &truncitem;
- itemsize = sizeof(BTItemData);
+ /* keyless first tuple on a non-leaf page: keep only the header */
+ trunctuple = *itup;
+ trunctuple.t_info = sizeof(IndexTupleData);
+ itup = &trunctuple;
+ itemsize = sizeof(IndexTupleData);
 }
- if (PageAddItem(page, (Item) btitem, itemsize, itup_off,
- LP_USED) == InvalidOffsetNumber)
- elog(PANIC, "failed to add item to the %s for \"%s\"",
- where, RelationGetRelationName(rel));
+ /* report failure to the caller instead of erroring out here */
+ if (PageAddItem(page, (Item) itup, itemsize, itup_off,
+ false, false) == InvalidOffsetNumber)
+ return false;
+
+ return true;
}
/*
_bt_isequal(TupleDesc itupdesc, Page page, OffsetNumber offnum,
int keysz, ScanKey scankey)
{
- BTItem btitem;
IndexTuple itup;
int i;
/* Better be comparing to a leaf item */
Assert(P_ISLEAF((BTPageOpaque) PageGetSpecialPointer(page)));
- btitem = (BTItem) PageGetItem(page, PageGetItemId(page, offnum));
- itup = &(btitem->bti_itup);
+ itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, offnum));
for (i = 1; i <= keysz; i++)
{
if (isNull || (scankey->sk_flags & SK_ISNULL))
return false;
- result = DatumGetInt32(FunctionCall2(&scankey->sk_func,
- datum,
- scankey->sk_argument));
+ result = DatumGetInt32(FunctionCall2Coll(&scankey->sk_func,
+ scankey->sk_collation,
+ datum,
+ scankey->sk_argument));
if (result != 0)
return false;
/* if we get here, the keys are equal */
return true;
}
+
+/*
+ * _bt_vacuum_one_page - vacuum just one index page.
+ *
+ * Try to remove LP_DEAD items from the given page. The passed buffer
+ * must be exclusive-locked, but unlike a real VACUUM, we don't need a
+ * super-exclusive "cleanup" lock (see nbtree/README).
+ */
+static void
+_bt_vacuum_one_page(Relation rel, Buffer buffer, Relation heapRel)
+{
+ /* big enough to hold every line pointer a page can possibly have */
+ OffsetNumber deletable[MaxOffsetNumber];
+ int ndeletable = 0;
+ OffsetNumber offnum,
+ minoff,
+ maxoff;
+ Page page = BufferGetPage(buffer);
+ BTPageOpaque opaque = (BTPageOpaque) PageGetSpecialPointer(page);
+
+ /*
+ * Scan over all items to see which ones need to be deleted according to
+ * LP_DEAD flags.
+ */
+ minoff = P_FIRSTDATAKEY(opaque);
+ maxoff = PageGetMaxOffsetNumber(page);
+ for (offnum = minoff;
+ offnum <= maxoff;
+ offnum = OffsetNumberNext(offnum))
+ {
+ ItemId itemId = PageGetItemId(page, offnum);
+
+ if (ItemIdIsDead(itemId))
+ deletable[ndeletable++] = offnum;
+ }
+
+ /* remove them, if any were found; no-op otherwise */
+ if (ndeletable > 0)
+ _bt_delitems_delete(rel, buffer, deletable, ndeletable, heapRel);
+
+ /*
+ * Note: if we didn't find any LP_DEAD items, then the page's
+ * BTP_HAS_GARBAGE hint bit is falsely set. We do not bother expending a
+ * separate write to clear it, however. We will clear it when we split
+ * the page.
+ */
+}