granicus.if.org Git - postgresql/blobdiff - src/backend/access/nbtree/nbtinsert.c
Update copyright for 2016
[postgresql] / src / backend / access / nbtree / nbtinsert.c
index dd3736ad8a99ccba69fe56094a3d81af9fa04517..e3c55eb6c474314e43d9db7c5bc5afe7564a3f50 100644 (file)
@@ -3,12 +3,12 @@
  * nbtinsert.c
  *       Item insertion in Lehman and Yao btrees for Postgres.
  *
- * Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  *
  * IDENTIFICATION
- *       $PostgreSQL: pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.173 2009/08/01 20:59:17 tgl Exp $
+ *       src/backend/access/nbtree/nbtinsert.c
  *
  *-------------------------------------------------------------------------
  */
 #include "access/heapam.h"
 #include "access/nbtree.h"
 #include "access/transam.h"
+#include "access/xloginsert.h"
 #include "miscadmin.h"
-#include "storage/bufmgr.h"
 #include "storage/lmgr.h"
-#include "utils/inval.h"
+#include "storage/predicate.h"
 #include "utils/tqual.h"
 
 
@@ -51,21 +51,26 @@ static Buffer _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf);
 static TransactionId _bt_check_unique(Relation rel, IndexTuple itup,
                                 Relation heapRel, Buffer buf, OffsetNumber offset,
                                 ScanKey itup_scankey,
-                                IndexUniqueCheck checkUnique, bool *is_unique);
+                                IndexUniqueCheck checkUnique, bool *is_unique,
+                                uint32 *speculativeToken);
 static void _bt_findinsertloc(Relation rel,
                                  Buffer *bufptr,
                                  OffsetNumber *offsetptr,
                                  int keysz,
                                  ScanKey scankey,
-                                 IndexTuple newtup);
-static void _bt_insertonpg(Relation rel, Buffer buf,
+                                 IndexTuple newtup,
+                                 BTStack stack,
+                                 Relation heapRel);
+static void _bt_insertonpg(Relation rel, Buffer buf, Buffer cbuf,
                           BTStack stack,
                           IndexTuple itup,
                           OffsetNumber newitemoff,
                           bool split_only_page);
-static Buffer _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
-                 OffsetNumber newitemoff, Size newitemsz,
+static Buffer _bt_split(Relation rel, Buffer buf, Buffer cbuf,
+                 OffsetNumber firstright, OffsetNumber newitemoff, Size newitemsz,
                  IndexTuple newitem, bool newitemonleft);
