]> granicus.if.org Git - postgresql/blobdiff - src/backend/access/nbtree/nbtinsert.c
Reduce pinning and buffer content locking for btree scans.
[postgresql] / src / backend / access / nbtree / nbtinsert.c
index d758659c314b6894149ac7a60d9742bf5f9c0ecf..ef68a7145fce5f5352196a4b1c751aee26e10859 100644 (file)
@@ -3,7 +3,7 @@
  * nbtinsert.c
  *       Item insertion in Lehman and Yao btrees for Postgres.
  *
- * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  *
 #include "access/heapam.h"
 #include "access/nbtree.h"
 #include "access/transam.h"
+#include "access/xloginsert.h"
 #include "miscadmin.h"
-#include "storage/bufmgr.h"
 #include "storage/lmgr.h"
 #include "storage/predicate.h"
-#include "utils/inval.h"
 #include "utils/tqual.h"
 
 
@@ -59,15 +58,18 @@ static void _bt_findinsertloc(Relation rel,
                                  int keysz,
                                  ScanKey scankey,
                                  IndexTuple newtup,
+                                 BTStack stack,
                                  Relation heapRel);
-static void _bt_insertonpg(Relation rel, Buffer buf,
+static void _bt_insertonpg(Relation rel, Buffer buf, Buffer cbuf,
                           BTStack stack,
                           IndexTuple itup,
                           OffsetNumber newitemoff,
                           bool split_only_page);
-static Buffer _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
-                 OffsetNumber newitemoff, Size newitemsz,
+static Buffer _bt_split(Relation rel, Buffer buf, Buffer cbuf,
+                 OffsetNumber firstright, OffsetNumber newitemoff, Size newitemsz,
                  IndexTuple newitem, bool newitemonleft);
+static void _bt_insert_parent(Relation rel, Buffer buf, Buffer rbuf,
+                                 BTStack stack, bool is_root, bool is_only);
 static OffsetNumber _bt_findsplitloc(Relation rel, Page page,
                                 OffsetNumber newitemoff,
                                 Size newitemsz,
@@ -85,11 +87,11 @@ static void _bt_vacuum_one_page(Relation rel, Buffer buffer, Relation heapRel);
 /*
  *     _bt_doinsert() -- Handle insertion of a single index tuple in the tree.
  *
- *             This routine is called by the public interface routines, btbuild
- *             and btinsert.  By here, itup is filled in, including the TID.
+ *             This routine is called by the public interface routine, btinsert.
+ *             By here, itup is filled in, including the TID.
  *
  *             If checkUnique is UNIQUE_CHECK_NO or UNIQUE_CHECK_PARTIAL, this
- *             will allow duplicates.  Otherwise (UNIQUE_CHECK_YES or
+ *             will allow duplicates.  Otherwise (UNIQUE_CHECK_YES or
  *             UNIQUE_CHECK_EXISTING) it will throw error for a duplicate.
  *             For UNIQUE_CHECK_EXISTING we merely run the duplicate check, and
  *             don't actually insert.
@@ -128,10 +130,11 @@ top:
         * If the page was split between the time that we surrendered our read
         * lock and acquired our write lock, then this page may no longer be the
         * right place for the key we want to insert.  In this case, we need to
-        * move right in the tree.      See Lehman and Yao for an excruciatingly
+        * move right in the tree.  See Lehman and Yao for an excruciatingly
         * precise description.
         */
-       buf = _bt_moveright(rel, buf, natts, itup_scankey, false, BT_WRITE);
+       buf = _bt_moveright(rel, buf, natts, itup_scankey, false,
+                                               true, stack, BT_WRITE);
 
        /*
         * If we're not allowing duplicates, make sure the key isn't already in
@@ -166,7 +169,7 @@ top:
                {
                        /* Have to wait for the other guy ... */
                        _bt_relbuf(rel, buf);
-                       XactLockTableWait(xwait);
+                       XactLockTableWait(xwait, rel, &itup->t_tid, XLTW_InsertIndex);
                        /* start over... */
                        _bt_freestack(stack);
                        goto top;
@@ -184,8 +187,9 @@ top:
                 */
                CheckForSerializableConflictIn(rel, NULL, buf);
                /* do the insertion */
-               _bt_findinsertloc(rel, &buf, &offset, natts, itup_scankey, itup, heapRel);
-               _bt_insertonpg(rel, buf, stack, itup, offset, false);
+               _bt_findinsertloc(rel, &buf, &offset, natts, itup_scankey, itup,
+                                                 stack, heapRel);
+               _bt_insertonpg(rel, buf, InvalidBuffer, stack, itup, offset, false);
        }
        else
        {
@@ -208,7 +212,7 @@ top:
  * is the first tuple on the next page.
  *
  * Returns InvalidTransactionId if there is no conflict, else an xact ID
- * we must wait for to see if it commits a conflicting tuple.  If an actual
+ * we must wait for to see if it commits a conflicting tuple.   If an actual
  * conflict is detected, no return --- just ereport().
  *
  * However, if checkUnique == UNIQUE_CHECK_PARTIAL, we always return
@@ -290,7 +294,7 @@ _bt_check_unique(Relation rel, IndexTuple itup, Relation heapRel,
 
                                /*
                                 * If we are doing a recheck, we expect to find the tuple we
-                                * are rechecking.      It's not a duplicate, but we have to keep
+                                * are rechecking.  It's not a duplicate, but we have to keep
                                 * scanning.
                                 */
                                if (checkUnique == UNIQUE_CHECK_EXISTING &&
@@ -385,16 +389,22 @@ _bt_check_unique(Relation rel, IndexTuple itup, Relation heapRel,
                                        {
                                                Datum           values[INDEX_MAX_KEYS];
                                                bool            isnull[INDEX_MAX_KEYS];
+                                               char       *key_desc;
 
                                                index_deform_tuple(itup, RelationGetDescr(rel),
                                                                                   values, isnull);
+
+                                               key_desc = BuildIndexValueDescription(rel, values,
+                                                                                                                         isnull);
+
                                                ereport(ERROR,
                                                                (errcode(ERRCODE_UNIQUE_VIOLATION),
                                                                 errmsg("duplicate key value violates unique constraint \"%s\"",
                                                                                RelationGetRelationName(rel)),
-                                                                errdetail("Key %s already exists.",
-                                                                                  BuildIndexValueDescription(rel,
-                                                                                                                 values, isnull))));
+                                                                key_desc ? errdetail("Key %s already exists.",
+                                                                                                         key_desc) : 0,
+                                                                errtableconstraint(heapRel,
+                                                                                        RelationGetRelationName(rel))));
                                        }
                                }
                                else if (all_dead)
@@ -406,11 +416,15 @@ _bt_check_unique(Relation rel, IndexTuple itup, Relation heapRel,
                                         */
                                        ItemIdMarkDead(curitemid);
                                        opaque->btpo_flags |= BTP_HAS_GARBAGE;
-                                       /* be sure to mark the proper buffer dirty... */
+
+                                       /*
+                                        * Mark buffer with a dirty hint, since state is not
+                                        * crucial. Be sure to mark the proper buffer dirty.
+                                        */
                                        if (nbuf != InvalidBuffer)
-                                               SetBufferCommitInfoNeedsSave(nbuf);
+                                               MarkBufferDirtyHint(nbuf, true);
                                        else
-                                               SetBufferCommitInfoNeedsSave(buf);
+                                               MarkBufferDirtyHint(buf, true);
                                }
                        }
                }
