]> granicus.if.org Git - postgresql/blobdiff - src/backend/access/spgist/spgdoinsert.c
Avoid deadlocks during insertion into SP-GiST indexes.
[postgresql] / src / backend / access / spgist / spgdoinsert.c
index 317feebde7eab4072355472d624f0d12e5917688..1fd331cbdfd1559ac5495fba9fe87c74a317a28f 100644 (file)
@@ -4,7 +4,7 @@
  *       implementation of insert algorithm
  *
  *
- * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1996-2013, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
 #include "access/spgist_private.h"
 #include "miscadmin.h"
 #include "storage/bufmgr.h"
+#include "utils/rel.h"
 
 
 /*
  * SPPageDesc tracks all info about a page we are inserting into.  In some
  * situations it actually identifies a tuple, or even a specific node within
- * an inner tuple.  But any of the fields can be invalid.  If the buffer
+ * an inner tuple.     But any of the fields can be invalid.  If the buffer
  * field is valid, it implies we hold pin and exclusive lock on that buffer.
  * page pointer should be valid exactly when buffer is.
  */
@@ -129,8 +130,8 @@ spgPageIndexMultiDelete(SpGistState *state, Page page,
                                                int firststate, int reststate,
                                                BlockNumber blkno, OffsetNumber offnum)
 {
-       OffsetNumber    firstItem;
-       OffsetNumber   *sortednos;
+       OffsetNumber firstItem;
+       OffsetNumber *sortednos;
        SpGistDeadTuple tuple = NULL;
        int                     i;
 
@@ -155,8 +156,8 @@ spgPageIndexMultiDelete(SpGistState *state, Page page,
 
        for (i = 0; i < nitems; i++)
        {
-               OffsetNumber    itemno = sortednos[i];
-               int                             tupstate;
+               OffsetNumber itemno = sortednos[i];
+               int                     tupstate;
 
                tupstate = (itemno == firstItem) ? firststate : reststate;
                if (tuple == NULL || tuple->tupstate != tupstate)
@@ -200,7 +201,7 @@ saveNodeLink(Relation index, SPPageDesc *parent,
  */
 static void
 addLeafTuple(Relation index, SpGistState *state, SpGistLeafTuple leafTuple,
-                        SPPageDesc *current, SPPageDesc *parent, bool isNew)
+                  SPPageDesc *current, SPPageDesc *parent, bool isNulls, bool isNew)
 {
        XLogRecData rdata[4];
        spgxlogAddLeaf xlrec;
@@ -208,6 +209,7 @@ addLeafTuple(Relation index, SpGistState *state, SpGistLeafTuple leafTuple,
        xlrec.node = index->rd_node;
        xlrec.blknoLeaf = current->blkno;
        xlrec.newPage = isNew;
+       xlrec.storesNulls = isNulls;
 
        /* these will be filled below as needed */
        xlrec.offnumLeaf = InvalidOffsetNumber;
@@ -224,12 +226,12 @@ addLeafTuple(Relation index, SpGistState *state, SpGistLeafTuple leafTuple,
        START_CRIT_SECTION();
 
        if (current->offnum == InvalidOffsetNumber ||
-               current->blkno == SPGIST_HEAD_BLKNO)
+               SpGistBlockIsRoot(current->blkno))
        {
                /* Tuple is not part of a chain */
                leafTuple->nextOffset = InvalidOffsetNumber;
                current->offnum = SpGistPageAddNewItem(state, current->page,
-                                                                                          (Item) leafTuple, leafTuple->size,
+                                                                                  (Item) leafTuple, leafTuple->size,
                                                                                           NULL, false);
 
                xlrec.offnumLeaf = current->offnum;
@@ -249,9 +251,9 @@ addLeafTuple(Relation index, SpGistState *state, SpGistLeafTuple leafTuple,
        else
        {
                /*
-                * Tuple must be inserted into existing chain.  We mustn't change
-                * the chain's head address, but we don't need to chase the entire
-                * chain to put the tuple at the end; we can insert it second.
+                * Tuple must be inserted into existing chain.  We mustn't change the
+                * chain's head address, but we don't need to chase the entire chain
+                * to put the tuple at the end; we can insert it second.
                 *
                 * Also, it's possible that the "chain" consists only of a DEAD tuple,
                 * in which case we should replace the DEAD tuple in-place.
@@ -260,7 +262,7 @@ addLeafTuple(Relation index, SpGistState *state, SpGistLeafTuple leafTuple,
                OffsetNumber offnum;
 
                head = (SpGistLeafTuple) PageGetItem(current->page,
-                                                                                        PageGetItemId(current->page, current->offnum));
+                                                         PageGetItemId(current->page, current->offnum));
                if (head->tupstate == SPGIST_LIVE)
                {
                        leafTuple->nextOffset = head->nextOffset;
@@ -273,7 +275,7 @@ addLeafTuple(Relation index, SpGistState *state, SpGistLeafTuple leafTuple,
                         * and set new second element
                         */
                        head = (SpGistLeafTuple) PageGetItem(current->page,
-                                                                                        PageGetItemId(current->page, current->offnum));
+                                                         PageGetItemId(current->page, current->offnum));
                        head->nextOffset = offnum;
 
                        xlrec.offnumLeaf = offnum;
@@ -306,13 +308,11 @@ addLeafTuple(Relation index, SpGistState *state, SpGistLeafTuple leafTuple,
                recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_LEAF, rdata);
 
                PageSetLSN(current->page, recptr);
-               PageSetTLI(current->page, ThisTimeLineID);
 
                /* update parent only if we actually changed it */
                if (xlrec.blknoParent != InvalidBlockNumber)
                {
                        PageSetLSN(parent->page, recptr);
-                       PageSetTLI(parent->page, ThisTimeLineID);
                }
        }
 
@@ -337,7 +337,7 @@ checkSplitConditions(Relation index, SpGistState *state,
                                n = 0,
                                totalSize = 0;
 
-       if (current->blkno == SPGIST_HEAD_BLKNO)
+       if (SpGistBlockIsRoot(current->blkno))
        {
                /* return impossible values to force split */
                *nToSplit = BLCKSZ;
@@ -386,7 +386,7 @@ checkSplitConditions(Relation index, SpGistState *state,
 static void
 moveLeafs(Relation index, SpGistState *state,
                  SPPageDesc *current, SPPageDesc *parent,
-                 SpGistLeafTuple newLeafTuple)
+                 SpGistLeafTuple newLeafTuple, bool isNulls)
 {
        int                     i,
                                nDelete,
@@ -451,7 +451,8 @@ moveLeafs(Relation index, SpGistState *state,
        }
 
        /* Find a leaf page that will hold them */
-       nbuf = SpGistGetBuffer(index, GBUF_LEAF, size, &xlrec.newPage);
+       nbuf = SpGistGetBuffer(index, GBUF_LEAF | (isNulls ? GBUF_NULLS : 0),
+                                                  size, &xlrec.newPage);
        npage = BufferGetPage(nbuf);
        nblkno = BufferGetBlockNumber(nbuf);
        Assert(nblkno != current->blkno);
@@ -464,6 +465,7 @@ moveLeafs(Relation index, SpGistState *state,
        xlrec.blknoDst = nblkno;
        xlrec.nMoves = nDelete;
        xlrec.replaceDead = replaceDead;
+       xlrec.storesNulls = isNulls;
 
        xlrec.blknoParent = parent->blkno;
        xlrec.offnumParent = parent->offnum;
@@ -480,7 +482,7 @@ moveLeafs(Relation index, SpGistState *state,
                for (i = 0; i < nDelete; i++)
                {
                        it = (SpGistLeafTuple) PageGetItem(current->page,
-                                                                                          PageGetItemId(current->page, toDelete[i]));
+                                                                 PageGetItemId(current->page, toDelete[i]));
                        Assert(it->tupstate == SPGIST_LIVE);
 
                        /*
@@ -513,12 +515,12 @@ moveLeafs(Relation index, SpGistState *state,
        leafptr += newLeafTuple->size;
 
        /*
-        * Now delete the old tuples, leaving a redirection pointer behind for
-        * the first one, unless we're doing an index build; in which case there
-        * can't be any concurrent scan so we need not provide a redirect.
+        * Now delete the old tuples, leaving a redirection pointer behind for the
+        * first one, unless we're doing an index build; in which case there can't
+        * be any concurrent scan so we need not provide a redirect.
         */
        spgPageIndexMultiDelete(state, current->page, toDelete, nDelete,
-                                                       state->isBuild ? SPGIST_PLACEHOLDER : SPGIST_REDIRECT,
+                                          state->isBuild ? SPGIST_PLACEHOLDER : SPGIST_REDIRECT,
                                                        SPGIST_PLACEHOLDER,
                                                        nblkno, r);
 
@@ -544,11 +546,8 @@ moveLeafs(Relation index, SpGistState *state,
                recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_MOVE_LEAFS, rdata);
 
                PageSetLSN(current->page, recptr);
-               PageSetTLI(current->page, ThisTimeLineID);
                PageSetLSN(npage, recptr);
-               PageSetTLI(npage, ThisTimeLineID);
                PageSetLSN(parent->page, recptr);
-               PageSetTLI(parent->page, ThisTimeLineID);
        }
 
        END_CRIT_SECTION();
@@ -572,7 +571,7 @@ setRedirectionTuple(SPPageDesc *current, OffsetNumber position,
        SpGistDeadTuple dt;
 
        dt = (SpGistDeadTuple) PageGetItem(current->page,
-                                                                          PageGetItemId(current->page, position));
+                                                                        PageGetItemId(current->page, position));
        Assert(dt->tupstate == SPGIST_REDIRECT);
        Assert(ItemPointerGetBlockNumber(&dt->pointer) == SPGIST_METAPAGE_BLKNO);
        ItemPointerSet(&dt->pointer, blkno, offnum);
@@ -584,6 +583,8 @@ setRedirectionTuple(SPPageDesc *current, OffsetNumber position,
  * If so, randomly divide the tuples into several nodes (all with the same
  * label) and return TRUE to select allTheSame mode for this inner tuple.
  *
+ * (This code is also used to forcibly select allTheSame mode for nulls.)
+ *
  * If we know that the leaf tuples wouldn't all fit on one page, then we
  * exclude the last tuple (which is the incoming new tuple that forced a split)
  * from the check to see if more than one node is used.  The reason for this
@@ -635,7 +636,7 @@ checkAllTheSame(spgPickSplitIn *in, spgPickSplitOut *out, bool tooBig,
        /* The opclass may not use node labels, but if it does, duplicate 'em */
        if (out->nodeLabels)
        {
-               Datum   theLabel = out->nodeLabels[theNode];
+               Datum           theLabel = out->nodeLabels[theNode];
 
                out->nodeLabels = (Datum *) palloc(sizeof(Datum) * out->nNodes);
                for (i = 0; i < out->nNodes; i++)
@@ -674,7 +675,8 @@ checkAllTheSame(spgPickSplitIn *in, spgPickSplitOut *out, bool tooBig,
 static bool
 doPickSplit(Relation index, SpGistState *state,
                        SPPageDesc *current, SPPageDesc *parent,
-                       SpGistLeafTuple newLeafTuple, int level, bool isNew)
+                       SpGistLeafTuple newLeafTuple,
+                       int level, bool isNulls, bool isNew)
 {
        bool            insertedNew = false;
        spgPickSplitIn in;
@@ -733,16 +735,23 @@ doPickSplit(Relation index, SpGistState *state,
         * also, count up the amount of space that will be freed from current.
         * (Note that in the non-root case, we won't actually delete the old
         * tuples, only replace them with redirects or placeholders.)
+        *
+        * Note: the SGLTDATUM calls here are safe even when dealing with a nulls
+        * page.  For a pass-by-value data type we will fetch a word that must
+        * exist even though it may contain garbage (because of the fact that leaf
+        * tuples must have size at least SGDTSIZE).  For a pass-by-reference type
+        * we are just computing a pointer that isn't going to get dereferenced.
+        * So it's not worth guarding the calls with isNulls checks.
         */
        nToInsert = 0;
        nToDelete = 0;
        spaceToDelete = 0;
-       if (current->blkno == SPGIST_HEAD_BLKNO)
+       if (SpGistBlockIsRoot(current->blkno))
        {
                /*
                 * We are splitting the root (which up to now is also a leaf page).
-                * Its tuples are not linked, so scan sequentially to get them all.
-                * We ignore the original value of current->offnum.
+                * Its tuples are not linked, so scan sequentially to get them all. We
+                * ignore the original value of current->offnum.
                 */
                for (i = FirstOffsetNumber; i <= max; i++)
                {
@@ -760,7 +769,7 @@ doPickSplit(Relation index, SpGistState *state,
                                /* we will delete the tuple altogether, so count full space */
                                spaceToDelete += it->size + sizeof(ItemIdData);
                        }
-                       else                            /* tuples on root should be live */
+                       else    /* tuples on root should be live */
                                elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
                }
        }
@@ -805,40 +814,67 @@ doPickSplit(Relation index, SpGistState *state,
 
        /*
         * We may not actually insert new tuple because another picksplit may be
-        * necessary due to too large value, but we will try to to allocate enough
+        * necessary due to too large value, but we will try to allocate enough
         * space to include it; and in any case it has to be included in the input
-        * for the picksplit function.  So don't increment nToInsert yet.
+        * for the picksplit function.  So don't increment nToInsert yet.
         */
        in.datums[in.nTuples] = SGLTDATUM(newLeafTuple, state);
        heapPtrs[in.nTuples] = newLeafTuple->heapPtr;
        in.nTuples++;
 
-       /*
-        * Perform split using user-defined method.
-        */
        memset(&out, 0, sizeof(out));
 
-       procinfo = index_getprocinfo(index, 1, SPGIST_PICKSPLIT_PROC);
-       FunctionCall2Coll(procinfo,
-                                         index->rd_indcollation[0],
-                                         PointerGetDatum(&in),
-                                         PointerGetDatum(&out));
+       if (!isNulls)
+       {
+               /*
+                * Perform split using user-defined method.
+                */
+               procinfo = index_getprocinfo(index, 1, SPGIST_PICKSPLIT_PROC);
+               FunctionCall2Coll(procinfo,
+                                                 index->rd_indcollation[0],
+                                                 PointerGetDatum(&in),
+                                                 PointerGetDatum(&out));
 
-       /*
-        * Form new leaf tuples and count up the total space needed.
-        */
-       totalLeafSizes = 0;
-       for (i = 0; i < in.nTuples; i++)
+               /*
+                * Form new leaf tuples and count up the total space needed.
+                */
+               totalLeafSizes = 0;
+               for (i = 0; i < in.nTuples; i++)
+               {
+                       newLeafs[i] = spgFormLeafTuple(state, heapPtrs + i,
+                                                                                  out.leafTupleDatums[i],
+                                                                                  false);
+                       totalLeafSizes += newLeafs[i]->size + sizeof(ItemIdData);
+               }
+       }
+       else
        {
-               newLeafs[i] = spgFormLeafTuple(state, heapPtrs + i,
-                                                                          out.leafTupleDatums[i]);
-               totalLeafSizes += newLeafs[i]->size + sizeof(ItemIdData);
+               /*
+                * Perform dummy split that puts all tuples into one node.
+                * checkAllTheSame will override this and force allTheSame mode.
+                */
+               out.hasPrefix = false;
+               out.nNodes = 1;
+               out.nodeLabels = NULL;
+               out.mapTuplesToNodes = palloc0(sizeof(int) * in.nTuples);
+
+               /*
+                * Form new leaf tuples and count up the total space needed.
+                */
+               totalLeafSizes = 0;
+               for (i = 0; i < in.nTuples; i++)
+               {
+                       newLeafs[i] = spgFormLeafTuple(state, heapPtrs + i,
+                                                                                  (Datum) 0,
+                                                                                  true);
+                       totalLeafSizes += newLeafs[i]->size + sizeof(ItemIdData);
+               }
        }
 
        /*
         * Check to see if the picksplit function failed to separate the values,
         * ie, it put them all into the same child node.  If so, select allTheSame
-        * mode and create a random split instead.  See comments for
+        * mode and create a random split instead.      See comments for
         * checkAllTheSame as to why we need to know if the new leaf tuples could
         * fit on one page.
         */
@@ -872,11 +908,11 @@ doPickSplit(Relation index, SpGistState *state,
        for (i = 0; i < out.nNodes; i++)
        {
                Datum           label = (Datum) 0;
-               bool            isnull = (out.nodeLabels == NULL);
+               bool            labelisnull = (out.nodeLabels == NULL);
 
-               if (!isnull)
+               if (!labelisnull)
                        label = out.nodeLabels[i];
-               nodes[i] = spgFormNodeTuple(state, label, isnull);
+               nodes[i] = spgFormNodeTuple(state, label, labelisnull);
        }
        innerTuple = spgFormInnerTuple(state,
                                                                   out.hasPrefix, out.prefixDatum,
@@ -884,8 +920,8 @@ doPickSplit(Relation index, SpGistState *state,
        innerTuple->allTheSame = allTheSame;
 
        /*
-        * Update nodes[] array to point into the newly formed innerTuple, so
-        * that we can adjust their downlinks below.
+        * Update nodes[] array to point into the newly formed innerTuple, so that
+        * we can adjust their downlinks below.
         */
        SGITITERATE(innerTuple, i, node)
        {
@@ -904,17 +940,17 @@ doPickSplit(Relation index, SpGistState *state,
        }
 
        /*
-        * To perform the split, we must insert a new inner tuple, which can't
-        * go on a leaf page; and unless we are splitting the root page, we
-        * must then update the parent tuple's downlink to point to the inner
-        * tuple.  If there is room, we'll put the new inner tuple on the same
-        * page as the parent tuple, otherwise we need another non-leaf buffer.
-        * But if the parent page is the root, we can't add the new inner tuple
-        * there, because the root page must have only one inner tuple.
+        * To perform the split, we must insert a new inner tuple, which can't go
+        * on a leaf page; and unless we are splitting the root page, we must then
+        * update the parent tuple's downlink to point to the inner tuple.  If
+        * there is room, we'll put the new inner tuple on the same page as the
+        * parent tuple, otherwise we need another non-leaf buffer. But if the
+        * parent page is the root, we can't add the new inner tuple there,
+        * because the root page must have only one inner tuple.
         */
        xlrec.initInner = false;
        if (parent->buffer != InvalidBuffer &&
-               parent->blkno != SPGIST_HEAD_BLKNO &&
+               !SpGistBlockIsRoot(parent->blkno) &&
                (SpGistPageGetFreeSpace(parent->page, 1) >=
                 innerTuple->size + sizeof(ItemIdData)))
        {
@@ -925,8 +961,9 @@ doPickSplit(Relation index, SpGistState *state,
        {
                /* Send tuple to page with next triple parity (see README) */
                newInnerBuffer = SpGistGetBuffer(index,
-                                                                                GBUF_INNER_PARITY(parent->blkno + 1),
-                                                                                innerTuple->size + sizeof(ItemIdData),
+                                                                          GBUF_INNER_PARITY(parent->blkno + 1) |
+                                                                                (isNulls ? GBUF_NULLS : 0),
+                                                                          innerTuple->size + sizeof(ItemIdData),
                                                                                 &xlrec.initInner);
        }
        else
@@ -935,30 +972,29 @@ doPickSplit(Relation index, SpGistState *state,
                newInnerBuffer = InvalidBuffer;
        }
 
-       /*----------
-        * Because a WAL record can't involve more than four buffers, we can
-        * only afford to deal with two leaf pages in each picksplit action,
-        * ie the current page and at most one other.
+       /*
+        * Because a WAL record can't involve more than four buffers, we can only
+        * afford to deal with two leaf pages in each picksplit action, ie the
+        * current page and at most one other.
         *
-        * The new leaf tuples converted from the existing ones should require
-        * the same or less space, and therefore should all fit onto one page
+        * The new leaf tuples converted from the existing ones should require the
+        * same or less space, and therefore should all fit onto one page
         * (although that's not necessarily the current page, since we can't
         * delete the old tuples but only replace them with placeholders).
-        * However, the incoming new tuple might not also fit, in which case
-        * we might need another picksplit cycle to reduce it some more.
+        * However, the incoming new tuple might not also fit, in which case we
+        * might need another picksplit cycle to reduce it some more.
         *
-        * If there's not room to put everything back onto the current page,
-        * then we decide on a per-node basis which tuples go to the new page.
-        * (We do it like that because leaf tuple chains can't cross pages,
-        * so we must place all leaf tuples belonging to the same parent node
-        * on the same page.)
+        * If there's not room to put everything back onto the current page, then
+        * we decide on a per-node basis which tuples go to the new page. (We do
+        * it like that because leaf tuple chains can't cross pages, so we must
+        * place all leaf tuples belonging to the same parent node on the same
+        * page.)
         *
         * If we are splitting the root page (turning it from a leaf page into an
         * inner page), then no leaf tuples can go back to the current page; they
         * must all go somewhere else.
-        *----------
         */
-       if (current->blkno != SPGIST_HEAD_BLKNO)
+       if (!SpGistBlockIsRoot(current->blkno))
                currentFreeSpace = PageGetExactFreeSpace(current->page) + spaceToDelete;
        else
                currentFreeSpace = 0;   /* prevent assigning any tuples to current */
@@ -996,12 +1032,14 @@ doPickSplit(Relation index, SpGistState *state,
                int                     curspace;
                int                     newspace;
 
-               newLeafBuffer = SpGistGetBuffer(index, GBUF_LEAF,
+               newLeafBuffer = SpGistGetBuffer(index,
+                                                                         GBUF_LEAF | (isNulls ? GBUF_NULLS : 0),
                                                                                Min(totalLeafSizes,
                                                                                        SPGIST_PAGE_CAPACITY),
                                                                                &xlrec.initDest);
+
                /*
-                * Attempt to assign node groups to the two pages.  We might fail to
+                * Attempt to assign node groups to the two pages.      We might fail to
                 * do so, even if totalLeafSizes is less than the available space,
                 * because we can't split a group across pages.
                 */
@@ -1013,12 +1051,12 @@ doPickSplit(Relation index, SpGistState *state,
                {
                        if (leafSizes[i] <= curspace)
                        {
-                               nodePageSelect[i] = 0; /* signifies current page */
+                               nodePageSelect[i] = 0;  /* signifies current page */
                                curspace -= leafSizes[i];
                        }
                        else
                        {
-                               nodePageSelect[i] = 1; /* signifies new leaf page */
+                               nodePageSelect[i] = 1;  /* signifies new leaf page */
                                newspace -= leafSizes[i];
                        }
                }
@@ -1034,7 +1072,7 @@ doPickSplit(Relation index, SpGistState *state,
                else if (includeNew)
                {
                        /* We must exclude the new leaf tuple from the split */
-                       int             nodeOfNewTuple = out.mapTuplesToNodes[in.nTuples - 1];
+                       int                     nodeOfNewTuple = out.mapTuplesToNodes[in.nTuples - 1];
 
                        leafSizes[nodeOfNewTuple] -=
                                newLeafs[in.nTuples - 1]->size + sizeof(ItemIdData);
@@ -1046,12 +1084,12 @@ doPickSplit(Relation index, SpGistState *state,
                        {
                                if (leafSizes[i] <= curspace)
                                {
-                                       nodePageSelect[i] = 0; /* signifies current page */
+                                       nodePageSelect[i] = 0;          /* signifies current page */
                                        curspace -= leafSizes[i];
                                }
                                else
                                {
-                                       nodePageSelect[i] = 1; /* signifies new leaf page */
+                                       nodePageSelect[i] = 1;          /* signifies new leaf page */
                                        newspace -= leafSizes[i];
                                }
                        }
@@ -1076,6 +1114,7 @@ doPickSplit(Relation index, SpGistState *state,
        xlrec.blknoDest = InvalidBlockNumber;
        xlrec.nDelete = 0;
        xlrec.initSrc = isNew;
+       xlrec.storesNulls = isNulls;
 
        leafdata = leafptr = (char *) palloc(totalLeafSizes);
 
@@ -1091,7 +1130,7 @@ doPickSplit(Relation index, SpGistState *state,
         * the root; in that case there's no need because we'll re-init the page
         * below.  We do this first to make room for reinserting new leaf tuples.
         */
-       if (current->blkno != SPGIST_HEAD_BLKNO)
+       if (!SpGistBlockIsRoot(current->blkno))
        {
                /*
                 * Init buffer instead of deleting individual tuples, but only if
@@ -1102,7 +1141,8 @@ doPickSplit(Relation index, SpGistState *state,
                        nToDelete + SpGistPageGetOpaque(current->page)->nPlaceholder ==
                        PageGetMaxOffsetNumber(current->page))
                {
-                       SpGistInitBuffer(current->buffer, SPGIST_LEAF);
+                       SpGistInitBuffer(current->buffer,
+                                                        SPGIST_LEAF | (isNulls ? SPGIST_NULLS : 0));
                        xlrec.initSrc = true;
                }
                else if (isNew)
@@ -1161,7 +1201,7 @@ doPickSplit(Relation index, SpGistState *state,
        for (i = 0; i < nToInsert; i++)
        {
                SpGistLeafTuple it = newLeafs[i];
-               Buffer  leafBuffer;
+               Buffer          leafBuffer;
                BlockNumber leafBlock;
                OffsetNumber newoffset;
 
@@ -1317,10 +1357,10 @@ doPickSplit(Relation index, SpGistState *state,
                 * Splitting root page, which was a leaf but now becomes inner page
                 * (and so "current" continues to point at it)
                 */
-               Assert(current->blkno == SPGIST_HEAD_BLKNO);
+               Assert(SpGistBlockIsRoot(current->blkno));
                Assert(redirectTuplePos == InvalidOffsetNumber);
 
-               SpGistInitBuffer(current->buffer, 0);
+               SpGistInitBuffer(current->buffer, (isNulls ? SPGIST_NULLS : 0));
                xlrec.initInner = true;
 
                xlrec.blknoInner = current->blkno;
@@ -1356,7 +1396,6 @@ doPickSplit(Relation index, SpGistState *state,
                        Page            page = BufferGetPage(newLeafBuffer);
 
                        PageSetLSN(page, recptr);
-                       PageSetTLI(page, ThisTimeLineID);
                }
 
                if (saveCurrent.buffer != InvalidBuffer)
@@ -1364,16 +1403,13 @@ doPickSplit(Relation index, SpGistState *state,
                        Page            page = BufferGetPage(saveCurrent.buffer);
 
                        PageSetLSN(page, recptr);
-                       PageSetTLI(page, ThisTimeLineID);
                }
 
                PageSetLSN(current->page, recptr);
-               PageSetTLI(current->page, ThisTimeLineID);
 
                if (parent->buffer != InvalidBuffer)
                {
                        PageSetLSN(parent->page, recptr);
-                       PageSetTLI(parent->page, ThisTimeLineID);
                }
        }
 
@@ -1461,6 +1497,9 @@ spgAddNodeAction(Relation index, SpGistState *state,
        XLogRecData rdata[5];
        spgxlogAddNode xlrec;
 
+       /* Should not be applied to nulls */
+       Assert(!SpGistPageStoresNulls(current->page));
+
        /* Construct new inner tuple with additional node */
        newInnerTuple = addNode(state, innerTuple, nodeLabel, nodeN);
 
@@ -1509,7 +1548,6 @@ spgAddNodeAction(Relation index, SpGistState *state,
                        recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_NODE, rdata);
 
                        PageSetLSN(current->page, recptr);
-                       PageSetTLI(current->page, ThisTimeLineID);
                }
 
                END_CRIT_SECTION();
@@ -1527,7 +1565,7 @@ spgAddNodeAction(Relation index, SpGistState *state,
                 * allow only one inner tuple on the root page, and spgFormInnerTuple
                 * always checks that inner tuples don't exceed the size of a page.
                 */
-               if (current->blkno == SPGIST_HEAD_BLKNO)
+               if (SpGistBlockIsRoot(current->blkno))
                        elog(ERROR, "cannot enlarge root tuple any more");
                Assert(parent->buffer != InvalidBuffer);
 
@@ -1538,12 +1576,12 @@ spgAddNodeAction(Relation index, SpGistState *state,
                xlrec.nodeI = parent->node;
 
                /*
-                * obtain new buffer with the same parity as current, since it will
-                * be a child of same parent tuple
+                * obtain new buffer with the same parity as current, since it will be
+                * a child of same parent tuple
                 */
                current->buffer = SpGistGetBuffer(index,
                                                                                  GBUF_INNER_PARITY(current->blkno),
-                                                                                 newInnerTuple->size + sizeof(ItemIdData),
+                                                                       newInnerTuple->size + sizeof(ItemIdData),
                                                                                  &xlrec.newPage);
                current->blkno = BufferGetBlockNumber(current->buffer);
                current->page = BufferGetPage(current->buffer);
@@ -1551,15 +1589,15 @@ spgAddNodeAction(Relation index, SpGistState *state,
                xlrec.blknoNew = current->blkno;
 
                /*
-                * Let's just make real sure new current isn't same as old.  Right
-                * now that's impossible, but if SpGistGetBuffer ever got smart enough
-                * to delete placeholder tuples before checking space, maybe it
-                * wouldn't be impossible.  The case would appear to work except that
-                * WAL replay would be subtly wrong, so I think a mere assert isn't
-                * enough here.
+                * Let's just make real sure new current isn't same as old.  Right now
+                * that's impossible, but if SpGistGetBuffer ever got smart enough to
+                * delete placeholder tuples before checking space, maybe it wouldn't
+                * be impossible.  The case would appear to work except that WAL
+                * replay would be subtly wrong, so I think a mere assert isn't enough
+                * here.
                 */
-                if (xlrec.blknoNew == xlrec.blkno)
-                        elog(ERROR, "SPGiST new buffer shouldn't be same as old buffer");
+               if (xlrec.blknoNew == xlrec.blkno)
+                       elog(ERROR, "SPGiST new buffer shouldn't be same as old buffer");
 
                /*
                 * New current and parent buffer will both be modified; but note that
@@ -1619,11 +1657,8 @@ spgAddNodeAction(Relation index, SpGistState *state,
 
                        /* we don't bother to check if any of these are redundant */
                        PageSetLSN(current->page, recptr);
-                       PageSetTLI(current->page, ThisTimeLineID);
                        PageSetLSN(parent->page, recptr);
-                       PageSetTLI(parent->page, ThisTimeLineID);
                        PageSetLSN(saveCurrent.page, recptr);
-                       PageSetTLI(saveCurrent.page, ThisTimeLineID);
                }
 
                END_CRIT_SECTION();
@@ -1657,10 +1692,13 @@ spgSplitNodeAction(Relation index, SpGistState *state,
        spgxlogSplitTuple xlrec;
        Buffer          newBuffer = InvalidBuffer;
 
+       /* Should not be applied to nulls */
+       Assert(!SpGistPageStoresNulls(current->page));
+
        /*
-        * Construct new prefix tuple, containing a single node with the
-        * specified label.  (We'll update the node's downlink to point to the
-        * new postfix tuple, below.)
+        * Construct new prefix tuple, containing a single node with the specified
+        * label.  (We'll update the node's downlink to point to the new postfix
+        * tuple, below.)
         */
        node = spgFormNodeTuple(state, out->result.splitTuple.nodeLabel, false);
 
@@ -1709,7 +1747,7 @@ spgSplitNodeAction(Relation index, SpGistState *state,
         * For the space calculation, note that prefixTuple replaces innerTuple
         * but postfixTuple will be a new entry.
         */
-       if (current->blkno == SPGIST_HEAD_BLKNO ||
+       if (SpGistBlockIsRoot(current->blkno) ||
                SpGistPageGetFreeSpace(current->page, 1) + innerTuple->size <
                prefixTuple->size + postfixTuple->size + sizeof(ItemIdData))
        {
@@ -1780,12 +1818,10 @@ spgSplitNodeAction(Relation index, SpGistState *state,
                recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_SPLIT_TUPLE, rdata);
 
                PageSetLSN(current->page, recptr);
-               PageSetTLI(current->page, ThisTimeLineID);
 
                if (newBuffer != InvalidBuffer)
                {
                        PageSetLSN(BufferGetPage(newBuffer), recptr);
-                       PageSetTLI(BufferGetPage(newBuffer), ThisTimeLineID);
                }
        }
 
@@ -1800,24 +1836,36 @@ spgSplitNodeAction(Relation index, SpGistState *state,
 }
 
 /*
- * Insert one item into the index
+ * Insert one item into the index.
+ *
+ * Returns true on success, false if we failed to complete the insertion
+ * because of conflict with a concurrent insert.  In the latter case,
+ * caller should re-call spgdoinsert() with the same args.
  */
-void
+bool
 spgdoinsert(Relation index, SpGistState *state,
-                       ItemPointer heapPtr, Datum datum)
+                       ItemPointer heapPtr, Datum datum, bool isnull)
 {
        int                     level = 0;
        Datum           leafDatum;
        int                     leafSize;
        SPPageDesc      current,
                                parent;
+       FmgrInfo   *procinfo = NULL;
+
+       /*
+        * Look up FmgrInfo of the user-defined choose function once, to save
+        * cycles in the loop below.
+        */
+       if (!isnull)
+               procinfo = index_getprocinfo(index, 1, SPGIST_CHOOSE_PROC);
 
        /*
         * Since we don't use index_form_tuple in this AM, we have to make sure
         * value to be inserted is not toasted; FormIndexDatum doesn't guarantee
         * that.
         */
-       if (state->attType.attlen == -1)
+       if (!isnull && state->attType.attlen == -1)
                datum = PointerGetDatum(PG_DETOAST_DATUM(datum));
 
        leafDatum = datum;
@@ -1828,20 +1876,23 @@ spgdoinsert(Relation index, SpGistState *state,
         * If it isn't gonna fit, and the opclass can't reduce the datum size by
         * suffixing, bail out now rather than getting into an endless loop.
         */
-       leafSize = SGLTHDRSZ + sizeof(ItemIdData) +
-               SpGistGetTypeSize(&state->attType, leafDatum);
+       if (!isnull)
+               leafSize = SGLTHDRSZ + sizeof(ItemIdData) +
+                       SpGistGetTypeSize(&state->attType, leafDatum);
+       else
+               leafSize = SGDTSIZE + sizeof(ItemIdData);
 
        if (leafSize > SPGIST_PAGE_CAPACITY && !state->config.longValuesOK)
                ereport(ERROR,
                                (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
                        errmsg("index row size %lu exceeds maximum %lu for index \"%s\"",
                                   (unsigned long) (leafSize - sizeof(ItemIdData)),
-                                  (unsigned long) (SPGIST_PAGE_CAPACITY - sizeof(ItemIdData)),
+                                (unsigned long) (SPGIST_PAGE_CAPACITY - sizeof(ItemIdData)),
                                   RelationGetRelationName(index)),
-                 errhint("Values larger than a buffer page cannot be indexed.")));
+                       errhint("Values larger than a buffer page cannot be indexed.")));
 
-       /* Initialize "current" to the root page */
-       current.blkno = SPGIST_HEAD_BLKNO;
+       /* Initialize "current" to the appropriate root page */
+       current.blkno = isnull ? SPGIST_NULL_BLKNO : SPGIST_ROOT_BLKNO;
        current.buffer = InvalidBuffer;
        current.page = NULL;
        current.offnum = FirstOffsetNumber;
@@ -1868,23 +1919,44 @@ spgdoinsert(Relation index, SpGistState *state,
                if (current.blkno == InvalidBlockNumber)
                {
                        /*
-                        * Create a leaf page.  If leafSize is too large to fit on a page,
+                        * Create a leaf page.  If leafSize is too large to fit on a page,
                         * we won't actually use the page yet, but it simplifies the API
                         * for doPickSplit to always have a leaf page at hand; so just
                         * quietly limit our request to a page size.
                         */
-                       current.buffer = SpGistGetBuffer(index, GBUF_LEAF,
-                                                                                        Min(leafSize,
-                                                                                                SPGIST_PAGE_CAPACITY),
-                                                                                        &isNew);
+                       current.buffer =
+                               SpGistGetBuffer(index,
+                                                               GBUF_LEAF | (isnull ? GBUF_NULLS : 0),
+                                                               Min(leafSize, SPGIST_PAGE_CAPACITY),
+                                                               &isNew);
                        current.blkno = BufferGetBlockNumber(current.buffer);
                }
-               else if (parent.buffer == InvalidBuffer ||
-                                current.blkno != parent.blkno)
+               else if (parent.buffer == InvalidBuffer)
                {
+                       /* we hold no parent-page lock, so no deadlock is possible */
                        current.buffer = ReadBuffer(index, current.blkno);
                        LockBuffer(current.buffer, BUFFER_LOCK_EXCLUSIVE);
                }
+               else if (current.blkno != parent.blkno)
+               {
+                       /* descend to a new child page */
+                       current.buffer = ReadBuffer(index, current.blkno);
+
+                       /*
+                        * Attempt to acquire lock on child page.  We must beware of
+                        * deadlock against another insertion process descending from that
+                        * page to our parent page (see README).  If we fail to get lock,
+                        * abandon the insertion and tell our caller to start over.  XXX
+                        * this could be improved; perhaps it'd be worth sleeping a bit
+                        * before giving up?
+                        */
+                       if (!ConditionalLockBuffer(current.buffer))
+                       {
+                               ReleaseBuffer(current.buffer);
+                               UnlockReleaseBuffer(parent.buffer);
+                               return false;
+                       }
+               }
                else
                {
                        /* inner tuple can be stored on the same page as parent one */
@@ -1892,24 +1964,30 @@ spgdoinsert(Relation index, SpGistState *state,
                }
                current.page = BufferGetPage(current.buffer);
 
+               /* should not arrive at a page of the wrong type */
+               if (isnull ? !SpGistPageStoresNulls(current.page) :
+                       SpGistPageStoresNulls(current.page))
+                       elog(ERROR, "SPGiST index page %u has wrong nulls flag",
+                                current.blkno);
+
                if (SpGistPageIsLeaf(current.page))
                {
                        SpGistLeafTuple leafTuple;
                        int                     nToSplit,
                                                sizeToSplit;
 
-                       leafTuple = spgFormLeafTuple(state, heapPtr, leafDatum);
+                       leafTuple = spgFormLeafTuple(state, heapPtr, leafDatum, isnull);
                        if (leafTuple->size + sizeof(ItemIdData) <=
                                SpGistPageGetFreeSpace(current.page, 1))
                        {
                                /* it fits on page, so insert it and we're done */
                                addLeafTuple(index, state, leafTuple,
-                                                        &current, &parent, isNew);
+                                                        &current, &parent, isnull, isNew);
                                break;
                        }
                        else if ((sizeToSplit =
                                          checkSplitConditions(index, state, &current,
-                                                                                  &nToSplit)) < SPGIST_PAGE_CAPACITY / 2 &&
+                                                                       &nToSplit)) < SPGIST_PAGE_CAPACITY / 2 &&
                                         nToSplit < 64 &&
                                         leafTuple->size + sizeof(ItemIdData) + sizeToSplit <= SPGIST_PAGE_CAPACITY)
                        {
@@ -1918,14 +1996,14 @@ spgdoinsert(Relation index, SpGistState *state,
                                 * chain to another leaf page rather than splitting it.
                                 */
                                Assert(!isNew);
-                               moveLeafs(index, state, &current, &parent, leafTuple);
+                               moveLeafs(index, state, &current, &parent, leafTuple, isnull);
                                break;                  /* we're done */
                        }
                        else
                        {
                                /* picksplit */
                                if (doPickSplit(index, state, &current, &parent,
-                                                               leafTuple, level, isNew))
+                                                               leafTuple, level, isnull, isNew))
                                        break;          /* doPickSplit installed new tuples */
 
                                /* leaf tuple will not be inserted yet */
@@ -1947,7 +2025,6 @@ spgdoinsert(Relation index, SpGistState *state,
                        SpGistInnerTuple innerTuple;
                        spgChooseIn in;
                        spgChooseOut out;
-                       FmgrInfo   *procinfo;
 
                        /*
                         * spgAddNode and spgSplitTuple cases will loop back to here to
@@ -1972,11 +2049,19 @@ spgdoinsert(Relation index, SpGistState *state,
 
                        memset(&out, 0, sizeof(out));
 
-                       procinfo = index_getprocinfo(index, 1, SPGIST_CHOOSE_PROC);
-                       FunctionCall2Coll(procinfo,
-                                                         index->rd_indcollation[0],
-                                                         PointerGetDatum(&in),
-                                                         PointerGetDatum(&out));
+                       if (!isnull)
+                       {
+                               /* use user-defined choose method */
+                               FunctionCall2Coll(procinfo,
+                                                                 index->rd_indcollation[0],
+                                                                 PointerGetDatum(&in),
+                                                                 PointerGetDatum(&out));
+                       }
+                       else
+                       {
+                               /* force "match" action (to insert to random subnode) */
+                               out.resultType = spgMatchNode;
+                       }
 
                        if (innerTuple->allTheSame)
                        {
@@ -2001,13 +2086,16 @@ spgdoinsert(Relation index, SpGistState *state,
                                        /* Adjust level as per opclass request */
                                        level += out.result.matchNode.levelAdd;
                                        /* Replace leafDatum and recompute leafSize */
-                                       leafDatum = out.result.matchNode.restDatum;
-                                       leafSize = SGLTHDRSZ + sizeof(ItemIdData) +
-                                               SpGistGetTypeSize(&state->attType, leafDatum);
+                                       if (!isnull)
+                                       {
+                                               leafDatum = out.result.matchNode.restDatum;
+                                               leafSize = SGLTHDRSZ + sizeof(ItemIdData) +
+                                                       SpGistGetTypeSize(&state->attType, leafDatum);
+                                       }
 
                                        /*
-                                        * Loop around and attempt to insert the new leafDatum
-                                        * at "current" (which might reference an existing child
+                                        * Loop around and attempt to insert the new leafDatum at
+                                        * "current" (which might reference an existing child
                                         * tuple, or might be invalid to force us to find a new
                                         * page for the tuple).
                                         *
@@ -2031,8 +2119,8 @@ spgdoinsert(Relation index, SpGistState *state,
                                                                         out.result.addNode.nodeLabel);
 
                                        /*
-                                        * Retry insertion into the enlarged node.  We assume
-                                        * that we'll get a MatchNode result this time.
+                                        * Retry insertion into the enlarged node.      We assume that
+                                        * we'll get a MatchNode result this time.
                                         */
                                        goto process_inner_tuple;
                                        break;
@@ -2067,4 +2155,6 @@ spgdoinsert(Relation index, SpGistState *state,
                SpGistSetLastUsedPage(index, parent.buffer);
                UnlockReleaseBuffer(parent.buffer);
        }
+
+       return true;
 }