+static void _bt_insert_parent(Relation rel, Buffer buf, Buffer rbuf,
+                                 BTStack stack, bool is_root, bool is_only);
 static OffsetNumber _bt_findsplitloc(Relation rel, Page page,
                                 OffsetNumber newitemoff,
                                 Size newitemsz,
@@ -73,19 +78,18 @@ static OffsetNumber _bt_findsplitloc(Relation rel, Page page,
 static void _bt_checksplitloc(FindSplitData *state,
                                  OffsetNumber firstoldonright, bool newitemonleft,
                                  int dataitemstoleft, Size firstoldonrightsz);
-static void _bt_pgaddtup(Relation rel, Page page,
-                        Size itemsize, IndexTuple itup,
-                        OffsetNumber itup_off, const char *where);
+static bool _bt_pgaddtup(Page page, Size itemsize, IndexTuple itup,
+                        OffsetNumber itup_off);
 static bool _bt_isequal(TupleDesc itupdesc, Page page, OffsetNumber offnum,
                        int keysz, ScanKey scankey);
-static void _bt_vacuum_one_page(Relation rel, Buffer buffer);
+static void _bt_vacuum_one_page(Relation rel, Buffer buffer, Relation heapRel);
 
 
 /*
  *     _bt_doinsert() -- Handle insertion of a single index tuple in the tree.
  *
- *             This routine is called by the public interface routines, btbuild
- *             and btinsert.  By here, itup is filled in, including the TID.
+ *             This routine is called by the public interface routine, btinsert.
+ *             By here, itup is filled in, including the TID.
  *
  *             If checkUnique is UNIQUE_CHECK_NO or UNIQUE_CHECK_PARTIAL, this
  *             will allow duplicates.  Otherwise (UNIQUE_CHECK_YES or
@@ -127,10 +131,11 @@ top:
         * If the page was split between the time that we surrendered our read
         * lock and acquired our write lock, then this page may no longer be the
         * right place for the key we want to insert.  In this case, we need to
-        * move right in the tree.      See Lehman and Yao for an excruciatingly
+        * move right in the tree.  See Lehman and Yao for an excruciatingly
         * precise description.
         */
-       buf = _bt_moveright(rel, buf, natts, itup_scankey, false, BT_WRITE);
+       buf = _bt_moveright(rel, buf, natts, itup_scankey, false,
+                                               true, stack, BT_WRITE);
 
        /*
         * If we're not allowing duplicates, make sure the key isn't already in
@@ -149,23 +154,34 @@ top:
         * If we must wait for another xact, we release the lock while waiting,
         * and then must start over completely.
         *
-        * For a partial uniqueness check, we don't wait for the other xact.
-        * Just let the tuple in and return false for possibly non-unique,
-        * or true for definitely unique.
+        * For a partial uniqueness check, we don't wait for the other xact. Just
+        * let the tuple in and return false for possibly non-unique, or true for
+        * definitely unique.
         */
        if (checkUnique != UNIQUE_CHECK_NO)
        {
                TransactionId xwait;
+               uint32          speculativeToken;
 
                offset = _bt_binsrch(rel, buf, natts, itup_scankey, false);
                xwait = _bt_check_unique(rel, itup, heapRel, buf, offset, itup_scankey,
-                                                                checkUnique, &is_unique);
+                                                                checkUnique, &is_unique, &speculativeToken);
 
                if (TransactionIdIsValid(xwait))
                {
                        /* Have to wait for the other guy ... */
                        _bt_relbuf(rel, buf);
-                       XactLockTableWait(xwait);
+
+                       /*
+                        * If it's a speculative insertion, wait for it to finish (ie. to
+                        * go ahead with the insertion, or kill the tuple).  Otherwise
+                        * wait for the transaction to finish as usual.
+                        */
+                       if (speculativeToken)
+                               SpeculativeInsertionWait(xwait, speculativeToken);
+                       else
+                               XactLockTableWait(xwait, rel, &itup->t_tid, XLTW_InsertIndex);
+
                        /* start over... */
                        _bt_freestack(stack);
                        goto top;
@@ -174,9 +190,18 @@ top:
 
        if (checkUnique != UNIQUE_CHECK_EXISTING)
        {
+               /*
+                * The only conflict predicate locking cares about for indexes is when
+                * an index tuple insert conflicts with an existing lock.  Since the
+                * actual location of the insert is hard to predict because of the
+                * random search used to prevent O(N^2) performance when there are
+                * many duplicate entries, we can just use the "first valid" page.
+                */
+               CheckForSerializableConflictIn(rel, NULL, buf);
                /* do the insertion */
-               _bt_findinsertloc(rel, &buf, &offset, natts, itup_scankey, itup);
-               _bt_insertonpg(rel, buf, stack, itup, offset, false);
+               _bt_findinsertloc(rel, &buf, &offset, natts, itup_scankey, itup,
+                                                 stack, heapRel);
+               _bt_insertonpg(rel, buf, InvalidBuffer, stack, itup, offset, false);
        }
        else
        {
@@ -199,8 +224,11 @@ top:
  * is the first tuple on the next page.
  *
  * Returns InvalidTransactionId if there is no conflict, else an xact ID
- * we must wait for to see if it commits a conflicting tuple.  If an actual
- * conflict is detected, no return --- just ereport().
+ * we must wait for to see if it commits a conflicting tuple.   If an actual
+ * conflict is detected, no return --- just ereport().  If an xact ID is
+ * returned, and the conflicting tuple still has a speculative insertion in
+ * progress, *speculativeToken is set to non-zero, and the caller can wait for
+ * the verdict on the insertion using SpeculativeInsertionWait().
  *
  * However, if checkUnique == UNIQUE_CHECK_PARTIAL, we always return
  * InvalidTransactionId because we don't want to wait.  In this case we
@@ -210,7 +238,8 @@ top:
 static TransactionId
 _bt_check_unique(Relation rel, IndexTuple itup, Relation heapRel,
                                 Buffer buf, OffsetNumber offset, ScanKey itup_scankey,
-                                IndexUniqueCheck checkUnique, bool *is_unique)
+                                IndexUniqueCheck checkUnique, bool *is_unique,
+                                uint32 *speculativeToken)
 {
        TupleDesc       itupdesc = RelationGetDescr(rel);
        int                     natts = rel->rd_rel->relnatts;
@@ -302,10 +331,10 @@ _bt_check_unique(Relation rel, IndexTuple itup, Relation heapRel,
 
                                        /*
                                         * It is a duplicate. If we are only doing a partial
-                                        * check, then don't bother checking if the tuple is
-                                        * being updated in another transaction. Just return
-                                        * the fact that it is a potential conflict and leave
-                                        * the full check till later.
+                                        * check, then don't bother checking if the tuple is being
+                                        * updated in another transaction. Just return the fact
+                                        * that it is a potential conflict and leave the full
+                                        * check till later.
                                         */
                                        if (checkUnique == UNIQUE_CHECK_PARTIAL)
                                        {
@@ -327,6 +356,7 @@ _bt_check_unique(Relation rel, IndexTuple itup, Relation heapRel,
                                                if (nbuf != InvalidBuffer)
                                                        _bt_relbuf(rel, nbuf);
                                                /* Tell _bt_doinsert to wait... */
+                                               *speculativeToken = SnapshotDirty.speculativeToken;
                                                return xwait;
                                        }
 
@@ -362,30 +392,36 @@ _bt_check_unique(Relation rel, IndexTuple itup, Relation heapRel,
                                        }
 
                                        /*
-                                        * This is a definite conflict.  Break the tuple down
-                                        * into datums and report the error.  But first, make
-                                        * sure we release the buffer locks we're holding ---
+                                        * This is a definite conflict.  Break the tuple down into
+                                        * datums and report the error.  But first, make sure we
+                                        * release the buffer locks we're holding ---
                                         * BuildIndexValueDescription could make catalog accesses,
-                                        * which in the worst case might touch this same index
-                                        * and cause deadlocks.
+                                        * which in the worst case might touch this same index and
+                                        * cause deadlocks.
                                         */
                                        if (nbuf != InvalidBuffer)
                                                _bt_relbuf(rel, nbuf);
                                        _bt_relbuf(rel, buf);
 
                                        {
-                                               Datum   values[INDEX_MAX_KEYS];
-                                               bool    isnull[INDEX_MAX_KEYS];
+                                               Datum           values[INDEX_MAX_KEYS];
+                                               bool            isnull[INDEX_MAX_KEYS];
+                                               char       *key_desc;
 
                                                index_deform_tuple(itup, RelationGetDescr(rel),
                                                                                   values, isnull);
+
+                                               key_desc = BuildIndexValueDescription(rel, values,
+                                                                                                                         isnull);
+
                                                ereport(ERROR,
                                                                (errcode(ERRCODE_UNIQUE_VIOLATION),
                                                                 errmsg("duplicate key value violates unique constraint \"%s\"",
                                                                                RelationGetRelationName(rel)),
-                                                                errdetail("Key %s already exists.",
-                                                                                  BuildIndexValueDescription(rel,
-                                                                                                                       values, isnull))));
+                                                          key_desc ? errdetail("Key %s already exists.",
+                                                                                                       key_desc) : 0,
+                                                                errtableconstraint(heapRel,
+                                                                                        RelationGetRelationName(rel))));
                                        }
                                }
                                else if (all_dead)
@@ -397,11 +433,15 @@ _bt_check_unique(Relation rel, IndexTuple itup, Relation heapRel,
                                         */
                                        ItemIdMarkDead(curitemid);
                                        opaque->btpo_flags |= BTP_HAS_GARBAGE;
-                                       /* be sure to mark the proper buffer dirty... */
+
+                                       /*
+                                        * Mark buffer with a dirty hint, since state is not
+                                        * crucial. Be sure to mark the proper buffer dirty.
+                                        */
                                        if (nbuf != InvalidBuffer)
-                                               SetBufferCommitInfoNeedsSave(nbuf);
+                                               MarkBufferDirtyHint(nbuf, true);
                                        else
-                                               SetBufferCommitInfoNeedsSave(buf);
+                                               MarkBufferDirtyHint(buf, true);
                                }
                        }
                }
@@ -438,16 +478,18 @@ _bt_check_unique(Relation rel, IndexTuple itup, Relation heapRel,
        }
 
        /*
-        * If we are doing a recheck then we should have found the tuple we
-        * are checking.  Otherwise there's something very wrong --- probably,
-        * the index is on a non-immutable expression.
+        * If we are doing a recheck then we should have found the tuple we are
+        * checking.  Otherwise there's something very wrong --- probably, the
+        * index is on a non-immutable expression.
         */
        if (checkUnique == UNIQUE_CHECK_EXISTING && !found)
                ereport(ERROR,
                                (errcode(ERRCODE_INTERNAL_ERROR),
                                 errmsg("failed to re-find tuple within index \"%s\"",
                                                RelationGetRelationName(rel)),
-                                errhint("This may be because of a non-immutable index expression.")));
+                errhint("This may be because of a non-immutable index expression."),
+                                errtableconstraint(heapRel,
+                                                                       RelationGetRelationName(rel))));
 
        if (nbuf != InvalidBuffer)
                _bt_relbuf(rel, nbuf);
