]> granicus.if.org Git - postgresql/blobdiff - src/backend/access/spgist/spgdoinsert.c
Avoid deadlocks during insertion into SP-GiST indexes.
[postgresql] / src / backend / access / spgist / spgdoinsert.c
index 36e57ba00430c84ce16a9d7c3df72d371beb9d23..1fd331cbdfd1559ac5495fba9fe87c74a317a28f 100644 (file)
@@ -4,7 +4,7 @@
  *       implementation of insert algorithm
  *
  *
- * Portions Copyright (c) 1996-2012, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1996-2013, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
@@ -308,13 +308,11 @@ addLeafTuple(Relation index, SpGistState *state, SpGistLeafTuple leafTuple,
                recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_LEAF, rdata);
 
                PageSetLSN(current->page, recptr);
-               PageSetTLI(current->page, ThisTimeLineID);
 
                /* update parent only if we actually changed it */
                if (xlrec.blknoParent != InvalidBlockNumber)
                {
                        PageSetLSN(parent->page, recptr);
-                       PageSetTLI(parent->page, ThisTimeLineID);
                }
        }
 
@@ -548,11 +546,8 @@ moveLeafs(Relation index, SpGistState *state,
                recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_MOVE_LEAFS, rdata);
 
                PageSetLSN(current->page, recptr);
-               PageSetTLI(current->page, ThisTimeLineID);
                PageSetLSN(npage, recptr);
-               PageSetTLI(npage, ThisTimeLineID);
                PageSetLSN(parent->page, recptr);
-               PageSetTLI(parent->page, ThisTimeLineID);
        }
 
        END_CRIT_SECTION();
@@ -1401,7 +1396,6 @@ doPickSplit(Relation index, SpGistState *state,
                        Page            page = BufferGetPage(newLeafBuffer);
 
                        PageSetLSN(page, recptr);
-                       PageSetTLI(page, ThisTimeLineID);
                }
 
                if (saveCurrent.buffer != InvalidBuffer)
@@ -1409,16 +1403,13 @@ doPickSplit(Relation index, SpGistState *state,
                        Page            page = BufferGetPage(saveCurrent.buffer);
 
                        PageSetLSN(page, recptr);
-                       PageSetTLI(page, ThisTimeLineID);
                }
 
                PageSetLSN(current->page, recptr);
-               PageSetTLI(current->page, ThisTimeLineID);
 
                if (parent->buffer != InvalidBuffer)
                {
                        PageSetLSN(parent->page, recptr);
-                       PageSetTLI(parent->page, ThisTimeLineID);
                }
        }
 
@@ -1557,7 +1548,6 @@ spgAddNodeAction(Relation index, SpGistState *state,
                        recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_NODE, rdata);
 
                        PageSetLSN(current->page, recptr);
-                       PageSetTLI(current->page, ThisTimeLineID);
                }
 
                END_CRIT_SECTION();
@@ -1667,11 +1657,8 @@ spgAddNodeAction(Relation index, SpGistState *state,
 
                        /* we don't bother to check if any of these are redundant */
                        PageSetLSN(current->page, recptr);
-                       PageSetTLI(current->page, ThisTimeLineID);
                        PageSetLSN(parent->page, recptr);
-                       PageSetTLI(parent->page, ThisTimeLineID);
                        PageSetLSN(saveCurrent.page, recptr);
-                       PageSetTLI(saveCurrent.page, ThisTimeLineID);
                }
 
                END_CRIT_SECTION();
@@ -1831,12 +1818,10 @@ spgSplitNodeAction(Relation index, SpGistState *state,
                recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_SPLIT_TUPLE, rdata);
 
                PageSetLSN(current->page, recptr);
-               PageSetTLI(current->page, ThisTimeLineID);
 
                if (newBuffer != InvalidBuffer)
                {
                        PageSetLSN(BufferGetPage(newBuffer), recptr);
-                       PageSetTLI(BufferGetPage(newBuffer), ThisTimeLineID);
                }
        }
 
@@ -1851,9 +1836,13 @@ spgSplitNodeAction(Relation index, SpGistState *state,
 }
 
 /*
- * Insert one item into the index
+ * Insert one item into the index.
+ *
+ * Returns true on success, false if we failed to complete the insertion
+ * because of conflict with a concurrent insert.  In the latter case,
+ * caller should re-call spgdoinsert() with the same args.
  */
-void
+bool
 spgdoinsert(Relation index, SpGistState *state,
                        ItemPointer heapPtr, Datum datum, bool isnull)
 {
@@ -1942,12 +1931,32 @@ spgdoinsert(Relation index, SpGistState *state,
                                                                &isNew);
                        current.blkno = BufferGetBlockNumber(current.buffer);
                }
-               else if (parent.buffer == InvalidBuffer ||
-                                current.blkno != parent.blkno)
+               else if (parent.buffer == InvalidBuffer)
                {
+                       /* we hold no parent-page lock, so no deadlock is possible */
                        current.buffer = ReadBuffer(index, current.blkno);
                        LockBuffer(current.buffer, BUFFER_LOCK_EXCLUSIVE);
                }
+               else if (current.blkno != parent.blkno)
+               {
+                       /* descend to a new child page */
+                       current.buffer = ReadBuffer(index, current.blkno);
+
+                       /*
+                        * Attempt to acquire lock on child page.  We must beware of
+                        * deadlock against another insertion process descending from that
+                        * page to our parent page (see README).  If we fail to get lock,
+                        * abandon the insertion and tell our caller to start over.  XXX
+                        * this could be improved; perhaps it'd be worth sleeping a bit
+                        * before giving up?
+                        */
+                       if (!ConditionalLockBuffer(current.buffer))
+                       {
+                               ReleaseBuffer(current.buffer);
+                               UnlockReleaseBuffer(parent.buffer);
+                               return false;
+                       }
+               }
                else
                {
                        /* inner tuple can be stored on the same page as parent one */
@@ -2146,4 +2155,6 @@ spgdoinsert(Relation index, SpGistState *state,
                SpGistSetLastUsedPage(index, parent.buffer);
                UnlockReleaseBuffer(parent.buffer);
        }
+
+       return true;
 }