@@ -456,7 +470,9 @@ _bt_check_unique(Relation rel, IndexTuple itup, Relation heapRel,
                                (errcode(ERRCODE_INTERNAL_ERROR),
                                 errmsg("failed to re-find tuple within index \"%s\"",
                                                RelationGetRelationName(rel)),
-               errhint("This may be because of a non-immutable index expression.")));
+                errhint("This may be because of a non-immutable index expression."),
+                                errtableconstraint(heapRel,
+                                                                       RelationGetRelationName(rel))));
 
        if (nbuf != InvalidBuffer)
                _bt_relbuf(rel, nbuf);
@@ -471,7 +487,7 @@ _bt_check_unique(Relation rel, IndexTuple itup, Relation heapRel,
  *             If the new key is equal to one or more existing keys, we can
  *             legitimately place it anywhere in the series of equal keys --- in fact,
  *             if the new key is equal to the page's "high key" we can place it on
- *             the next page.  If it is equal to the high key, and there's not room
+ *             the next page.  If it is equal to the high key, and there's not room
  *             to insert the new tuple on the current page without splitting, then
  *             we can move right hoping to find more free space and avoid a split.
  *             (We should not move right indefinitely, however, since that leads to
@@ -482,9 +498,9 @@ _bt_check_unique(Relation rel, IndexTuple itup, Relation heapRel,
  *             If there's not enough room in the space, we try to make room by
  *             removing any LP_DEAD tuples.
  *
- *             On entry, *buf and *offsetptr point to the first legal position
- *             where the new tuple could be inserted.  The caller should hold an
- *             exclusive lock on *buf.  *offsetptr can also be set to
+ *             On entry, *bufptr and *offsetptr point to the first legal position
+ *             where the new tuple could be inserted.  The caller should hold an
+ *             exclusive lock on *bufptr.  *offsetptr can also be set to
  *             InvalidOffsetNumber, in which case the function will search for the
  *             right location within the page if needed.  On exit, they point to the
  *             chosen insert location.  If _bt_findinsertloc decides to move right,
@@ -501,6 +517,7 @@ _bt_findinsertloc(Relation rel,
                                  int keysz,
                                  ScanKey scankey,
                                  IndexTuple newtup,
+                                 BTStack stack,
                                  Relation heapRel)
 {
        Buffer          buf = *bufptr;
@@ -524,17 +541,20 @@ _bt_findinsertloc(Relation rel,
         * able to fit three items on every page, so restrict any one item to 1/3
         * the per-page available space. Note that at this point, itemsz doesn't
         * include the ItemId.
+        *
+        * NOTE: if you change this, see also the similar code in _bt_buildadd().
         */
        if (itemsz > BTMaxItemSize(page))
                ereport(ERROR,
                                (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
-                       errmsg("index row size %lu exceeds maximum %lu for index \"%s\"",
-                                  (unsigned long) itemsz,
-                                  (unsigned long) BTMaxItemSize(page),
+                       errmsg("index row size %zu exceeds maximum %zu for index \"%s\"",
+                                  itemsz, BTMaxItemSize(page),
                                   RelationGetRelationName(rel)),
                errhint("Values larger than 1/3 of a buffer page cannot be indexed.\n"
                                "Consider a function index of an MD5 hash of the value, "
-                               "or use full text indexing.")));
+                               "or use full text indexing."),
+                                errtableconstraint(heapRel,
+                                                                       RelationGetRelationName(rel))));
 
        /*----------
         * If we will need to split the page to put the item on this page,
@@ -549,7 +569,7 @@ _bt_findinsertloc(Relation rel,
         * on every insert.  We implement "get tired" as a random choice,
         * since stopping after scanning a fixed number of pages wouldn't work
         * well (we'd never reach the right-hand side of previously split
-        * pages).      Currently the probability of moving right is set at 0.99,
+        * pages).  Currently the probability of moving right is set at 0.99,
         * which may seem too high to change the behavior much, but it does an
         * excellent job of preventing O(N^2) behavior with many equal keys.
         *----------
@@ -559,6 +579,7 @@ _bt_findinsertloc(Relation rel,
        while (PageGetFreeSpace(page) < itemsz)
        {
                Buffer          rbuf;
+               BlockNumber rblkno;
 
                /*
                 * before considering moving right, see if we can obtain enough space
@@ -596,18 +617,33 @@ _bt_findinsertloc(Relation rel,
                 */
                rbuf = InvalidBuffer;
 
+               rblkno = lpageop->btpo_next;
                for (;;)
                {
-                       BlockNumber rblkno = lpageop->btpo_next;
-
                        rbuf = _bt_relandgetbuf(rel, rbuf, rblkno, BT_WRITE);
                        page = BufferGetPage(rbuf);
                        lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
+
+                       /*
+                        * If this page was incompletely split, finish the split now. We
+                        * do this while holding a lock on the left sibling, which is not
+                        * good because finishing the split could be a fairly lengthy
+                        * operation.  But this should happen very seldom.
+                        */
+                       if (P_INCOMPLETE_SPLIT(lpageop))
+                       {
+                               _bt_finish_split(rel, rbuf, stack);
+                               rbuf = InvalidBuffer;
+                               continue;
+                       }
+
                        if (!P_IGNORE(lpageop))
                                break;
                        if (P_RIGHTMOST(lpageop))
                                elog(ERROR, "fell off the end of index \"%s\"",
                                         RelationGetRelationName(rel));
+
+                       rblkno = lpageop->btpo_next;
                }
                _bt_relbuf(rel, buf);
                buf = rbuf;
@@ -649,10 +685,14 @@ _bt_findinsertloc(Relation rel,
  *                        child page on the parent.
  *                     +  updates the metapage if a true root or fast root is split.
  *
- *             On entry, we must have the right buffer in which to do the
- *             insertion, and the buffer must be pinned and write-locked.      On return,
+ *             On entry, we must have the correct buffer in which to do the
+ *             insertion, and the buffer must be pinned and write-locked.  On return,
  *             we will have dropped both the pin and the lock on the buffer.
  *
+ *             When inserting to a non-leaf page, 'cbuf' is the left-sibling of the
+ *             page we're inserting the downlink for.  This function will clear the
+ *             INCOMPLETE_SPLIT flag on it, and release the buffer.
+ *
  *             The locking interactions in this code are critical.  You should
  *             grok Lehman and Yao's paper before making any changes.  In addition,
  *             you need to understand how we disambiguate duplicate keys in this
@@ -666,6 +706,7 @@ _bt_findinsertloc(Relation rel,
 static void
 _bt_insertonpg(Relation rel,
                           Buffer buf,
+                          Buffer cbuf,
                           BTStack stack,
                           IndexTuple itup,
                           OffsetNumber newitemoff,
@@ -679,6 +720,14 @@ _bt_insertonpg(Relation rel,
        page = BufferGetPage(buf);
        lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
 
+       /* child buffer must be given iff inserting on an internal page */
+       Assert(P_ISLEAF(lpageop) == !BufferIsValid(cbuf));
+
+       /* The caller should've finished any incomplete splits already. */
+       if (P_INCOMPLETE_SPLIT(lpageop))
+               elog(ERROR, "cannot insert to incompletely split page %u",
+                        BufferGetBlockNumber(buf));
+
        itemsz = IndexTupleDSize(*itup);
        itemsz = MAXALIGN(itemsz);      /* be safe, PageAddItem will do this but we
                                                                 * need to be consistent */
@@ -703,7 +752,7 @@ _bt_insertonpg(Relation rel,
                                                                          &newitemonleft);
 
                /* split the buffer into left and right halves */
-               rbuf = _bt_split(rel, buf, firstright,
+               rbuf = _bt_split(rel, buf, cbuf, firstright,
                                                 newitemoff, itemsz, itup, newitemonleft);
                PredicateLockPageSplit(rel,
                                                           BufferGetBlockNumber(buf),
@@ -777,38 +826,40 @@ _bt_insertonpg(Relation rel,
                        MarkBufferDirty(metabuf);
                }
 
+               /* clear INCOMPLETE_SPLIT flag on child if inserting a downlink */
+               if (BufferIsValid(cbuf))
+               {
+                       Page            cpage = BufferGetPage(cbuf);
+                       BTPageOpaque cpageop = (BTPageOpaque) PageGetSpecialPointer(cpage);
+
+                       Assert(P_INCOMPLETE_SPLIT(cpageop));
+                       cpageop->btpo_flags &= ~BTP_INCOMPLETE_SPLIT;
+                       MarkBufferDirty(cbuf);
+               }
+
                /* XLOG stuff */
                if (RelationNeedsWAL(rel))
                {
                        xl_btree_insert xlrec;
-                       BlockNumber xldownlink;
                        xl_btree_metadata xlmeta;
                        uint8           xlinfo;
                        XLogRecPtr      recptr;
-                       XLogRecData rdata[4];
-                       XLogRecData *nextrdata;
                        IndexTupleData trunctuple;
 
-                       xlrec.target.node = rel->rd_node;
-                       ItemPointerSet(&(xlrec.target.tid), itup_blkno, itup_off);
+                       xlrec.offnum = itup_off;
 
-                       rdata[0].data = (char *) &xlrec;
-                       rdata[0].len = SizeOfBtreeInsert;
-                       rdata[0].buffer = InvalidBuffer;
-                       rdata[0].next = nextrdata = &(rdata[1]);
+                       XLogBeginInsert();
+                       XLogRegisterData((char *) &xlrec, SizeOfBtreeInsert);
 
                        if (P_ISLEAF(lpageop))
                                xlinfo = XLOG_BTREE_INSERT_LEAF;
                        else
                        {
-                               xldownlink = ItemPointerGetBlockNumber(&(itup->t_tid));
-                               Assert(ItemPointerGetOffsetNumber(&(itup->t_tid)) == P_HIKEY);
-
-                               nextrdata->data = (char *) &xldownlink;
-                               nextrdata->len = sizeof(BlockNumber);
-                               nextrdata->buffer = InvalidBuffer;
-                               nextrdata->next = nextrdata + 1;
-                               nextrdata++;
+                               /*
+                                * Register the left child whose INCOMPLETE_SPLIT flag was
+                                * cleared.
+                                */
+                               XLogRegisterBuffer(1, cbuf, REGBUF_STANDARD);
 
                                xlinfo = XLOG_BTREE_INSERT_UPPER;
                        }
@@ -820,54 +871,45 @@ _bt_insertonpg(Relation rel,
                                xlmeta.fastroot = metad->btm_fastroot;
                                xlmeta.fastlevel = metad->btm_fastlevel;
 
-                               nextrdata->data = (char *) &xlmeta;
-                               nextrdata->len = sizeof(xl_btree_metadata);
-                               nextrdata->buffer = InvalidBuffer;
-                               nextrdata->next = nextrdata + 1;
-                               nextrdata++;
+                               XLogRegisterBuffer(2, metabuf, REGBUF_WILL_INIT);
+                               XLogRegisterBufData(2, (char *) &xlmeta, sizeof(xl_btree_metadata));
 
                                xlinfo = XLOG_BTREE_INSERT_META;
                        }
 
                        /* Read comments in _bt_pgaddtup */
+                       XLogRegisterBuffer(0, buf, REGBUF_STANDARD);
                        if (!P_ISLEAF(lpageop) && newitemoff == P_FIRSTDATAKEY(lpageop))
                        {
                                trunctuple = *itup;
                                trunctuple.t_info = sizeof(IndexTupleData);
-                               nextrdata->data = (char *) &trunctuple;
-                               nextrdata->len = sizeof(IndexTupleData);
+                               XLogRegisterBufData(0, (char *) &trunctuple,
+                                                                       sizeof(IndexTupleData));
                        }
                        else
-                       {
-                               nextrdata->data = (char *) itup;
-                               nextrdata->len = IndexTupleDSize(*itup);
-                       }
-                       nextrdata->buffer = buf;
-                       nextrdata->buffer_std = true;
-                       nextrdata->next = NULL;
+                               XLogRegisterBufData(0, (char *) itup, IndexTupleDSize(*itup));
 
-                       recptr = XLogInsert(RM_BTREE_ID, xlinfo, rdata);
+                       recptr = XLogInsert(RM_BTREE_ID, xlinfo);
 
                        if (BufferIsValid(metabuf))
                        {
                                PageSetLSN(metapg, recptr);
-                               PageSetTLI(metapg, ThisTimeLineID);
+                       }
+                       if (BufferIsValid(cbuf))
+                       {
+                               PageSetLSN(BufferGetPage(cbuf), recptr);
                        }
 
                        PageSetLSN(page, recptr);
-                       PageSetTLI(page, ThisTimeLineID);
                }
 
                END_CRIT_SECTION();
 
-               /* release buffers; send out relcache inval if metapage changed */
+               /* release buffers */
                if (BufferIsValid(metabuf))
-               {
-                       if (!InRecovery)
-                               CacheInvalidateRelcache(rel);
                        _bt_relbuf(rel, metabuf);
-               }
-
+               if (BufferIsValid(cbuf))
+                       _bt_relbuf(rel, cbuf);
                _bt_relbuf(rel, buf);
        }
 }
@@ -880,11 +922,15 @@ _bt_insertonpg(Relation rel,
  *             new right page.  newitemoff etc. tell us about the new item that
  *             must be inserted along with the data from the old page.
  *
+ *             When splitting a non-leaf page, 'cbuf' is the left-sibling of the
+ *             page we're inserting the downlink for.  This function will clear the
+ *             INCOMPLETE_SPLIT flag on it, and release the buffer.
+ *
  *             Returns the new right sibling of buf, pinned and write-locked.
  *             The pin and lock on buf are maintained.
  */
 static Buffer
-_bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
+_bt_split(Relation rel, Buffer buf, Buffer cbuf, OffsetNumber firstright,
                  OffsetNumber newitemoff, Size newitemsz, IndexTuple newitem,
                  bool newitemonleft)
 {
@@ -908,6 +954,7 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
        OffsetNumber maxoff;
        OffsetNumber i;
        bool            isroot;
+       bool            isleaf;
 
        /* Acquire a new page to split into */
        rbuf = _bt_getbuf(rel, P_NEW, BT_WRITE);
@@ -916,7 +963,7 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
         * origpage is the original page to be split.  leftpage is a temporary
         * buffer that receives the left-sibling data, which will be copied back
         * into origpage on success.  rightpage is the new page that receives the
-        * right-sibling data.  If we fail before reaching the critical section,
+        * right-sibling data.  If we fail before reaching the critical section,
         * origpage hasn't been modified and leftpage is only workspace. In
         * principle we shouldn't need to worry about rightpage either, because it
         * hasn't been linked into the btree page structure; but to avoid leaving
@@ -934,12 +981,11 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
        /* rightpage was already initialized by _bt_getbuf */
 
        /*
-        * Copy the original page's LSN and TLI into leftpage, which will become
-        * the updated version of the page.  We need this because XLogInsert will
-        * examine these fields and possibly dump them in a page image.
+        * Copy the original page's LSN into leftpage, which will become the
+        * updated version of the page.  We need this because XLogInsert will
+        * examine the LSN and possibly dump it in a page image.
         */
        PageSetLSN(leftpage, PageGetLSN(origpage));
-       PageSetTLI(leftpage, PageGetTLI(origpage));
 
        /* init btree private data */
        oopaque = (BTPageOpaque) PageGetSpecialPointer(origpage);
@@ -947,12 +993,15 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
        ropaque = (BTPageOpaque) PageGetSpecialPointer(rightpage);
 
        isroot = P_ISROOT(oopaque);
+       isleaf = P_ISLEAF(oopaque);
 
        /* if we're splitting this page, it won't be the root when we're done */
        /* also, clear the SPLIT_END and HAS_GARBAGE flags in both pages */
        lopaque->btpo_flags = oopaque->btpo_flags;
        lopaque->btpo_flags &= ~(BTP_ROOT | BTP_SPLIT_END | BTP_HAS_GARBAGE);
        ropaque->btpo_flags = lopaque->btpo_flags;
+       /* set flag in left page indicating that the right page has no downlink */
+       lopaque->btpo_flags |= BTP_INCOMPLETE_SPLIT;
        lopaque->btpo_prev = oopaque->btpo_prev;
        lopaque->btpo_next = rightpagenumber;
        ropaque->btpo_prev = origpagenumber;
@@ -1132,7 +1181,7 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
                 * page.  If you're confused, imagine that page A splits to A B and
                 * then again, yielding A C B, while vacuum is in progress.  Tuples
                 * originally in A could now be in either B or C, hence vacuum must
-                * examine both pages.  But if D, our right sibling, has a different
+                * examine both pages.  But if D, our right sibling, has a different
                 * cycleid then it could not contain any tuples that were in A when
                 * the vacuum started.
                 */
@@ -1176,38 +1225,56 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
                MarkBufferDirty(sbuf);
        }
 
+       /*
+        * Clear INCOMPLETE_SPLIT flag on child if inserting the new item finishes
+        * a split.
+        */
+       if (!isleaf)
+       {
+               Page            cpage = BufferGetPage(cbuf);
+               BTPageOpaque cpageop = (BTPageOpaque) PageGetSpecialPointer(cpage);
+
+               cpageop->btpo_flags &= ~BTP_INCOMPLETE_SPLIT;
+               MarkBufferDirty(cbuf);
+       }
+
        /* XLOG stuff */
        if (RelationNeedsWAL(rel))
        {
                xl_btree_split xlrec;
                uint8           xlinfo;
                XLogRecPtr      recptr;
-               XLogRecData rdata[7];
-               XLogRecData *lastrdata;
 
-               xlrec.node = rel->rd_node;
-               xlrec.leftsib = origpagenumber;
-               xlrec.rightsib = rightpagenumber;
-               xlrec.rnext = ropaque->btpo_next;
                xlrec.level = ropaque->btpo.level;
                xlrec.firstright = firstright;
+               xlrec.newitemoff = newitemoff;
 
-               rdata[0].data = (char *) &xlrec;
-               rdata[0].len = SizeOfBtreeSplit;
-               rdata[0].buffer = InvalidBuffer;
-
-               lastrdata = &rdata[0];
+               XLogBeginInsert();
+               XLogRegisterData((char *) &xlrec, SizeOfBtreeSplit);
 
-               if (ropaque->btpo.level > 0)
-               {
-                       /* Log downlink on non-leaf pages */
-                       lastrdata->next = lastrdata + 1;
-                       lastrdata++;
+               XLogRegisterBuffer(0, buf, REGBUF_STANDARD);
+               XLogRegisterBuffer(1, rbuf, REGBUF_WILL_INIT);
+               /* Log the right sibling, because we've changed its prev-pointer. */
+               if (!P_RIGHTMOST(ropaque))
+                       XLogRegisterBuffer(2, sbuf, REGBUF_STANDARD);
+               if (BufferIsValid(cbuf))
+                       XLogRegisterBuffer(3, cbuf, REGBUF_STANDARD);
 
-                       lastrdata->data = (char *) &newitem->t_tid.ip_blkid;
-                       lastrdata->len = sizeof(BlockIdData);
-                       lastrdata->buffer = InvalidBuffer;
+               /*
+                * Log the new item, if it was inserted on the left page. (If it was
+                * put on the right page, we don't need to explicitly WAL log it
+                * because it's included with all the other items on the right page.)
+                * Show the new item as belonging to the left page buffer, so that it
+                * is not stored if XLogInsert decides it needs a full-page image of
+                * the left page.  We store the offset anyway, though, to support
+                * archive compression of these records.
+                */
+               if (newitemonleft)
+                       XLogRegisterBufData(0, (char *) newitem, MAXALIGN(newitemsz));
 
+               /* Log left page */
+               if (!isleaf)
+               {
                        /*
                         * We must also log the left page's high key, because the right
                         * page's leftmost key is suppressed on non-leaf levels.  Show it
@@ -1215,58 +1282,9 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
                         * if XLogInsert decides it needs a full-page image of the left
                         * page.
                         */
-                       lastrdata->next = lastrdata + 1;
-                       lastrdata++;
-
                        itemid = PageGetItemId(origpage, P_HIKEY);
                        item = (IndexTuple) PageGetItem(origpage, itemid);
-                       lastrdata->data = (char *) item;
-                       lastrdata->len = MAXALIGN(IndexTupleSize(item));
-                       lastrdata->buffer = buf;        /* backup block 1 */
-                       lastrdata->buffer_std = true;
-               }
-
-               /*
-                * Log the new item and its offset, if it was inserted on the left
-                * page. (If it was put on the right page, we don't need to explicitly
-                * WAL log it because it's included with all the other items on the
-                * right page.) Show the new item as belonging to the left page
-                * buffer, so that it is not stored if XLogInsert decides it needs a
-                * full-page image of the left page.  We store the offset anyway,
-                * though, to support archive compression of these records.
-                */
-               if (newitemonleft)
-               {
-                       lastrdata->next = lastrdata + 1;
-                       lastrdata++;
-
-                       lastrdata->data = (char *) &newitemoff;
-                       lastrdata->len = sizeof(OffsetNumber);
-                       lastrdata->buffer = InvalidBuffer;
-
-                       lastrdata->next = lastrdata + 1;
-                       lastrdata++;
-
-                       lastrdata->data = (char *) newitem;
-                       lastrdata->len = MAXALIGN(newitemsz);
-                       lastrdata->buffer = buf;        /* backup block 1 */
-                       lastrdata->buffer_std = true;
-               }
-               else if (ropaque->btpo.level == 0)
-               {
-                       /*
-                        * Although we don't need to WAL-log the new item, we still need
-                        * XLogInsert to consider storing a full-page image of the left
-                        * page, so make an empty entry referencing that buffer. This also
-                        * ensures that the left page is always backup block 1.
-                        */
-                       lastrdata->next = lastrdata + 1;
-                       lastrdata++;
-
-                       lastrdata->data = NULL;
-                       lastrdata->len = 0;
-                       lastrdata->buffer = buf;        /* backup block 1 */
-                       lastrdata->buffer_std = true;
+                       XLogRegisterBufData(0, (char *) item, MAXALIGN(IndexTupleSize(item)));
                }
 
                /*
@@ -1281,44 +1299,26 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
                 * and so the item pointers can be reconstructed.  See comments for
                 * _bt_restore_page().
                 */
-               lastrdata->next = lastrdata + 1;
-               lastrdata++;
-
-               lastrdata->data = (char *) rightpage +
-                       ((PageHeader) rightpage)->pd_upper;
-               lastrdata->len = ((PageHeader) rightpage)->pd_special -
-                       ((PageHeader) rightpage)->pd_upper;
-               lastrdata->buffer = InvalidBuffer;
-
-               /* Log the right sibling, because we've changed its' prev-pointer. */
-               if (!P_RIGHTMOST(ropaque))
-               {
-                       lastrdata->next = lastrdata + 1;
-                       lastrdata++;
-
-                       lastrdata->data = NULL;
-                       lastrdata->len = 0;
-                       lastrdata->buffer = sbuf;       /* backup block 2 */
-                       lastrdata->buffer_std = true;
-               }
-
-               lastrdata->next = NULL;
+               XLogRegisterBufData(1,
+                                        (char *) rightpage + ((PageHeader) rightpage)->pd_upper,
+                                                       ((PageHeader) rightpage)->pd_special - ((PageHeader) rightpage)->pd_upper);
 
                if (isroot)
                        xlinfo = newitemonleft ? XLOG_BTREE_SPLIT_L_ROOT : XLOG_BTREE_SPLIT_R_ROOT;
                else
                        xlinfo = newitemonleft ? XLOG_BTREE_SPLIT_L : XLOG_BTREE_SPLIT_R;
 
-               recptr = XLogInsert(RM_BTREE_ID, xlinfo, rdata);
+               recptr = XLogInsert(RM_BTREE_ID, xlinfo);
 
                PageSetLSN(origpage, recptr);
-               PageSetTLI(origpage, ThisTimeLineID);
                PageSetLSN(rightpage, recptr);
-               PageSetTLI(rightpage, ThisTimeLineID);
                if (!P_RIGHTMOST(ropaque))
                {
                        PageSetLSN(spage, recptr);
-                       PageSetTLI(spage, ThisTimeLineID);
+               }
+               if (!isleaf)
+               {
+                       PageSetLSN(BufferGetPage(cbuf), recptr);
                }
        }
 
@@ -1328,6 +1328,10 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
        if (!P_RIGHTMOST(ropaque))
                _bt_relbuf(rel, sbuf);
 
+       /* release the child */
+       if (!isleaf)
+               _bt_relbuf(rel, cbuf);
+
        /* split's done */
        return rbuf;
 }
@@ -1354,7 +1358,7 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
  *
  * We return the index of the first existing tuple that should go on the
  * righthand page, plus a boolean indicating whether the new tuple goes on
- * the left or right page.     The bool is necessary to disambiguate the case
+ * the left or right page.  The bool is necessary to disambiguate the case
  * where firstright == newitemoff.
  */
 static OffsetNumber
@@ -1590,7 +1594,7 @@ _bt_checksplitloc(FindSplitData *state,
  *
  * On entry, buf and rbuf are the left and right split pages, which we
  * still hold write locks on per the L&Y algorithm.  We release the
- * write locks once we have write lock on the parent page.     (Any sooner,
+ * write locks once we have write lock on the parent page.  (Any sooner,
  * and it'd be possible for some other process to try to split or delete
  * one of these pages, and get confused because it cannot find the downlink.)
  *
@@ -1598,10 +1602,8 @@ _bt_checksplitloc(FindSplitData *state,
  *                     have to be efficient (concurrent ROOT split, WAL recovery)
  * is_root - we split the true root
  * is_only - we split a page alone on its level (might have been fast root)
- *
- * This is exported so it can be called by nbtxlog.c.
  */
-void
+static void
 _bt_insert_parent(Relation rel,
                                  Buffer buf,
                                  Buffer rbuf,
@@ -1613,15 +1615,14 @@ _bt_insert_parent(Relation rel,
         * Here we have to do something Lehman and Yao don't talk about: deal with
         * a root split and construction of a new root.  If our stack is empty
         * then we have just split a node on what had been the root level when we
-        * descended the tree.  If it was still the root then we perform a
+        * descended the tree.  If it was still the root then we perform a
         * new-root construction.  If it *wasn't* the root anymore, search to find
         * the next higher level that someone constructed meanwhile, and find the
         * right place to insert as for the normal case.
         *
         * If we have to search for the parent level, we do so by re-descending
         * from the root.  This is not super-efficient, but it's rare enough not
-        * to matter.  (This path is also taken when called from WAL recovery ---
-        * we have no stack in that case.)
+        * to matter.
         */
        if (is_root)
        {
@@ -1650,8 +1651,7 @@ _bt_insert_parent(Relation rel,
                {
                        BTPageOpaque lpageop;
 
-                       if (!InRecovery)
-                               elog(DEBUG2, "concurrent ROOT page split");
+                       elog(DEBUG2, "concurrent ROOT page split");
                        lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
                        /* Find the leftmost page at the next level up */
                        pbuf = _bt_get_endpoint(rel, lpageop->btpo.level + 1, false);
@@ -1680,12 +1680,13 @@ _bt_insert_parent(Relation rel,
                 * 05/27/97
                 */
                ItemPointerSet(&(stack->bts_btentry.t_tid), bknum, P_HIKEY);
-
                pbuf = _bt_getstackbuf(rel, stack, BT_WRITE);
 
-               /* Now we can unlock the children */
+               /*
+                * Now we can unlock the right child. The left child will be unlocked
+                * by _bt_insertonpg().
+                */
                _bt_relbuf(rel, rbuf);
-               _bt_relbuf(rel, buf);
 
                /* Check for error only after writing children */
                if (pbuf == InvalidBuffer)
@@ -1693,7 +1694,7 @@ _bt_insert_parent(Relation rel,
                                 RelationGetRelationName(rel), bknum, rbknum);
 
                /* Recursively update the parent */
-               _bt_insertonpg(rel, pbuf, stack->bts_parent,
+               _bt_insertonpg(rel, pbuf, buf, stack->bts_parent,
                                           new_item, stack->bts_offset + 1,
                                           is_only);
 
@@ -1702,6 +1703,62 @@ _bt_insert_parent(Relation rel,
        }
 }
 
+/*
+ * _bt_finish_split() -- Finish an incomplete split
+ *
+ * A crash or other failure can leave a split incomplete.  The insertion
+ * routines won't allow to insert on a page that is incompletely split.
+ * Before inserting on such a page, call _bt_finish_split().
+ *
+ * On entry, 'lbuf' must be locked in write-mode.  On exit, it is unlocked
+ * and unpinned.
+ */
+void
+_bt_finish_split(Relation rel, Buffer lbuf, BTStack stack)
+{
+       Page            lpage = BufferGetPage(lbuf);
+       BTPageOpaque lpageop = (BTPageOpaque) PageGetSpecialPointer(lpage);
+       Buffer          rbuf;
+       Page            rpage;
+       BTPageOpaque rpageop;
+       bool            was_root;
+       bool            was_only;
+
+       Assert(P_INCOMPLETE_SPLIT(lpageop));
+
+       /* Lock right sibling, the one missing the downlink */
+       rbuf = _bt_getbuf(rel, lpageop->btpo_next, BT_WRITE);
+       rpage = BufferGetPage(rbuf);
+       rpageop = (BTPageOpaque) PageGetSpecialPointer(rpage);
+
+       /* Could this be a root split? */
+       if (!stack)
+       {
+               Buffer          metabuf;
+               Page            metapg;
+               BTMetaPageData *metad;
+
+               /* acquire lock on the metapage */
+               metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_WRITE);
+               metapg = BufferGetPage(metabuf);
+               metad = BTPageGetMeta(metapg);
+
+               was_root = (metad->btm_root == BufferGetBlockNumber(lbuf));
+
+               _bt_relbuf(rel, metabuf);
+       }
+       else
+               was_root = false;
+
+       /* Was this the only page on the level before split? */
+       was_only = (P_LEFTMOST(lpageop) && P_RIGHTMOST(rpageop));
+
+       elog(DEBUG1, "finishing incomplete split of %u/%u",
+                BufferGetBlockNumber(lbuf), BufferGetBlockNumber(rbuf));
+
+       _bt_insert_parent(rel, lbuf, rbuf, stack, was_root, was_only);
+}
+
 /*
  *     _bt_getstackbuf() -- Walk back up the tree one step, and find the item
  *                                              we last looked at in the parent.
@@ -1734,6 +1791,12 @@ _bt_getstackbuf(Relation rel, BTStack stack, int access)
                page = BufferGetPage(buf);
                opaque = (BTPageOpaque) PageGetSpecialPointer(page);
 
+               if (access == BT_WRITE && P_INCOMPLETE_SPLIT(opaque))
+               {
+                       _bt_finish_split(rel, buf, stack->bts_parent);
+                       continue;
+               }
+
                if (!P_IGNORE(opaque))
                {
                        OffsetNumber offnum,
@@ -1763,7 +1826,7 @@ _bt_getstackbuf(Relation rel, BTStack stack, int access)
                        /*
                         * These loops will check every item on the page --- but in an
                         * order that's attuned to the probability of where it actually
-                        * is.  Scan to the right first, then to the left.
+                        * is.  Scan to the right first, then to the left.
                         */
                        for (offnum = start;
                                 offnum <= maxoff;
@@ -1838,10 +1901,13 @@ _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf)
                                rbkno;
        BlockNumber rootblknum;
        BTPageOpaque rootopaque;
+       BTPageOpaque lopaque;
        ItemId          itemid;
        IndexTuple      item;
-       Size            itemsz;
-       IndexTuple      new_item;
+       IndexTuple      left_item;
+       Size            left_item_sz;
+       IndexTuple      right_item;
+       Size            right_item_sz;
        Buffer          metabuf;
        Page            metapg;
        BTMetaPageData *metad;
@@ -1849,6 +1915,7 @@ _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf)
        lbkno = BufferGetBlockNumber(lbuf);
        rbkno = BufferGetBlockNumber(rbuf);
        lpage = BufferGetPage(lbuf);
+       lopaque = (BTPageOpaque) PageGetSpecialPointer(lpage);
 
        /* get a new root page */
        rootbuf = _bt_getbuf(rel, P_NEW, BT_WRITE);
@@ -1860,6 +1927,26 @@ _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf)
        metapg = BufferGetPage(metabuf);
        metad = BTPageGetMeta(metapg);
 
+       /*
+        * Create downlink item for left page (old root).  Since this will be the
+        * first item in a non-leaf page, it implicitly has minus-infinity key
+        * value, so we need not store any actual key in it.
+        */
+       left_item_sz = sizeof(IndexTupleData);
+       left_item = (IndexTuple) palloc(left_item_sz);
+       left_item->t_info = left_item_sz;
+       ItemPointerSet(&(left_item->t_tid), lbkno, P_HIKEY);
+
+       /*
+        * Create downlink item for right page.  The key for it is obtained from
+        * the "high key" position in the left page.
+        */
+       itemid = PageGetItemId(lpage, P_HIKEY);
+       right_item_sz = ItemIdGetLength(itemid);
+       item = (IndexTuple) PageGetItem(lpage, itemid);
+       right_item = CopyIndexTuple(item);
+       ItemPointerSet(&(right_item->t_tid), rbkno, P_HIKEY);
+
        /* NO EREPORT(ERROR) from here till newroot op is logged */
        START_CRIT_SECTION();
 
@@ -1877,16 +1964,6 @@ _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf)
        metad->btm_fastroot = rootblknum;
        metad->btm_fastlevel = rootopaque->btpo.level;
 
-       /*
-        * Create downlink item for left page (old root).  Since this will be the
-        * first item in a non-leaf page, it implicitly has minus-infinity key
-        * value, so we need not store any actual key in it.
-        */
-       itemsz = sizeof(IndexTupleData);
-       new_item = (IndexTuple) palloc(itemsz);
-       new_item->t_info = itemsz;
-       ItemPointerSet(&(new_item->t_tid), lbkno, P_HIKEY);
-
        /*
         * Insert the left page pointer into the new root page.  The root page is
         * the rightmost page on its level so there is no "high key" in it; the
@@ -1895,32 +1972,25 @@ _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf)
         * Note: we *must* insert the two items in item-number order, for the
         * benefit of _bt_restore_page().
         */
-       if (PageAddItem(rootpage, (Item) new_item, itemsz, P_HIKEY,
+       if (PageAddItem(rootpage, (Item) left_item, left_item_sz, P_HIKEY,
                                        false, false) == InvalidOffsetNumber)
                elog(PANIC, "failed to add leftkey to new root page"
                         " while splitting block %u of index \"%s\"",
                         BufferGetBlockNumber(lbuf), RelationGetRelationName(rel));
-       pfree(new_item);
-
-       /*
-        * Create downlink item for right page.  The key for it is obtained from
-        * the "high key" position in the left page.
-        */
-       itemid = PageGetItemId(lpage, P_HIKEY);
-       itemsz = ItemIdGetLength(itemid);
-       item = (IndexTuple) PageGetItem(lpage, itemid);
-       new_item = CopyIndexTuple(item);
-       ItemPointerSet(&(new_item->t_tid), rbkno, P_HIKEY);
 
        /*
         * insert the right page pointer into the new root page.
         */
-       if (PageAddItem(rootpage, (Item) new_item, itemsz, P_FIRSTKEY,
+       if (PageAddItem(rootpage, (Item) right_item, right_item_sz, P_FIRSTKEY,
                                        false, false) == InvalidOffsetNumber)
                elog(PANIC, "failed to add rightkey to new root page"
                         " while splitting block %u of index \"%s\"",
                         BufferGetBlockNumber(lbuf), RelationGetRelationName(rel));
-       pfree(new_item);
+
+       /* Clear the incomplete-split flag in the left child */
+       Assert(P_INCOMPLETE_SPLIT(lopaque));
+       lopaque->btpo_flags &= ~BTP_INCOMPLETE_SPLIT;
+       MarkBufferDirty(lbuf);
 
        MarkBufferDirty(rootbuf);
        MarkBufferDirty(metabuf);
@@ -1930,44 +2000,49 @@ _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf)
        {
                xl_btree_newroot xlrec;
                XLogRecPtr      recptr;
-               XLogRecData rdata[2];
+               xl_btree_metadata md;
 
-               xlrec.node = rel->rd_node;
                xlrec.rootblk = rootblknum;
                xlrec.level = metad->btm_level;
 
-               rdata[0].data = (char *) &xlrec;
-               rdata[0].len = SizeOfBtreeNewroot;
-               rdata[0].buffer = InvalidBuffer;
-               rdata[0].next = &(rdata[1]);
+               XLogBeginInsert();
+               XLogRegisterData((char *) &xlrec, SizeOfBtreeNewroot);
+
+               XLogRegisterBuffer(0, rootbuf, REGBUF_WILL_INIT);
+               XLogRegisterBuffer(1, lbuf, REGBUF_STANDARD);
+               XLogRegisterBuffer(2, metabuf, REGBUF_WILL_INIT);
+
+               md.root = rootblknum;
+               md.level = metad->btm_level;
+               md.fastroot = rootblknum;
+               md.fastlevel = metad->btm_level;
+
+               XLogRegisterBufData(2, (char *) &md, sizeof(xl_btree_metadata));
 
                /*
                 * Direct access to page is not good but faster - we should implement
                 * some new func in page API.
                 */
-               rdata[1].data = (char *) rootpage + ((PageHeader) rootpage)->pd_upper;
-               rdata[1].len = ((PageHeader) rootpage)->pd_special -
-                       ((PageHeader) rootpage)->pd_upper;
-               rdata[1].buffer = InvalidBuffer;
-               rdata[1].next = NULL;
+               XLogRegisterBufData(0,
+                                          (char *) rootpage + ((PageHeader) rootpage)->pd_upper,
+                                                       ((PageHeader) rootpage)->pd_special -
+                                                       ((PageHeader) rootpage)->pd_upper);
 
-               recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_NEWROOT, rdata);
+               recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_NEWROOT);
 
+               PageSetLSN(lpage, recptr);
                PageSetLSN(rootpage, recptr);
-               PageSetTLI(rootpage, ThisTimeLineID);
                PageSetLSN(metapg, recptr);
-               PageSetTLI(metapg, ThisTimeLineID);
        }
 
        END_CRIT_SECTION();
 
-       /* send out relcache inval for metapage change */
-       if (!InRecovery)
-               CacheInvalidateRelcache(rel);
-
        /* done with metapage */
        _bt_relbuf(rel, metabuf);
 
+       pfree(left_item);
+       pfree(right_item);
+
        return rootbuf;
 }