@@ -462,7 +504,7 @@ _bt_check_unique(Relation rel, IndexTuple itup, Relation heapRel,
  *             If the new key is equal to one or more existing keys, we can
  *             legitimately place it anywhere in the series of equal keys --- in fact,
  *             if the new key is equal to the page's "high key" we can place it on
- *             the next page.  If it is equal to the high key, and there's not room
+ *             the next page.  If it is equal to the high key, and there's not room
  *             to insert the new tuple on the current page without splitting, then
  *             we can move right hoping to find more free space and avoid a split.
  *             (We should not move right indefinitely, however, since that leads to
@@ -473,9 +515,9 @@ _bt_check_unique(Relation rel, IndexTuple itup, Relation heapRel,
  *             If there's not enough room in the space, we try to make room by
  *             removing any LP_DEAD tuples.
  *
- *             On entry, *buf and *offsetptr point to the first legal position
- *             where the new tuple could be inserted.  The caller should hold an
- *             exclusive lock on *buf.  *offsetptr can also be set to
+ *             On entry, *bufptr and *offsetptr point to the first legal position
+ *             where the new tuple could be inserted.  The caller should hold an
+ *             exclusive lock on *bufptr.  *offsetptr can also be set to
  *             InvalidOffsetNumber, in which case the function will search for the
  *             right location within the page if needed.  On exit, they point to the
  *             chosen insert location.  If _bt_findinsertloc decides to move right,
@@ -491,7 +533,9 @@ _bt_findinsertloc(Relation rel,
                                  OffsetNumber *offsetptr,
                                  int keysz,
                                  ScanKey scankey,
-                                 IndexTuple newtup)
+                                 IndexTuple newtup,
+                                 BTStack stack,
+                                 Relation heapRel)
 {
        Buffer          buf = *bufptr;
        Page            page = BufferGetPage(buf);
@@ -514,16 +558,20 @@ _bt_findinsertloc(Relation rel,
         * able to fit three items on every page, so restrict any one item to 1/3
         * the per-page available space. Note that at this point, itemsz doesn't
         * include the ItemId.
+        *
+        * NOTE: if you change this, see also the similar code in _bt_buildadd().
         */
        if (itemsz > BTMaxItemSize(page))
                ereport(ERROR,
                                (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
-                                errmsg("index row size %lu exceeds btree maximum, %lu",
-                                               (unsigned long) itemsz,
-                                               (unsigned long) BTMaxItemSize(page)),
+                       errmsg("index row size %zu exceeds maximum %zu for index \"%s\"",
+                                  itemsz, BTMaxItemSize(page),
+                                  RelationGetRelationName(rel)),
                errhint("Values larger than 1/3 of a buffer page cannot be indexed.\n"
                                "Consider a function index of an MD5 hash of the value, "
-                               "or use full text indexing.")));
+                               "or use full text indexing."),
+                                errtableconstraint(heapRel,
+                                                                       RelationGetRelationName(rel))));
 
        /*----------
         * If we will need to split the page to put the item on this page,
@@ -538,7 +586,7 @@ _bt_findinsertloc(Relation rel,
         * on every insert.  We implement "get tired" as a random choice,
         * since stopping after scanning a fixed number of pages wouldn't work
         * well (we'd never reach the right-hand side of previously split
-        * pages).      Currently the probability of moving right is set at 0.99,
+        * pages).  Currently the probability of moving right is set at 0.99,
         * which may seem too high to change the behavior much, but it does an
         * excellent job of preventing O(N^2) behavior with many equal keys.
         *----------
@@ -548,6 +596,7 @@ _bt_findinsertloc(Relation rel,
        while (PageGetFreeSpace(page) < itemsz)
        {
                Buffer          rbuf;
+               BlockNumber rblkno;
 
                /*
                 * before considering moving right, see if we can obtain enough space
@@ -555,7 +604,7 @@ _bt_findinsertloc(Relation rel,
                 */
                if (P_ISLEAF(lpageop) && P_HAS_GARBAGE(lpageop))
                {
-                       _bt_vacuum_one_page(rel, buf);
+                       _bt_vacuum_one_page(rel, buf, heapRel);
 
                        /*
                         * remember that we vacuumed this page, because that makes the
@@ -585,18 +634,33 @@ _bt_findinsertloc(Relation rel,
                 */
                rbuf = InvalidBuffer;
 
+               rblkno = lpageop->btpo_next;
                for (;;)
                {
-                       BlockNumber rblkno = lpageop->btpo_next;
-
                        rbuf = _bt_relandgetbuf(rel, rbuf, rblkno, BT_WRITE);
                        page = BufferGetPage(rbuf);
                        lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
+
+                       /*
+                        * If this page was incompletely split, finish the split now. We
+                        * do this while holding a lock on the left sibling, which is not
+                        * good because finishing the split could be a fairly lengthy
+                        * operation.  But this should happen very seldom.
+                        */
+                       if (P_INCOMPLETE_SPLIT(lpageop))
+                       {
+                               _bt_finish_split(rel, rbuf, stack);
+                               rbuf = InvalidBuffer;
+                               continue;
+                       }
+
                        if (!P_IGNORE(lpageop))
                                break;
                        if (P_RIGHTMOST(lpageop))
                                elog(ERROR, "fell off the end of index \"%s\"",
                                         RelationGetRelationName(rel));
+
+                       rblkno = lpageop->btpo_next;
                }
                _bt_relbuf(rel, buf);
                buf = rbuf;
@@ -638,10 +702,14 @@ _bt_findinsertloc(Relation rel,
  *                        child page on the parent.
  *                     +  updates the metapage if a true root or fast root is split.
  *
- *             On entry, we must have the right buffer in which to do the
- *             insertion, and the buffer must be pinned and write-locked.      On return,
+ *             On entry, we must have the correct buffer in which to do the
+ *             insertion, and the buffer must be pinned and write-locked.  On return,
  *             we will have dropped both the pin and the lock on the buffer.
  *
+ *             When inserting to a non-leaf page, 'cbuf' is the left-sibling of the
+ *             page we're inserting the downlink for.  This function will clear the
+ *             INCOMPLETE_SPLIT flag on it, and release the buffer.
+ *
  *             The locking interactions in this code are critical.  You should
  *             grok Lehman and Yao's paper before making any changes.  In addition,
  *             you need to understand how we disambiguate duplicate keys in this
@@ -655,6 +723,7 @@ _bt_findinsertloc(Relation rel,
 static void
 _bt_insertonpg(Relation rel,
                           Buffer buf,
+                          Buffer cbuf,
                           BTStack stack,
                           IndexTuple itup,
                           OffsetNumber newitemoff,
@@ -668,6 +737,14 @@ _bt_insertonpg(Relation rel,
        page = BufferGetPage(buf);
        lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
 
+       /* child buffer must be given iff inserting on an internal page */
+       Assert(P_ISLEAF(lpageop) == !BufferIsValid(cbuf));
+
+       /* The caller should've finished any incomplete splits already. */
+       if (P_INCOMPLETE_SPLIT(lpageop))
+               elog(ERROR, "cannot insert to incompletely split page %u",
+                        BufferGetBlockNumber(buf));
+
        itemsz = IndexTupleDSize(*itup);
        itemsz = MAXALIGN(itemsz);      /* be safe, PageAddItem will do this but we
                                                                 * need to be consistent */
@@ -692,8 +769,11 @@ _bt_insertonpg(Relation rel,
                                                                          &newitemonleft);
 
                /* split the buffer into left and right halves */
-               rbuf = _bt_split(rel, buf, firstright,
+               rbuf = _bt_split(rel, buf, cbuf, firstright,
                                                 newitemoff, itemsz, itup, newitemonleft);
+               PredicateLockPageSplit(rel,
+                                                          BufferGetBlockNumber(buf),
+                                                          BufferGetBlockNumber(rbuf));
 
                /*----------
                 * By here,
@@ -750,7 +830,9 @@ _bt_insertonpg(Relation rel,
                /* Do the update.  No ereport(ERROR) until changes are logged */
                START_CRIT_SECTION();
 
-               _bt_pgaddtup(rel, page, itemsz, itup, newitemoff, "page");
+               if (!_bt_pgaddtup(page, itemsz, itup, newitemoff))
+                       elog(PANIC, "failed to add new item to block %u in index \"%s\"",
+                                itup_blkno, RelationGetRelationName(rel));
 
                MarkBufferDirty(buf);
 
@@ -761,38 +843,40 @@ _bt_insertonpg(Relation rel,
                        MarkBufferDirty(metabuf);
                }
 
+               /* clear INCOMPLETE_SPLIT flag on child if inserting a downlink */
+               if (BufferIsValid(cbuf))
+               {
+                       Page            cpage = BufferGetPage(cbuf);
+                       BTPageOpaque cpageop = (BTPageOpaque) PageGetSpecialPointer(cpage);
+
+                       Assert(P_INCOMPLETE_SPLIT(cpageop));
+                       cpageop->btpo_flags &= ~BTP_INCOMPLETE_SPLIT;
+                       MarkBufferDirty(cbuf);
+               }
+
                /* XLOG stuff */
-               if (!rel->rd_istemp)
+               if (RelationNeedsWAL(rel))
                {
                        xl_btree_insert xlrec;
-                       BlockNumber xldownlink;
                        xl_btree_metadata xlmeta;
                        uint8           xlinfo;
                        XLogRecPtr      recptr;
-                       XLogRecData rdata[4];
-                       XLogRecData *nextrdata;
                        IndexTupleData trunctuple;
 
-                       xlrec.target.node = rel->rd_node;
-                       ItemPointerSet(&(xlrec.target.tid), itup_blkno, itup_off);
+                       xlrec.offnum = itup_off;
 
-                       rdata[0].data = (char *) &xlrec;
-                       rdata[0].len = SizeOfBtreeInsert;
-                       rdata[0].buffer = InvalidBuffer;
-                       rdata[0].next = nextrdata = &(rdata[1]);
+                       XLogBeginInsert();
+                       XLogRegisterData((char *) &xlrec, SizeOfBtreeInsert);
 
                        if (P_ISLEAF(lpageop))
                                xlinfo = XLOG_BTREE_INSERT_LEAF;
                        else
                        {
-                               xldownlink = ItemPointerGetBlockNumber(&(itup->t_tid));
-                               Assert(ItemPointerGetOffsetNumber(&(itup->t_tid)) == P_HIKEY);
-
-                               nextrdata->data = (char *) &xldownlink;
-                               nextrdata->len = sizeof(BlockNumber);
-                               nextrdata->buffer = InvalidBuffer;
-                               nextrdata->next = nextrdata + 1;
-                               nextrdata++;
+                               /*
+                                * Register the left child whose INCOMPLETE_SPLIT flag was
+                                * cleared.
+                                */
+                               XLogRegisterBuffer(1, cbuf, REGBUF_STANDARD);
 
                                xlinfo = XLOG_BTREE_INSERT_UPPER;
                        }
@@ -804,54 +888,45 @@ _bt_insertonpg(Relation rel,
                                xlmeta.fastroot = metad->btm_fastroot;
                                xlmeta.fastlevel = metad->btm_fastlevel;
 
-                               nextrdata->data = (char *) &xlmeta;
-                               nextrdata->len = sizeof(xl_btree_metadata);
-                               nextrdata->buffer = InvalidBuffer;
-                               nextrdata->next = nextrdata + 1;
-                               nextrdata++;
+                               XLogRegisterBuffer(2, metabuf, REGBUF_WILL_INIT);
+                               XLogRegisterBufData(2, (char *) &xlmeta, sizeof(xl_btree_metadata));
 
                                xlinfo = XLOG_BTREE_INSERT_META;
                        }
 
                        /* Read comments in _bt_pgaddtup */
+                       XLogRegisterBuffer(0, buf, REGBUF_STANDARD);
                        if (!P_ISLEAF(lpageop) && newitemoff == P_FIRSTDATAKEY(lpageop))
                        {
                                trunctuple = *itup;
                                trunctuple.t_info = sizeof(IndexTupleData);
-                               nextrdata->data = (char *) &trunctuple;
-                               nextrdata->len = sizeof(IndexTupleData);
+                               XLogRegisterBufData(0, (char *) &trunctuple,
+                                                                       sizeof(IndexTupleData));
                        }
                        else
-                       {
-                               nextrdata->data = (char *) itup;
-                               nextrdata->len = IndexTupleDSize(*itup);
-                       }
-                       nextrdata->buffer = buf;
-                       nextrdata->buffer_std = true;
-                       nextrdata->next = NULL;
+                               XLogRegisterBufData(0, (char *) itup, IndexTupleDSize(*itup));
 
-                       recptr = XLogInsert(RM_BTREE_ID, xlinfo, rdata);
+                       recptr = XLogInsert(RM_BTREE_ID, xlinfo);
 
                        if (BufferIsValid(metabuf))
                        {
                                PageSetLSN(metapg, recptr);
-                               PageSetTLI(metapg, ThisTimeLineID);
+                       }
+                       if (BufferIsValid(cbuf))
+                       {
+                               PageSetLSN(BufferGetPage(cbuf), recptr);
                        }
 
                        PageSetLSN(page, recptr);
-                       PageSetTLI(page, ThisTimeLineID);
                }
 
                END_CRIT_SECTION();
 
-               /* release buffers; send out relcache inval if metapage changed */
+               /* release buffers */
                if (BufferIsValid(metabuf))
-               {
-                       if (!InRecovery)
-                               CacheInvalidateRelcache(rel);
                        _bt_relbuf(rel, metabuf);
-               }
-
+               if (BufferIsValid(cbuf))
+                       _bt_relbuf(rel, cbuf);
                _bt_relbuf(rel, buf);
        }
 }
@@ -864,11 +939,15 @@ _bt_insertonpg(Relation rel,
  *             new right page.  newitemoff etc. tell us about the new item that
  *             must be inserted along with the data from the old page.
  *
+ *             When splitting a non-leaf page, 'cbuf' is the left-sibling of the
+ *             page we're inserting the downlink for.  This function will clear the
+ *             INCOMPLETE_SPLIT flag on it, and release the buffer.
+ *
  *             Returns the new right sibling of buf, pinned and write-locked.
  *             The pin and lock on buf are maintained.
  */
 static Buffer
-_bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
+_bt_split(Relation rel, Buffer buf, Buffer cbuf, OffsetNumber firstright,
                  OffsetNumber newitemoff, Size newitemsz, IndexTuple newitem,
                  bool newitemonleft)
 {
@@ -876,6 +955,8 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
        Page            origpage;
        Page            leftpage,
                                rightpage;
+       BlockNumber origpagenumber,
+                               rightpagenumber;
        BTPageOpaque ropaque,
                                lopaque,
                                oopaque;
@@ -890,22 +971,38 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
        OffsetNumber maxoff;
        OffsetNumber i;
        bool            isroot;
+       bool            isleaf;
 
+       /* Acquire a new page to split into */
        rbuf = _bt_getbuf(rel, P_NEW, BT_WRITE);
+
+       /*
+        * origpage is the original page to be split.  leftpage is a temporary
+        * buffer that receives the left-sibling data, which will be copied back
+        * into origpage on success.  rightpage is the new page that receives the
+        * right-sibling data.  If we fail before reaching the critical section,
+        * origpage hasn't been modified and leftpage is only workspace. In
+        * principle we shouldn't need to worry about rightpage either, because it
+        * hasn't been linked into the btree page structure; but to avoid leaving
+        * possibly-confusing junk behind, we are careful to rewrite rightpage as
+        * zeroes before throwing any error.
+        */
        origpage = BufferGetPage(buf);
        leftpage = PageGetTempPage(origpage);
        rightpage = BufferGetPage(rbuf);
 
+       origpagenumber = BufferGetBlockNumber(buf);
+       rightpagenumber = BufferGetBlockNumber(rbuf);
+
        _bt_pageinit(leftpage, BufferGetPageSize(buf));
        /* rightpage was already initialized by _bt_getbuf */
 
        /*
-        * Copy the original page's LSN and TLI into leftpage, which will become
-        * the updated version of the page.  We need this because XLogInsert will
-        * examine these fields and possibly dump them in a page image.
+        * Copy the original page's LSN into leftpage, which will become the
+        * updated version of the page.  We need this because XLogInsert will
+        * examine the LSN and possibly dump it in a page image.
         */
        PageSetLSN(leftpage, PageGetLSN(origpage));
-       PageSetTLI(leftpage, PageGetTLI(origpage));
 
        /* init btree private data */
        oopaque = (BTPageOpaque) PageGetSpecialPointer(origpage);
@@ -913,15 +1010,18 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
        ropaque = (BTPageOpaque) PageGetSpecialPointer(rightpage);
 
        isroot = P_ISROOT(oopaque);
+       isleaf = P_ISLEAF(oopaque);
 
        /* if we're splitting this page, it won't be the root when we're done */
        /* also, clear the SPLIT_END and HAS_GARBAGE flags in both pages */
        lopaque->btpo_flags = oopaque->btpo_flags;
        lopaque->btpo_flags &= ~(BTP_ROOT | BTP_SPLIT_END | BTP_HAS_GARBAGE);
        ropaque->btpo_flags = lopaque->btpo_flags;
+       /* set flag in left page indicating that the right page has no downlink */
+       lopaque->btpo_flags |= BTP_INCOMPLETE_SPLIT;
        lopaque->btpo_prev = oopaque->btpo_prev;
-       lopaque->btpo_next = BufferGetBlockNumber(rbuf);
-       ropaque->btpo_prev = BufferGetBlockNumber(buf);
+       lopaque->btpo_next = rightpagenumber;
+       ropaque->btpo_prev = origpagenumber;
        ropaque->btpo_next = oopaque->btpo_next;
        lopaque->btpo.level = ropaque->btpo.level = oopaque->btpo.level;
        /* Since we already have write-lock on both pages, ok to read cycleid */
@@ -944,9 +1044,12 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
                item = (IndexTuple) PageGetItem(origpage, itemid);
                if (PageAddItem(rightpage, (Item) item, itemsz, rightoff,
                                                false, false) == InvalidOffsetNumber)
-                       elog(PANIC, "failed to add hikey to the right sibling"
+               {
+                       memset(rightpage, 0, BufferGetPageSize(rbuf));
+                       elog(ERROR, "failed to add hikey to the right sibling"
                                 " while splitting block %u of index \"%s\"",
-                                BufferGetBlockNumber(buf), RelationGetRelationName(rel));
+                                origpagenumber, RelationGetRelationName(rel));
+               }
                rightoff = OffsetNumberNext(rightoff);
        }
 
@@ -971,9 +1074,12 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
        }
        if (PageAddItem(leftpage, (Item) item, itemsz, leftoff,
                                        false, false) == InvalidOffsetNumber)
-               elog(PANIC, "failed to add hikey to the left sibling"
+       {
+               memset(rightpage, 0, BufferGetPageSize(rbuf));
+               elog(ERROR, "failed to add hikey to the left sibling"
                         " while splitting block %u of index \"%s\"",
-                        BufferGetBlockNumber(buf), RelationGetRelationName(rel));
+                        origpagenumber, RelationGetRelationName(rel));
+       }
        leftoff = OffsetNumberNext(leftoff);
 
        /*
@@ -995,14 +1101,24 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
                {
                        if (newitemonleft)
                        {
-                               _bt_pgaddtup(rel, leftpage, newitemsz, newitem, leftoff,
-                                                        "left sibling");
+                               if (!_bt_pgaddtup(leftpage, newitemsz, newitem, leftoff))
+                               {
+                                       memset(rightpage, 0, BufferGetPageSize(rbuf));
+                                       elog(ERROR, "failed to add new item to the left sibling"
+                                                " while splitting block %u of index \"%s\"",
+                                                origpagenumber, RelationGetRelationName(rel));
+                               }
                                leftoff = OffsetNumberNext(leftoff);
                        }
                        else
                        {
-                               _bt_pgaddtup(rel, rightpage, newitemsz, newitem, rightoff,
-                                                        "right sibling");
+                               if (!_bt_pgaddtup(rightpage, newitemsz, newitem, rightoff))
+                               {
+                                       memset(rightpage, 0, BufferGetPageSize(rbuf));
+                                       elog(ERROR, "failed to add new item to the right sibling"
+                                                " while splitting block %u of index \"%s\"",
+                                                origpagenumber, RelationGetRelationName(rel));
+                               }
                                rightoff = OffsetNumberNext(rightoff);
                        }
                }
@@ -1010,14 +1126,24 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
                /* decide which page to put it on */
                if (i < firstright)
                {
-                       _bt_pgaddtup(rel, leftpage, itemsz, item, leftoff,
-                                                "left sibling");
+                       if (!_bt_pgaddtup(leftpage, itemsz, item, leftoff))
+                       {
+                               memset(rightpage, 0, BufferGetPageSize(rbuf));
+                               elog(ERROR, "failed to add old item to the left sibling"
+                                        " while splitting block %u of index \"%s\"",
+                                        origpagenumber, RelationGetRelationName(rel));
+                       }
                        leftoff = OffsetNumberNext(leftoff);
                }
                else
                {
-                       _bt_pgaddtup(rel, rightpage, itemsz, item, rightoff,
-                                                "right sibling");
+                       if (!_bt_pgaddtup(rightpage, itemsz, item, rightoff))
+                       {
+                               memset(rightpage, 0, BufferGetPageSize(rbuf));
+                               elog(ERROR, "failed to add old item to the right sibling"
+                                        " while splitting block %u of index \"%s\"",
+                                        origpagenumber, RelationGetRelationName(rel));
+                       }
                        rightoff = OffsetNumberNext(rightoff);
                }
        }
@@ -1031,8 +1157,13 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
                 * not be splitting the page).
                 */
                Assert(!newitemonleft);
-               _bt_pgaddtup(rel, rightpage, newitemsz, newitem, rightoff,
-                                        "right sibling");
+               if (!_bt_pgaddtup(rightpage, newitemsz, newitem, rightoff))
+               {
+                       memset(rightpage, 0, BufferGetPageSize(rbuf));
+                       elog(ERROR, "failed to add new item to the right sibling"
+                                " while splitting block %u of index \"%s\"",
+                                origpagenumber, RelationGetRelationName(rel));
+               }
                rightoff = OffsetNumberNext(rightoff);
        }
 
@@ -1044,16 +1175,19 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
         * neighbors.
         */
 
-       if (!P_RIGHTMOST(ropaque))
+       if (!P_RIGHTMOST(oopaque))
        {
-               sbuf = _bt_getbuf(rel, ropaque->btpo_next, BT_WRITE);
+               sbuf = _bt_getbuf(rel, oopaque->btpo_next, BT_WRITE);
                spage = BufferGetPage(sbuf);
                sopaque = (BTPageOpaque) PageGetSpecialPointer(spage);
-               if (sopaque->btpo_prev != ropaque->btpo_prev)
-                       elog(PANIC, "right sibling's left-link doesn't match: "
+               if (sopaque->btpo_prev != origpagenumber)
+               {
+                       memset(rightpage, 0, BufferGetPageSize(rbuf));
+                       elog(ERROR, "right sibling's left-link doesn't match: "
                           "block %u links to %u instead of expected %u in index \"%s\"",
-                                ropaque->btpo_next, sopaque->btpo_prev, ropaque->btpo_prev,
+                                oopaque->btpo_next, sopaque->btpo_prev, origpagenumber,
                                 RelationGetRelationName(rel));
+               }
 
                /*
                 * Check to see if we can set the SPLIT_END flag in the right-hand
@@ -1064,7 +1198,7 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
                 * page.  If you're confused, imagine that page A splits to A B and
                 * then again, yielding A C B, while vacuum is in progress.  Tuples
                 * originally in A could now be in either B or C, hence vacuum must
-                * examine both pages.  But if D, our right sibling, has a different
+                * examine both pages.  But if D, our right sibling, has a different
                 * cycleid then it could not contain any tuples that were in A when
                 * the vacuum started.
                 */
@@ -1078,8 +1212,7 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
         *
         * NO EREPORT(ERROR) till right sibling is updated.  We can get away with
         * not starting the critical section till here because we haven't been
-        * scribbling on the original page yet, and we don't care about the new
-        * sibling until it's linked into the btree.
+        * scribbling on the original page yet; see comments above.
         */
        START_CRIT_SECTION();
 
@@ -1091,54 +1224,74 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
         * (in the page management code) that the center of a page always be
         * clean, and the most efficient way to guarantee this is just to compact
         * the data by reinserting it into a new left page.  (XXX the latter
-        * comment is probably obsolete.)
+        * comment is probably obsolete; but in any case it's good to not scribble
+        * on the original page until we enter the critical section.)
         *
         * We need to do this before writing the WAL record, so that XLogInsert
         * can WAL log an image of the page if necessary.
         */
        PageRestoreTempPage(leftpage, origpage);
+       /* leftpage, lopaque must not be used below here */
 
        MarkBufferDirty(buf);
        MarkBufferDirty(rbuf);
 
        if (!P_RIGHTMOST(ropaque))
        {
-               sopaque->btpo_prev = BufferGetBlockNumber(rbuf);
+               sopaque->btpo_prev = rightpagenumber;
                MarkBufferDirty(sbuf);
        }
 
+       /*
+        * Clear INCOMPLETE_SPLIT flag on child if inserting the new item finishes
+        * a split.
+        */
+       if (!isleaf)
+       {
+               Page            cpage = BufferGetPage(cbuf);
+               BTPageOpaque cpageop = (BTPageOpaque) PageGetSpecialPointer(cpage);
+
+               cpageop->btpo_flags &= ~BTP_INCOMPLETE_SPLIT;
+               MarkBufferDirty(cbuf);
+       }
+
        /* XLOG stuff */
-       if (!rel->rd_istemp)
+       if (RelationNeedsWAL(rel))
        {
                xl_btree_split xlrec;
                uint8           xlinfo;
                XLogRecPtr      recptr;
-               XLogRecData rdata[7];
-               XLogRecData *lastrdata;
 
-               xlrec.node = rel->rd_node;
-               xlrec.leftsib = BufferGetBlockNumber(buf);
-               xlrec.rightsib = BufferGetBlockNumber(rbuf);
-               xlrec.rnext = ropaque->btpo_next;
                xlrec.level = ropaque->btpo.level;
                xlrec.firstright = firstright;
+               xlrec.newitemoff = newitemoff;
 
-               rdata[0].data = (char *) &xlrec;
-               rdata[0].len = SizeOfBtreeSplit;
-               rdata[0].buffer = InvalidBuffer;
+               XLogBeginInsert();
+               XLogRegisterData((char *) &xlrec, SizeOfBtreeSplit);
 
-               lastrdata = &rdata[0];
-
-               if (ropaque->btpo.level > 0)
-               {
-                       /* Log downlink on non-leaf pages */
-                       lastrdata->next = lastrdata + 1;
-                       lastrdata++;
+               XLogRegisterBuffer(0, buf, REGBUF_STANDARD);
+               XLogRegisterBuffer(1, rbuf, REGBUF_WILL_INIT);
+               /* Log the right sibling, because we've changed its prev-pointer. */
+               if (!P_RIGHTMOST(ropaque))
+                       XLogRegisterBuffer(2, sbuf, REGBUF_STANDARD);
+               if (BufferIsValid(cbuf))
+                       XLogRegisterBuffer(3, cbuf, REGBUF_STANDARD);
 
-                       lastrdata->data = (char *) &newitem->t_tid.ip_blkid;
-                       lastrdata->len = sizeof(BlockIdData);
-                       lastrdata->buffer = InvalidBuffer;
+               /*
+                * Log the new item, if it was inserted on the left page. (If it was
+                * put on the right page, we don't need to explicitly WAL log it
+                * because it's included with all the other items on the right page.)
+                * Show the new item as belonging to the left page buffer, so that it
+                * is not stored if XLogInsert decides it needs a full-page image of
+                * the left page.  We store the offset anyway, though, to support
+                * archive compression of these records.
+                */
+               if (newitemonleft)
+                       XLogRegisterBufData(0, (char *) newitem, MAXALIGN(newitemsz));
 
+               /* Log left page */
+               if (!isleaf)
+               {
                        /*
                         * We must also log the left page's high key, because the right
                         * page's leftmost key is suppressed on non-leaf levels.  Show it
@@ -1146,58 +1299,9 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
                         * if XLogInsert decides it needs a full-page image of the left
                         * page.
                         */
-                       lastrdata->next = lastrdata + 1;
-                       lastrdata++;
-
                        itemid = PageGetItemId(origpage, P_HIKEY);
                        item = (IndexTuple) PageGetItem(origpage, itemid);
-                       lastrdata->data = (char *) item;
-                       lastrdata->len = MAXALIGN(IndexTupleSize(item));
-                       lastrdata->buffer = buf;        /* backup block 1 */
-                       lastrdata->buffer_std = true;
-               }
-
-               /*
-                * Log the new item and its offset, if it was inserted on the left
-                * page. (If it was put on the right page, we don't need to explicitly
-                * WAL log it because it's included with all the other items on the
-                * right page.) Show the new item as belonging to the left page
-                * buffer, so that it is not stored if XLogInsert decides it needs a
-                * full-page image of the left page.  We store the offset anyway,
-                * though, to support archive compression of these records.
-                */
-               if (newitemonleft)
-               {
-                       lastrdata->next = lastrdata + 1;
-                       lastrdata++;
-
-                       lastrdata->data = (char *) &newitemoff;
-                       lastrdata->len = sizeof(OffsetNumber);
-                       lastrdata->buffer = InvalidBuffer;
-
-                       lastrdata->next = lastrdata + 1;
-                       lastrdata++;
-
-                       lastrdata->data = (char *) newitem;
-                       lastrdata->len = MAXALIGN(newitemsz);
-                       lastrdata->buffer = buf;        /* backup block 1 */
-                       lastrdata->buffer_std = true;
-               }
-               else if (ropaque->btpo.level == 0)
-               {
-                       /*
-                        * Although we don't need to WAL-log the new item, we still need
-                        * XLogInsert to consider storing a full-page image of the left
-                        * page, so make an empty entry referencing that buffer. This also
-                        * ensures that the left page is always backup block 1.
-                        */
-                       lastrdata->next = lastrdata + 1;
-                       lastrdata++;
-
-                       lastrdata->data = NULL;
-                       lastrdata->len = 0;
-                       lastrdata->buffer = buf;        /* backup block 1 */
-                       lastrdata->buffer_std = true;
+                       XLogRegisterBufData(0, (char *) item, MAXALIGN(IndexTupleSize(item)));
                }
 
                /*
@@ -1212,44 +1316,26 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
                 * and so the item pointers can be reconstructed.  See comments for
                 * _bt_restore_page().
                 */
-               lastrdata->next = lastrdata + 1;
-               lastrdata++;
-
-               lastrdata->data = (char *) rightpage +
-                       ((PageHeader) rightpage)->pd_upper;
-               lastrdata->len = ((PageHeader) rightpage)->pd_special -
-                       ((PageHeader) rightpage)->pd_upper;
-               lastrdata->buffer = InvalidBuffer;
-
-               /* Log the right sibling, because we've changed its' prev-pointer. */
-               if (!P_RIGHTMOST(ropaque))
-               {
-                       lastrdata->next = lastrdata + 1;
-                       lastrdata++;
-
-                       lastrdata->data = NULL;
-                       lastrdata->len = 0;
-                       lastrdata->buffer = sbuf;       /* backup block 2 */
-                       lastrdata->buffer_std = true;
-               }
-
-               lastrdata->next = NULL;
+               XLogRegisterBufData(1,
+                                        (char *) rightpage + ((PageHeader) rightpage)->pd_upper,
+                                                       ((PageHeader) rightpage)->pd_special - ((PageHeader) rightpage)->pd_upper);
 
                if (isroot)
                        xlinfo = newitemonleft ? XLOG_BTREE_SPLIT_L_ROOT : XLOG_BTREE_SPLIT_R_ROOT;
                else
                        xlinfo = newitemonleft ? XLOG_BTREE_SPLIT_L : XLOG_BTREE_SPLIT_R;
 
-               recptr = XLogInsert(RM_BTREE_ID, xlinfo, rdata);
+               recptr = XLogInsert(RM_BTREE_ID, xlinfo);
 
                PageSetLSN(origpage, recptr);
-               PageSetTLI(origpage, ThisTimeLineID);
                PageSetLSN(rightpage, recptr);
-               PageSetTLI(rightpage, ThisTimeLineID);
                if (!P_RIGHTMOST(ropaque))
                {
                        PageSetLSN(spage, recptr);
-                       PageSetTLI(spage, ThisTimeLineID);
+               }
+               if (!isleaf)
+               {
+                       PageSetLSN(BufferGetPage(cbuf), recptr);
                }
        }
 
@@ -1259,6 +1345,10 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
        if (!P_RIGHTMOST(ropaque))
                _bt_relbuf(rel, sbuf);
 
+       /* release the child */
+       if (!isleaf)
+               _bt_relbuf(rel, cbuf);
+
        /* split's done */
        return rbuf;
 }
@@ -1285,7 +1375,7 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
  *
  * We return the index of the first existing tuple that should go on the
  * righthand page, plus a boolean indicating whether the new tuple goes on
- * the left or right page.     The bool is necessary to disambiguate the case
+ * the left or right page.  The bool is necessary to disambiguate the case
  * where firstright == newitemoff.
  */
 static OffsetNumber
@@ -1521,7 +1611,7 @@ _bt_checksplitloc(FindSplitData *state,
  *
  * On entry, buf and rbuf are the left and right split pages, which we
  * still hold write locks on per the L&Y algorithm.  We release the
- * write locks once we have write lock on the parent page.     (Any sooner,
+ * write locks once we have write lock on the parent page.  (Any sooner,
  * and it'd be possible for some other process to try to split or delete
  * one of these pages, and get confused because it cannot find the downlink.)
  *
@@ -1529,10 +1619,8 @@ _bt_checksplitloc(FindSplitData *state,
  *                     have to be efficient (concurrent ROOT split, WAL recovery)
  * is_root - we split the true root
  * is_only - we split a page alone on its level (might have been fast root)
- *
- * This is exported so it can be called by nbtxlog.c.
  */
-void
+static void
 _bt_insert_parent(Relation rel,
                                  Buffer buf,
                                  Buffer rbuf,
@@ -1544,15 +1632,14 @@ _bt_insert_parent(Relation rel,
         * Here we have to do something Lehman and Yao don't talk about: deal with
         * a root split and construction of a new root.  If our stack is empty
         * then we have just split a node on what had been the root level when we
-        * descended the tree.  If it was still the root then we perform a
+        * descended the tree.  If it was still the root then we perform a
         * new-root construction.  If it *wasn't* the root anymore, search to find
         * the next higher level that someone constructed meanwhile, and find the
         * right place to insert as for the normal case.
         *
         * If we have to search for the parent level, we do so by re-descending
         * from the root.  This is not super-efficient, but it's rare enough not
-        * to matter.  (This path is also taken when called from WAL recovery ---
-        * we have no stack in that case.)
+        * to matter.
         */
        if (is_root)
        {
@@ -1581,8 +1668,7 @@ _bt_insert_parent(Relation rel,
                {
                        BTPageOpaque lpageop;
 
-                       if (!InRecovery)
-                               elog(DEBUG2, "concurrent ROOT page split");
+                       elog(DEBUG2, "concurrent ROOT page split");
                        lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
                        /* Find the leftmost page at the next level up */
                        pbuf = _bt_get_endpoint(rel, lpageop->btpo.level + 1, false);
@@ -1611,12 +1697,13 @@ _bt_insert_parent(Relation rel,
                 * 05/27/97
                 */
                ItemPointerSet(&(stack->bts_btentry.t_tid), bknum, P_HIKEY);
-
                pbuf = _bt_getstackbuf(rel, stack, BT_WRITE);
 
-               /* Now we can unlock the children */
+               /*
+                * Now we can unlock the right child. The left child will be unlocked
+                * by _bt_insertonpg().
+                */
                _bt_relbuf(rel, rbuf);
-               _bt_relbuf(rel, buf);
 
                /* Check for error only after writing children */
                if (pbuf == InvalidBuffer)
@@ -1624,7 +1711,7 @@ _bt_insert_parent(Relation rel,
                                 RelationGetRelationName(rel), bknum, rbknum);
 
                /* Recursively update the parent */
-               _bt_insertonpg(rel, pbuf, stack->bts_parent,
+               _bt_insertonpg(rel, pbuf, buf, stack->bts_parent,
                                           new_item, stack->bts_offset + 1,
                                           is_only);
 
@@ -1633,6 +1720,62 @@ _bt_insert_parent(Relation rel,
        }
 }
 
+/*
+ * _bt_finish_split() -- Finish an incomplete split
+ *
+ * A crash or other failure can leave a split incomplete.  The insertion
+ * routines won't allow to insert on a page that is incompletely split.
+ * Before inserting on such a page, call _bt_finish_split().
+ *
+ * On entry, 'lbuf' must be locked in write-mode.  On exit, it is unlocked
+ * and unpinned.
+ */
+void
+_bt_finish_split(Relation rel, Buffer lbuf, BTStack stack)
+{
+       Page            lpage = BufferGetPage(lbuf);
+       BTPageOpaque lpageop = (BTPageOpaque) PageGetSpecialPointer(lpage);
+       Buffer          rbuf;
+       Page            rpage;
+       BTPageOpaque rpageop;
+       bool            was_root;
+       bool            was_only;
+
+       Assert(P_INCOMPLETE_SPLIT(lpageop));
+
+       /* Lock right sibling, the one missing the downlink */
+       rbuf = _bt_getbuf(rel, lpageop->btpo_next, BT_WRITE);
+       rpage = BufferGetPage(rbuf);
+       rpageop = (BTPageOpaque) PageGetSpecialPointer(rpage);
+
+       /* Could this be a root split? */
+       if (!stack)
+       {
+               Buffer          metabuf;
+               Page            metapg;
+               BTMetaPageData *metad;
+
+               /* acquire lock on the metapage */
+               metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_WRITE);
+               metapg = BufferGetPage(metabuf);
+               metad = BTPageGetMeta(metapg);
+
+               was_root = (metad->btm_root == BufferGetBlockNumber(lbuf));
+
+               _bt_relbuf(rel, metabuf);
+       }
+       else
+               was_root = false;
+
+       /* Was this the only page on the level before split? */
+       was_only = (P_LEFTMOST(lpageop) && P_RIGHTMOST(rpageop));
+
+       elog(DEBUG1, "finishing incomplete split of %u/%u",
+                BufferGetBlockNumber(lbuf), BufferGetBlockNumber(rbuf));
+
+       _bt_insert_parent(rel, lbuf, rbuf, stack, was_root, was_only);
+}
+
 /*
  *     _bt_getstackbuf() -- Walk back up the tree one step, and find the item
  *                                              we last looked at in the parent.
@@ -1665,6 +1808,12 @@ _bt_getstackbuf(Relation rel, BTStack stack, int access)
                page = BufferGetPage(buf);
                opaque = (BTPageOpaque) PageGetSpecialPointer(page);
 
+               if (access == BT_WRITE && P_INCOMPLETE_SPLIT(opaque))
+               {
+                       _bt_finish_split(rel, buf, stack->bts_parent);
+                       continue;
+               }
+
                if (!P_IGNORE(opaque))
                {
                        OffsetNumber offnum,
@@ -1694,7 +1843,7 @@ _bt_getstackbuf(Relation rel, BTStack stack, int access)
                        /*
                         * These loops will check every item on the page --- but in an
                         * order that's attuned to the probability of where it actually
-                        * is.  Scan to the right first, then to the left.
+                        * is.  Scan to the right first, then to the left.
                         */
                        for (offnum = start;
                                 offnum <= maxoff;
@@ -1769,10 +1918,13 @@ _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf)
                                rbkno;
        BlockNumber rootblknum;
        BTPageOpaque rootopaque;
+       BTPageOpaque lopaque;
        ItemId          itemid;
        IndexTuple      item;
-       Size            itemsz;
-       IndexTuple      new_item;
+       IndexTuple      left_item;
+       Size            left_item_sz;
+       IndexTuple      right_item;
+       Size            right_item_sz;
        Buffer          metabuf;
        Page            metapg;
        BTMetaPageData *metad;
@@ -1780,6 +1932,7 @@ _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf)
        lbkno = BufferGetBlockNumber(lbuf);
        rbkno = BufferGetBlockNumber(rbuf);
        lpage = BufferGetPage(lbuf);
+       lopaque = (BTPageOpaque) PageGetSpecialPointer(lpage);
 
        /* get a new root page */
        rootbuf = _bt_getbuf(rel, P_NEW, BT_WRITE);
@@ -1791,6 +1944,26 @@ _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf)
        metapg = BufferGetPage(metabuf);
        metad = BTPageGetMeta(metapg);
 
+       /*
+        * Create downlink item for left page (old root).  Since this will be the
+        * first item in a non-leaf page, it implicitly has minus-infinity key
+        * value, so we need not store any actual key in it.
+        */
+       left_item_sz = sizeof(IndexTupleData);
+       left_item = (IndexTuple) palloc(left_item_sz);
+       left_item->t_info = left_item_sz;
+       ItemPointerSet(&(left_item->t_tid), lbkno, P_HIKEY);
+
+       /*
+        * Create downlink item for right page.  The key for it is obtained from
+        * the "high key" position in the left page.
+        */
+       itemid = PageGetItemId(lpage, P_HIKEY);
+       right_item_sz = ItemIdGetLength(itemid);
+       item = (IndexTuple) PageGetItem(lpage, itemid);
+       right_item = CopyIndexTuple(item);
+       ItemPointerSet(&(right_item->t_tid), rbkno, P_HIKEY);
+
        /* NO EREPORT(ERROR) from here till newroot op is logged */
        START_CRIT_SECTION();
 
@@ -1808,16 +1981,6 @@ _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf)
        metad->btm_fastroot = rootblknum;
        metad->btm_fastlevel = rootopaque->btpo.level;
 
-       /*
-        * Create downlink item for left page (old root).  Since this will be the
-        * first item in a non-leaf page, it implicitly has minus-infinity key
-        * value, so we need not store any actual key in it.
-        */
-       itemsz = sizeof(IndexTupleData);
-       new_item = (IndexTuple) palloc(itemsz);
-       new_item->t_info = itemsz;
-       ItemPointerSet(&(new_item->t_tid), lbkno, P_HIKEY);
-
        /*
         * Insert the left page pointer into the new root page.  The root page is
         * the rightmost page on its level so there is no "high key" in it; the
@@ -1826,79 +1989,77 @@ _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf)
         * Note: we *must* insert the two items in item-number order, for the
         * benefit of _bt_restore_page().
         */
-       if (PageAddItem(rootpage, (Item) new_item, itemsz, P_HIKEY,
+       if (PageAddItem(rootpage, (Item) left_item, left_item_sz, P_HIKEY,
                                        false, false) == InvalidOffsetNumber)
                elog(PANIC, "failed to add leftkey to new root page"
                         " while splitting block %u of index \"%s\"",
                         BufferGetBlockNumber(lbuf), RelationGetRelationName(rel));
-       pfree(new_item);
-
-       /*
-        * Create downlink item for right page.  The key for it is obtained from
-        * the "high key" position in the left page.
-        */
-       itemid = PageGetItemId(lpage, P_HIKEY);
-       itemsz = ItemIdGetLength(itemid);
-       item = (IndexTuple) PageGetItem(lpage, itemid);
-       new_item = CopyIndexTuple(item);
-       ItemPointerSet(&(new_item->t_tid), rbkno, P_HIKEY);
 
        /*
         * insert the right page pointer into the new root page.
         */
-       if (PageAddItem(rootpage, (Item) new_item, itemsz, P_FIRSTKEY,
+       if (PageAddItem(rootpage, (Item) right_item, right_item_sz, P_FIRSTKEY,
                                        false, false) == InvalidOffsetNumber)
                elog(PANIC, "failed to add rightkey to new root page"
                         " while splitting block %u of index \"%s\"",
                         BufferGetBlockNumber(lbuf), RelationGetRelationName(rel));
-       pfree(new_item);
+
+       /* Clear the incomplete-split flag in the left child */
+       Assert(P_INCOMPLETE_SPLIT(lopaque));
+       lopaque->btpo_flags &= ~BTP_INCOMPLETE_SPLIT;
+       MarkBufferDirty(lbuf);
 
        MarkBufferDirty(rootbuf);
        MarkBufferDirty(metabuf);
 
        /* XLOG stuff */
-       if (!rel->rd_istemp)
+       if (RelationNeedsWAL(rel))
        {
                xl_btree_newroot xlrec;
                XLogRecPtr      recptr;
-               XLogRecData rdata[2];
+               xl_btree_metadata md;
 
-               xlrec.node = rel->rd_node;
                xlrec.rootblk = rootblknum;
                xlrec.level = metad->btm_level;
 
-               rdata[0].data = (char *) &xlrec;
-               rdata[0].len = SizeOfBtreeNewroot;
-               rdata[0].buffer = InvalidBuffer;
-               rdata[0].next = &(rdata[1]);
+               XLogBeginInsert();
+               XLogRegisterData((char *) &xlrec, SizeOfBtreeNewroot);
+
+               XLogRegisterBuffer(0, rootbuf, REGBUF_WILL_INIT);
+               XLogRegisterBuffer(1, lbuf, REGBUF_STANDARD);
+               XLogRegisterBuffer(2, metabuf, REGBUF_WILL_INIT);
+
+               md.root = rootblknum;
+               md.level = metad->btm_level;
+               md.fastroot = rootblknum;
+               md.fastlevel = metad->btm_level;
+
+               XLogRegisterBufData(2, (char *) &md, sizeof(xl_btree_metadata));
 
                /*
                 * Direct access to page is not good but faster - we should implement
                 * some new func in page API.
                 */
-               rdata[1].data = (char *) rootpage + ((PageHeader) rootpage)->pd_upper;
-               rdata[1].len = ((PageHeader) rootpage)->pd_special -
-                       ((PageHeader) rootpage)->pd_upper;
-               rdata[1].buffer = InvalidBuffer;
-               rdata[1].next = NULL;
+               XLogRegisterBufData(0,
+                                          (char *) rootpage + ((PageHeader) rootpage)->pd_upper,
+                                                       ((PageHeader) rootpage)->pd_special -
+                                                       ((PageHeader) rootpage)->pd_upper);
 
-               recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_NEWROOT, rdata);
+               recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_NEWROOT);
 
+               PageSetLSN(lpage, recptr);
                PageSetLSN(rootpage, recptr);
-               PageSetTLI(rootpage, ThisTimeLineID);
                PageSetLSN(metapg, recptr);
-               PageSetTLI(metapg, ThisTimeLineID);
        }
 
        END_CRIT_SECTION();
 
-       /* send out relcache inval for metapage change */
-       if (!InRecovery)
-               CacheInvalidateRelcache(rel);
-
        /* done with metapage */
        _bt_relbuf(rel, metabuf);
 
+       pfree(left_item);
+       pfree(right_item);
+
        return rootbuf;
 }
 
@@ -1917,13 +2078,11 @@ _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf)
  *             we insert the tuples in order, so that the given itup_off does
  *             represent the final position of the tuple!
  */
-static void
-_bt_pgaddtup(Relation rel,
-                        Page page,
+static bool
+_bt_pgaddtup(Page page,
                         Size itemsize,
                         IndexTuple itup,
-                        OffsetNumber itup_off,
-                        const char *where)
+                        OffsetNumber itup_off)
 {
        BTPageOpaque opaque = (BTPageOpaque) PageGetSpecialPointer(page);
        IndexTupleData trunctuple;
@@ -1938,8 +2097,9 @@ _bt_pgaddtup(Relation rel,
 
        if (PageAddItem(page, (Item) itup, itemsize, itup_off,
                                        false, false) == InvalidOffsetNumber)
-               elog(PANIC, "failed to add item to the %s in index \"%s\"",
-                        where, RelationGetRelationName(rel));
+               return false;
+
+       return true;
 }
 
 /*
@@ -1975,9 +2135,10 @@ _bt_isequal(TupleDesc itupdesc, Page page, OffsetNumber offnum,
                if (isNull || (scankey->sk_flags & SK_ISNULL))
                        return false;
 
-               result = DatumGetInt32(FunctionCall2(&scankey->sk_func,
-                                                                                        datum,
-                                                                                        scankey->sk_argument));
+               result = DatumGetInt32(FunctionCall2Coll(&scankey->sk_func,
+                                                                                                scankey->sk_collation,
+                                                                                                datum,
+                                                                                                scankey->sk_argument));
 
                if (result != 0)
                        return false;
@@ -1997,7 +2158,7 @@ _bt_isequal(TupleDesc itupdesc, Page page, OffsetNumber offnum,
  * super-exclusive "cleanup" lock (see nbtree/README).
  */
 static void
-_bt_vacuum_one_page(Relation rel, Buffer buffer)
+_bt_vacuum_one_page(Relation rel, Buffer buffer, Relation heapRel)
 {
        OffsetNumber deletable[MaxOffsetNumber];
        int                     ndeletable = 0;
@@ -2024,7 +2185,7 @@ _bt_vacuum_one_page(Relation rel, Buffer buffer)
        }
 
        if (ndeletable > 0)
-               _bt_delitems(rel, buffer, deletable, ndeletable);
+               _bt_delitems_delete(rel, buffer, deletable, ndeletable, heapRel);
 
        /*
         * Note: if we didn't find any LP_DEAD items, then the page's