]> granicus.if.org Git - postgresql/blobdiff - src/backend/access/spgist/spgdoinsert.c
Avoid deadlocks during insertion into SP-GiST indexes.
[postgresql] / src / backend / access / spgist / spgdoinsert.c
index b3f8f6a231372fb1f4b6e2c1d42f124b2ca215cb..1fd331cbdfd1559ac5495fba9fe87c74a317a28f 100644 (file)
@@ -4,7 +4,7 @@
  *       implementation of insert algorithm
  *
  *
- * Portions Copyright (c) 1996-2012, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1996-2013, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
@@ -19,6 +19,7 @@
 #include "access/spgist_private.h"
 #include "miscadmin.h"
 #include "storage/bufmgr.h"
+#include "utils/rel.h"
 
 
 /*
@@ -307,13 +308,11 @@ addLeafTuple(Relation index, SpGistState *state, SpGistLeafTuple leafTuple,
                recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_LEAF, rdata);
 
                PageSetLSN(current->page, recptr);
-               PageSetTLI(current->page, ThisTimeLineID);
 
                /* update parent only if we actually changed it */
                if (xlrec.blknoParent != InvalidBlockNumber)
                {
                        PageSetLSN(parent->page, recptr);
-                       PageSetTLI(parent->page, ThisTimeLineID);
                }
        }
 
@@ -547,11 +546,8 @@ moveLeafs(Relation index, SpGistState *state,
                recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_MOVE_LEAFS, rdata);
 
                PageSetLSN(current->page, recptr);
-               PageSetTLI(current->page, ThisTimeLineID);
                PageSetLSN(npage, recptr);
-               PageSetTLI(npage, ThisTimeLineID);
                PageSetLSN(parent->page, recptr);
-               PageSetTLI(parent->page, ThisTimeLineID);
        }
 
        END_CRIT_SECTION();
@@ -1400,7 +1396,6 @@ doPickSplit(Relation index, SpGistState *state,
                        Page            page = BufferGetPage(newLeafBuffer);
 
                        PageSetLSN(page, recptr);
-                       PageSetTLI(page, ThisTimeLineID);
                }
 
                if (saveCurrent.buffer != InvalidBuffer)
@@ -1408,16 +1403,13 @@ doPickSplit(Relation index, SpGistState *state,
                        Page            page = BufferGetPage(saveCurrent.buffer);
 
                        PageSetLSN(page, recptr);
-                       PageSetTLI(page, ThisTimeLineID);
                }
 
                PageSetLSN(current->page, recptr);
-               PageSetTLI(current->page, ThisTimeLineID);
 
                if (parent->buffer != InvalidBuffer)
                {
                        PageSetLSN(parent->page, recptr);
-                       PageSetTLI(parent->page, ThisTimeLineID);
                }
        }
 
@@ -1556,7 +1548,6 @@ spgAddNodeAction(Relation index, SpGistState *state,
                        recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_NODE, rdata);
 
                        PageSetLSN(current->page, recptr);
-                       PageSetTLI(current->page, ThisTimeLineID);
                }
 
                END_CRIT_SECTION();
@@ -1666,11 +1657,8 @@ spgAddNodeAction(Relation index, SpGistState *state,
 
                        /* we don't bother to check if any of these are redundant */
                        PageSetLSN(current->page, recptr);
-                       PageSetTLI(current->page, ThisTimeLineID);
                        PageSetLSN(parent->page, recptr);
-                       PageSetTLI(parent->page, ThisTimeLineID);
                        PageSetLSN(saveCurrent.page, recptr);
-                       PageSetTLI(saveCurrent.page, ThisTimeLineID);
                }
 
                END_CRIT_SECTION();
@@ -1830,12 +1818,10 @@ spgSplitNodeAction(Relation index, SpGistState *state,
                recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_SPLIT_TUPLE, rdata);
 
                PageSetLSN(current->page, recptr);
-               PageSetTLI(current->page, ThisTimeLineID);
 
                if (newBuffer != InvalidBuffer)
                {
                        PageSetLSN(BufferGetPage(newBuffer), recptr);
-                       PageSetTLI(BufferGetPage(newBuffer), ThisTimeLineID);
                }
        }
 
@@ -1850,9 +1836,13 @@ spgSplitNodeAction(Relation index, SpGistState *state,
 }
 
 /*
- * Insert one item into the index
+ * Insert one item into the index.
+ *
+ * Returns true on success, false if we failed to complete the insertion
+ * because of conflict with a concurrent insert.  In the latter case,
+ * caller should re-call spgdoinsert() with the same args.
  */
-void
+bool
 spgdoinsert(Relation index, SpGistState *state,
                        ItemPointer heapPtr, Datum datum, bool isnull)
 {
@@ -1861,6 +1851,14 @@ spgdoinsert(Relation index, SpGistState *state,
        int                     leafSize;
        SPPageDesc      current,
                                parent;
+       FmgrInfo   *procinfo = NULL;
+
+       /*
+        * Look up FmgrInfo of the user-defined choose function once, to save
+        * cycles in the loop below.
+        */
+       if (!isnull)
+               procinfo = index_getprocinfo(index, 1, SPGIST_CHOOSE_PROC);
 
        /*
         * Since we don't use index_form_tuple in this AM, we have to make sure
@@ -1933,12 +1931,32 @@ spgdoinsert(Relation index, SpGistState *state,
                                                                &isNew);
                        current.blkno = BufferGetBlockNumber(current.buffer);
                }
-               else if (parent.buffer == InvalidBuffer ||
-                                current.blkno != parent.blkno)
+               else if (parent.buffer == InvalidBuffer)
                {
+                       /* we hold no parent-page lock, so no deadlock is possible */
                        current.buffer = ReadBuffer(index, current.blkno);
                        LockBuffer(current.buffer, BUFFER_LOCK_EXCLUSIVE);
                }
+               else if (current.blkno != parent.blkno)
+               {
+                       /* descend to a new child page */
+                       current.buffer = ReadBuffer(index, current.blkno);
+
+                       /*
+                        * Attempt to acquire lock on child page.  We must beware of
+                        * deadlock against another insertion process descending from that
+                        * page to our parent page (see README).  If we fail to get lock,
+                        * abandon the insertion and tell our caller to start over.  XXX
+                        * this could be improved; perhaps it'd be worth sleeping a bit
+                        * before giving up?
+                        */
+                       if (!ConditionalLockBuffer(current.buffer))
+                       {
+                               ReleaseBuffer(current.buffer);
+                               UnlockReleaseBuffer(parent.buffer);
+                               return false;
+                       }
+               }
                else
                {
                        /* inner tuple can be stored on the same page as parent one */
@@ -2007,7 +2025,6 @@ spgdoinsert(Relation index, SpGistState *state,
                        SpGistInnerTuple innerTuple;
                        spgChooseIn in;
                        spgChooseOut out;
-                       FmgrInfo   *procinfo;
 
                        /*
                         * spgAddNode and spgSplitTuple cases will loop back to here to
@@ -2035,7 +2052,6 @@ spgdoinsert(Relation index, SpGistState *state,
                        if (!isnull)
                        {
                                /* use user-defined choose method */
-                               procinfo = index_getprocinfo(index, 1, SPGIST_CHOOSE_PROC);
                                FunctionCall2Coll(procinfo,
                                                                  index->rd_indcollation[0],
                                                                  PointerGetDatum(&in),
@@ -2139,4 +2155,6 @@ spgdoinsert(Relation index, SpGistState *state,
                SpGistSetLastUsedPage(index, parent.buffer);
                UnlockReleaseBuffer(parent.buffer);
        }
+
+       return true;
 }