* implementation of insert algorithm
*
*
- * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1996-2013, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
#include "access/spgist_private.h"
#include "miscadmin.h"
#include "storage/bufmgr.h"
+#include "utils/rel.h"
/*
* SPPageDesc tracks all info about a page we are inserting into. In some
* situations it actually identifies a tuple, or even a specific node within
- * an inner tuple. But any of the fields can be invalid. If the buffer
+ * an inner tuple. But any of the fields can be invalid. If the buffer
* field is valid, it implies we hold pin and exclusive lock on that buffer.
* page pointer should be valid exactly when buffer is.
*/
int firststate, int reststate,
BlockNumber blkno, OffsetNumber offnum)
{
- OffsetNumber firstItem;
- OffsetNumber *sortednos;
+ OffsetNumber firstItem;
+ OffsetNumber *sortednos;
SpGistDeadTuple tuple = NULL;
int i;
for (i = 0; i < nitems; i++)
{
- OffsetNumber itemno = sortednos[i];
- int tupstate;
+ OffsetNumber itemno = sortednos[i];
+ int tupstate;
tupstate = (itemno == firstItem) ? firststate : reststate;
if (tuple == NULL || tuple->tupstate != tupstate)
*/
static void
addLeafTuple(Relation index, SpGistState *state, SpGistLeafTuple leafTuple,
- SPPageDesc *current, SPPageDesc *parent, bool isNew)
+ SPPageDesc *current, SPPageDesc *parent, bool isNulls, bool isNew)
{
XLogRecData rdata[4];
spgxlogAddLeaf xlrec;
xlrec.node = index->rd_node;
xlrec.blknoLeaf = current->blkno;
xlrec.newPage = isNew;
+ xlrec.storesNulls = isNulls;
/* these will be filled below as needed */
xlrec.offnumLeaf = InvalidOffsetNumber;
START_CRIT_SECTION();
if (current->offnum == InvalidOffsetNumber ||
- current->blkno == SPGIST_HEAD_BLKNO)
+ SpGistBlockIsRoot(current->blkno))
{
/* Tuple is not part of a chain */
leafTuple->nextOffset = InvalidOffsetNumber;
current->offnum = SpGistPageAddNewItem(state, current->page,
- (Item) leafTuple, leafTuple->size,
+ (Item) leafTuple, leafTuple->size,
NULL, false);
xlrec.offnumLeaf = current->offnum;
else
{
/*
- * Tuple must be inserted into existing chain. We mustn't change
- * the chain's head address, but we don't need to chase the entire
- * chain to put the tuple at the end; we can insert it second.
+ * Tuple must be inserted into existing chain. We mustn't change the
+ * chain's head address, but we don't need to chase the entire chain
+ * to put the tuple at the end; we can insert it second.
*
* Also, it's possible that the "chain" consists only of a DEAD tuple,
* in which case we should replace the DEAD tuple in-place.
OffsetNumber offnum;
head = (SpGistLeafTuple) PageGetItem(current->page,
- PageGetItemId(current->page, current->offnum));
+ PageGetItemId(current->page, current->offnum));
if (head->tupstate == SPGIST_LIVE)
{
leafTuple->nextOffset = head->nextOffset;
* and set new second element
*/
head = (SpGistLeafTuple) PageGetItem(current->page,
- PageGetItemId(current->page, current->offnum));
+ PageGetItemId(current->page, current->offnum));
head->nextOffset = offnum;
xlrec.offnumLeaf = offnum;
recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_LEAF, rdata);
PageSetLSN(current->page, recptr);
- PageSetTLI(current->page, ThisTimeLineID);
/* update parent only if we actually changed it */
if (xlrec.blknoParent != InvalidBlockNumber)
{
PageSetLSN(parent->page, recptr);
- PageSetTLI(parent->page, ThisTimeLineID);
}
}
n = 0,
totalSize = 0;
- if (current->blkno == SPGIST_HEAD_BLKNO)
+ if (SpGistBlockIsRoot(current->blkno))
{
/* return impossible values to force split */
*nToSplit = BLCKSZ;
static void
moveLeafs(Relation index, SpGistState *state,
SPPageDesc *current, SPPageDesc *parent,
- SpGistLeafTuple newLeafTuple)
+ SpGistLeafTuple newLeafTuple, bool isNulls)
{
int i,
nDelete,
}
/* Find a leaf page that will hold them */
- nbuf = SpGistGetBuffer(index, GBUF_LEAF, size, &xlrec.newPage);
+ nbuf = SpGistGetBuffer(index, GBUF_LEAF | (isNulls ? GBUF_NULLS : 0),
+ size, &xlrec.newPage);
npage = BufferGetPage(nbuf);
nblkno = BufferGetBlockNumber(nbuf);
Assert(nblkno != current->blkno);
xlrec.blknoDst = nblkno;
xlrec.nMoves = nDelete;
xlrec.replaceDead = replaceDead;
+ xlrec.storesNulls = isNulls;
xlrec.blknoParent = parent->blkno;
xlrec.offnumParent = parent->offnum;
for (i = 0; i < nDelete; i++)
{
it = (SpGistLeafTuple) PageGetItem(current->page,
- PageGetItemId(current->page, toDelete[i]));
+ PageGetItemId(current->page, toDelete[i]));
Assert(it->tupstate == SPGIST_LIVE);
/*
leafptr += newLeafTuple->size;
/*
- * Now delete the old tuples, leaving a redirection pointer behind for
- * the first one, unless we're doing an index build; in which case there
- * can't be any concurrent scan so we need not provide a redirect.
+ * Now delete the old tuples, leaving a redirection pointer behind for the
+ * first one, unless we're doing an index build; in which case there can't
+ * be any concurrent scan so we need not provide a redirect.
*/
spgPageIndexMultiDelete(state, current->page, toDelete, nDelete,
- state->isBuild ? SPGIST_PLACEHOLDER : SPGIST_REDIRECT,
+ state->isBuild ? SPGIST_PLACEHOLDER : SPGIST_REDIRECT,
SPGIST_PLACEHOLDER,
nblkno, r);
recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_MOVE_LEAFS, rdata);
PageSetLSN(current->page, recptr);
- PageSetTLI(current->page, ThisTimeLineID);
PageSetLSN(npage, recptr);
- PageSetTLI(npage, ThisTimeLineID);
PageSetLSN(parent->page, recptr);
- PageSetTLI(parent->page, ThisTimeLineID);
}
END_CRIT_SECTION();
SpGistDeadTuple dt;
dt = (SpGistDeadTuple) PageGetItem(current->page,
- PageGetItemId(current->page, position));
+ PageGetItemId(current->page, position));
Assert(dt->tupstate == SPGIST_REDIRECT);
Assert(ItemPointerGetBlockNumber(&dt->pointer) == SPGIST_METAPAGE_BLKNO);
ItemPointerSet(&dt->pointer, blkno, offnum);
* If so, randomly divide the tuples into several nodes (all with the same
* label) and return TRUE to select allTheSame mode for this inner tuple.
*
+ * (This code is also used to forcibly select allTheSame mode for nulls.)
+ *
* If we know that the leaf tuples wouldn't all fit on one page, then we
* exclude the last tuple (which is the incoming new tuple that forced a split)
* from the check to see if more than one node is used. The reason for this
/* The opclass may not use node labels, but if it does, duplicate 'em */
if (out->nodeLabels)
{
- Datum theLabel = out->nodeLabels[theNode];
+ Datum theLabel = out->nodeLabels[theNode];
out->nodeLabels = (Datum *) palloc(sizeof(Datum) * out->nNodes);
for (i = 0; i < out->nNodes; i++)
static bool
doPickSplit(Relation index, SpGistState *state,
SPPageDesc *current, SPPageDesc *parent,
- SpGistLeafTuple newLeafTuple, int level, bool isNew)
+ SpGistLeafTuple newLeafTuple,
+ int level, bool isNulls, bool isNew)
{
bool insertedNew = false;
spgPickSplitIn in;
* also, count up the amount of space that will be freed from current.
* (Note that in the non-root case, we won't actually delete the old
* tuples, only replace them with redirects or placeholders.)
+ *
+ * Note: the SGLTDATUM calls here are safe even when dealing with a nulls
+ * page. For a pass-by-value data type we will fetch a word that must
+ * exist even though it may contain garbage (because of the fact that leaf
+ * tuples must have size at least SGDTSIZE). For a pass-by-reference type
+ * we are just computing a pointer that isn't going to get dereferenced.
+ * So it's not worth guarding the calls with isNulls checks.
*/
nToInsert = 0;
nToDelete = 0;
spaceToDelete = 0;
- if (current->blkno == SPGIST_HEAD_BLKNO)
+ if (SpGistBlockIsRoot(current->blkno))
{
/*
* We are splitting the root (which up to now is also a leaf page).
- * Its tuples are not linked, so scan sequentially to get them all.
- * We ignore the original value of current->offnum.
+ * Its tuples are not linked, so scan sequentially to get them all. We
+ * ignore the original value of current->offnum.
*/
for (i = FirstOffsetNumber; i <= max; i++)
{
/* we will delete the tuple altogether, so count full space */
spaceToDelete += it->size + sizeof(ItemIdData);
}
- else /* tuples on root should be live */
+ else /* tuples on root should be live */
elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
}
}
/*
* We may not actually insert new tuple because another picksplit may be
- * necessary due to too large value, but we will try to to allocate enough
+ * necessary due to too large value, but we will try to allocate enough
* space to include it; and in any case it has to be included in the input
- * for the picksplit function. So don't increment nToInsert yet.
+ * for the picksplit function. So don't increment nToInsert yet.
*/
in.datums[in.nTuples] = SGLTDATUM(newLeafTuple, state);
heapPtrs[in.nTuples] = newLeafTuple->heapPtr;
in.nTuples++;
- /*
- * Perform split using user-defined method.
- */
memset(&out, 0, sizeof(out));
- procinfo = index_getprocinfo(index, 1, SPGIST_PICKSPLIT_PROC);
- FunctionCall2Coll(procinfo,
- index->rd_indcollation[0],
- PointerGetDatum(&in),
- PointerGetDatum(&out));
+ if (!isNulls)
+ {
+ /*
+ * Perform split using user-defined method.
+ */
+ procinfo = index_getprocinfo(index, 1, SPGIST_PICKSPLIT_PROC);
+ FunctionCall2Coll(procinfo,
+ index->rd_indcollation[0],
+ PointerGetDatum(&in),
+ PointerGetDatum(&out));
- /*
- * Form new leaf tuples and count up the total space needed.
- */
- totalLeafSizes = 0;
- for (i = 0; i < in.nTuples; i++)
+ /*
+ * Form new leaf tuples and count up the total space needed.
+ */
+ totalLeafSizes = 0;
+ for (i = 0; i < in.nTuples; i++)
+ {
+ newLeafs[i] = spgFormLeafTuple(state, heapPtrs + i,
+ out.leafTupleDatums[i],
+ false);
+ totalLeafSizes += newLeafs[i]->size + sizeof(ItemIdData);
+ }
+ }
+ else
{
- newLeafs[i] = spgFormLeafTuple(state, heapPtrs + i,
- out.leafTupleDatums[i]);
- totalLeafSizes += newLeafs[i]->size + sizeof(ItemIdData);
+ /*
+ * Perform dummy split that puts all tuples into one node.
+ * checkAllTheSame will override this and force allTheSame mode.
+ */
+ out.hasPrefix = false;
+ out.nNodes = 1;
+ out.nodeLabels = NULL;
+ out.mapTuplesToNodes = palloc0(sizeof(int) * in.nTuples);
+
+ /*
+ * Form new leaf tuples and count up the total space needed.
+ */
+ totalLeafSizes = 0;
+ for (i = 0; i < in.nTuples; i++)
+ {
+ newLeafs[i] = spgFormLeafTuple(state, heapPtrs + i,
+ (Datum) 0,
+ true);
+ totalLeafSizes += newLeafs[i]->size + sizeof(ItemIdData);
+ }
}
/*
* Check to see if the picksplit function failed to separate the values,
* ie, it put them all into the same child node. If so, select allTheSame
- * mode and create a random split instead. See comments for
+ * mode and create a random split instead. See comments for
* checkAllTheSame as to why we need to know if the new leaf tuples could
* fit on one page.
*/
for (i = 0; i < out.nNodes; i++)
{
Datum label = (Datum) 0;
- bool isnull = (out.nodeLabels == NULL);
+ bool labelisnull = (out.nodeLabels == NULL);
- if (!isnull)
+ if (!labelisnull)
label = out.nodeLabels[i];
- nodes[i] = spgFormNodeTuple(state, label, isnull);
+ nodes[i] = spgFormNodeTuple(state, label, labelisnull);
}
innerTuple = spgFormInnerTuple(state,
out.hasPrefix, out.prefixDatum,
innerTuple->allTheSame = allTheSame;
/*
- * Update nodes[] array to point into the newly formed innerTuple, so
- * that we can adjust their downlinks below.
+ * Update nodes[] array to point into the newly formed innerTuple, so that
+ * we can adjust their downlinks below.
*/
SGITITERATE(innerTuple, i, node)
{
}
/*
- * To perform the split, we must insert a new inner tuple, which can't
- * go on a leaf page; and unless we are splitting the root page, we
- * must then update the parent tuple's downlink to point to the inner
- * tuple. If there is room, we'll put the new inner tuple on the same
- * page as the parent tuple, otherwise we need another non-leaf buffer.
- * But if the parent page is the root, we can't add the new inner tuple
- * there, because the root page must have only one inner tuple.
+ * To perform the split, we must insert a new inner tuple, which can't go
+ * on a leaf page; and unless we are splitting the root page, we must then
+ * update the parent tuple's downlink to point to the inner tuple. If
+ * there is room, we'll put the new inner tuple on the same page as the
+ * parent tuple, otherwise we need another non-leaf buffer. But if the
+ * parent page is the root, we can't add the new inner tuple there,
+ * because the root page must have only one inner tuple.
*/
xlrec.initInner = false;
if (parent->buffer != InvalidBuffer &&
- parent->blkno != SPGIST_HEAD_BLKNO &&
+ !SpGistBlockIsRoot(parent->blkno) &&
(SpGistPageGetFreeSpace(parent->page, 1) >=
innerTuple->size + sizeof(ItemIdData)))
{
{
/* Send tuple to page with next triple parity (see README) */
newInnerBuffer = SpGistGetBuffer(index,
- GBUF_INNER_PARITY(parent->blkno + 1),
- innerTuple->size + sizeof(ItemIdData),
+ GBUF_INNER_PARITY(parent->blkno + 1) |
+ (isNulls ? GBUF_NULLS : 0),
+ innerTuple->size + sizeof(ItemIdData),
&xlrec.initInner);
}
else
newInnerBuffer = InvalidBuffer;
}
- /*----------
- * Because a WAL record can't involve more than four buffers, we can
- * only afford to deal with two leaf pages in each picksplit action,
- * ie the current page and at most one other.
+ /*
+ * Because a WAL record can't involve more than four buffers, we can only
+ * afford to deal with two leaf pages in each picksplit action, ie the
+ * current page and at most one other.
*
- * The new leaf tuples converted from the existing ones should require
- * the same or less space, and therefore should all fit onto one page
+ * The new leaf tuples converted from the existing ones should require the
+ * same or less space, and therefore should all fit onto one page
* (although that's not necessarily the current page, since we can't
* delete the old tuples but only replace them with placeholders).
- * However, the incoming new tuple might not also fit, in which case
- * we might need another picksplit cycle to reduce it some more.
+ * However, the incoming new tuple might not also fit, in which case we
+ * might need another picksplit cycle to reduce it some more.
*
- * If there's not room to put everything back onto the current page,
- * then we decide on a per-node basis which tuples go to the new page.
- * (We do it like that because leaf tuple chains can't cross pages,
- * so we must place all leaf tuples belonging to the same parent node
- * on the same page.)
+ * If there's not room to put everything back onto the current page, then
+ * we decide on a per-node basis which tuples go to the new page. (We do
+ * it like that because leaf tuple chains can't cross pages, so we must
+ * place all leaf tuples belonging to the same parent node on the same
+ * page.)
*
* If we are splitting the root page (turning it from a leaf page into an
* inner page), then no leaf tuples can go back to the current page; they
* must all go somewhere else.
- *----------
*/
- if (current->blkno != SPGIST_HEAD_BLKNO)
+ if (!SpGistBlockIsRoot(current->blkno))
currentFreeSpace = PageGetExactFreeSpace(current->page) + spaceToDelete;
else
currentFreeSpace = 0; /* prevent assigning any tuples to current */
int curspace;
int newspace;
- newLeafBuffer = SpGistGetBuffer(index, GBUF_LEAF,
+ newLeafBuffer = SpGistGetBuffer(index,
+ GBUF_LEAF | (isNulls ? GBUF_NULLS : 0),
Min(totalLeafSizes,
SPGIST_PAGE_CAPACITY),
&xlrec.initDest);
+
/*
- * Attempt to assign node groups to the two pages. We might fail to
+ * Attempt to assign node groups to the two pages. We might fail to
* do so, even if totalLeafSizes is less than the available space,
* because we can't split a group across pages.
*/
{
if (leafSizes[i] <= curspace)
{
- nodePageSelect[i] = 0; /* signifies current page */
+ nodePageSelect[i] = 0; /* signifies current page */
curspace -= leafSizes[i];
}
else
{
- nodePageSelect[i] = 1; /* signifies new leaf page */
+ nodePageSelect[i] = 1; /* signifies new leaf page */
newspace -= leafSizes[i];
}
}
else if (includeNew)
{
/* We must exclude the new leaf tuple from the split */
- int nodeOfNewTuple = out.mapTuplesToNodes[in.nTuples - 1];
+ int nodeOfNewTuple = out.mapTuplesToNodes[in.nTuples - 1];
leafSizes[nodeOfNewTuple] -=
newLeafs[in.nTuples - 1]->size + sizeof(ItemIdData);
{
if (leafSizes[i] <= curspace)
{
- nodePageSelect[i] = 0; /* signifies current page */
+ nodePageSelect[i] = 0; /* signifies current page */
curspace -= leafSizes[i];
}
else
{
- nodePageSelect[i] = 1; /* signifies new leaf page */
+ nodePageSelect[i] = 1; /* signifies new leaf page */
newspace -= leafSizes[i];
}
}
xlrec.blknoDest = InvalidBlockNumber;
xlrec.nDelete = 0;
xlrec.initSrc = isNew;
+ xlrec.storesNulls = isNulls;
leafdata = leafptr = (char *) palloc(totalLeafSizes);
* the root; in that case there's no need because we'll re-init the page
* below. We do this first to make room for reinserting new leaf tuples.
*/
- if (current->blkno != SPGIST_HEAD_BLKNO)
+ if (!SpGistBlockIsRoot(current->blkno))
{
/*
* Init buffer instead of deleting individual tuples, but only if
nToDelete + SpGistPageGetOpaque(current->page)->nPlaceholder ==
PageGetMaxOffsetNumber(current->page))
{
- SpGistInitBuffer(current->buffer, SPGIST_LEAF);
+ SpGistInitBuffer(current->buffer,
+ SPGIST_LEAF | (isNulls ? SPGIST_NULLS : 0));
xlrec.initSrc = true;
}
else if (isNew)
for (i = 0; i < nToInsert; i++)
{
SpGistLeafTuple it = newLeafs[i];
- Buffer leafBuffer;
+ Buffer leafBuffer;
BlockNumber leafBlock;
OffsetNumber newoffset;
* Splitting root page, which was a leaf but now becomes inner page
* (and so "current" continues to point at it)
*/
- Assert(current->blkno == SPGIST_HEAD_BLKNO);
+ Assert(SpGistBlockIsRoot(current->blkno));
Assert(redirectTuplePos == InvalidOffsetNumber);
- SpGistInitBuffer(current->buffer, 0);
+ SpGistInitBuffer(current->buffer, (isNulls ? SPGIST_NULLS : 0));
xlrec.initInner = true;
xlrec.blknoInner = current->blkno;
Page page = BufferGetPage(newLeafBuffer);
PageSetLSN(page, recptr);
- PageSetTLI(page, ThisTimeLineID);
}
if (saveCurrent.buffer != InvalidBuffer)
Page page = BufferGetPage(saveCurrent.buffer);
PageSetLSN(page, recptr);
- PageSetTLI(page, ThisTimeLineID);
}
PageSetLSN(current->page, recptr);
- PageSetTLI(current->page, ThisTimeLineID);
if (parent->buffer != InvalidBuffer)
{
PageSetLSN(parent->page, recptr);
- PageSetTLI(parent->page, ThisTimeLineID);
}
}
XLogRecData rdata[5];
spgxlogAddNode xlrec;
+ /* Should not be applied to nulls */
+ Assert(!SpGistPageStoresNulls(current->page));
+
/* Construct new inner tuple with additional node */
newInnerTuple = addNode(state, innerTuple, nodeLabel, nodeN);
recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_NODE, rdata);
PageSetLSN(current->page, recptr);
- PageSetTLI(current->page, ThisTimeLineID);
}
END_CRIT_SECTION();
* allow only one inner tuple on the root page, and spgFormInnerTuple
* always checks that inner tuples don't exceed the size of a page.
*/
- if (current->blkno == SPGIST_HEAD_BLKNO)
+ if (SpGistBlockIsRoot(current->blkno))
elog(ERROR, "cannot enlarge root tuple any more");
Assert(parent->buffer != InvalidBuffer);
xlrec.nodeI = parent->node;
/*
- * obtain new buffer with the same parity as current, since it will
- * be a child of same parent tuple
+ * obtain new buffer with the same parity as current, since it will be
+ * a child of same parent tuple
*/
current->buffer = SpGistGetBuffer(index,
GBUF_INNER_PARITY(current->blkno),
- newInnerTuple->size + sizeof(ItemIdData),
+ newInnerTuple->size + sizeof(ItemIdData),
&xlrec.newPage);
current->blkno = BufferGetBlockNumber(current->buffer);
current->page = BufferGetPage(current->buffer);
xlrec.blknoNew = current->blkno;
/*
- * Let's just make real sure new current isn't same as old. Right
- * now that's impossible, but if SpGistGetBuffer ever got smart enough
- * to delete placeholder tuples before checking space, maybe it
- * wouldn't be impossible. The case would appear to work except that
- * WAL replay would be subtly wrong, so I think a mere assert isn't
- * enough here.
+ * Let's just make real sure new current isn't same as old. Right now
+ * that's impossible, but if SpGistGetBuffer ever got smart enough to
+ * delete placeholder tuples before checking space, maybe it wouldn't
+ * be impossible. The case would appear to work except that WAL
+ * replay would be subtly wrong, so I think a mere assert isn't enough
+ * here.
*/
- if (xlrec.blknoNew == xlrec.blkno)
- elog(ERROR, "SPGiST new buffer shouldn't be same as old buffer");
+ if (xlrec.blknoNew == xlrec.blkno)
+ elog(ERROR, "SPGiST new buffer shouldn't be same as old buffer");
/*
* New current and parent buffer will both be modified; but note that
/* we don't bother to check if any of these are redundant */
PageSetLSN(current->page, recptr);
- PageSetTLI(current->page, ThisTimeLineID);
PageSetLSN(parent->page, recptr);
- PageSetTLI(parent->page, ThisTimeLineID);
PageSetLSN(saveCurrent.page, recptr);
- PageSetTLI(saveCurrent.page, ThisTimeLineID);
}
END_CRIT_SECTION();
spgxlogSplitTuple xlrec;
Buffer newBuffer = InvalidBuffer;
+ /* Should not be applied to nulls */
+ Assert(!SpGistPageStoresNulls(current->page));
+
/*
- * Construct new prefix tuple, containing a single node with the
- * specified label. (We'll update the node's downlink to point to the
- * new postfix tuple, below.)
+ * Construct new prefix tuple, containing a single node with the specified
+ * label. (We'll update the node's downlink to point to the new postfix
+ * tuple, below.)
*/
node = spgFormNodeTuple(state, out->result.splitTuple.nodeLabel, false);
* For the space calculation, note that prefixTuple replaces innerTuple
* but postfixTuple will be a new entry.
*/
- if (current->blkno == SPGIST_HEAD_BLKNO ||
+ if (SpGistBlockIsRoot(current->blkno) ||
SpGistPageGetFreeSpace(current->page, 1) + innerTuple->size <
prefixTuple->size + postfixTuple->size + sizeof(ItemIdData))
{
recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_SPLIT_TUPLE, rdata);
PageSetLSN(current->page, recptr);
- PageSetTLI(current->page, ThisTimeLineID);
if (newBuffer != InvalidBuffer)
{
PageSetLSN(BufferGetPage(newBuffer), recptr);
- PageSetTLI(BufferGetPage(newBuffer), ThisTimeLineID);
}
}
}
/*
- * Insert one item into the index
+ * Insert one item into the index.
+ *
+ * Returns true on success, false if we failed to complete the insertion
+ * because of conflict with a concurrent insert. In the latter case,
+ * caller should re-call spgdoinsert() with the same args.
*/
-void
+bool
spgdoinsert(Relation index, SpGistState *state,
- ItemPointer heapPtr, Datum datum)
+ ItemPointer heapPtr, Datum datum, bool isnull)
{
int level = 0;
Datum leafDatum;
int leafSize;
SPPageDesc current,
parent;
+ FmgrInfo *procinfo = NULL;
+
+ /*
+ * Look up FmgrInfo of the user-defined choose function once, to save
+ * cycles in the loop below.
+ */
+ if (!isnull)
+ procinfo = index_getprocinfo(index, 1, SPGIST_CHOOSE_PROC);
/*
* Since we don't use index_form_tuple in this AM, we have to make sure
* value to be inserted is not toasted; FormIndexDatum doesn't guarantee
* that.
*/
- if (state->attType.attlen == -1)
+ if (!isnull && state->attType.attlen == -1)
datum = PointerGetDatum(PG_DETOAST_DATUM(datum));
leafDatum = datum;
* If it isn't gonna fit, and the opclass can't reduce the datum size by
* suffixing, bail out now rather than getting into an endless loop.
*/
- leafSize = SGLTHDRSZ + sizeof(ItemIdData) +
- SpGistGetTypeSize(&state->attType, leafDatum);
+ if (!isnull)
+ leafSize = SGLTHDRSZ + sizeof(ItemIdData) +
+ SpGistGetTypeSize(&state->attType, leafDatum);
+ else
+ leafSize = SGDTSIZE + sizeof(ItemIdData);
if (leafSize > SPGIST_PAGE_CAPACITY && !state->config.longValuesOK)
ereport(ERROR,
(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
errmsg("index row size %lu exceeds maximum %lu for index \"%s\"",
(unsigned long) (leafSize - sizeof(ItemIdData)),
- (unsigned long) (SPGIST_PAGE_CAPACITY - sizeof(ItemIdData)),
+ (unsigned long) (SPGIST_PAGE_CAPACITY - sizeof(ItemIdData)),
RelationGetRelationName(index)),
- errhint("Values larger than a buffer page cannot be indexed.")));
+ errhint("Values larger than a buffer page cannot be indexed.")));
- /* Initialize "current" to the root page */
- current.blkno = SPGIST_HEAD_BLKNO;
+ /* Initialize "current" to the appropriate root page */
+ current.blkno = isnull ? SPGIST_NULL_BLKNO : SPGIST_ROOT_BLKNO;
current.buffer = InvalidBuffer;
current.page = NULL;
current.offnum = FirstOffsetNumber;
if (current.blkno == InvalidBlockNumber)
{
/*
- * Create a leaf page. If leafSize is too large to fit on a page,
+ * Create a leaf page. If leafSize is too large to fit on a page,
* we won't actually use the page yet, but it simplifies the API
* for doPickSplit to always have a leaf page at hand; so just
* quietly limit our request to a page size.
*/
- current.buffer = SpGistGetBuffer(index, GBUF_LEAF,
- Min(leafSize,
- SPGIST_PAGE_CAPACITY),
- &isNew);
+ current.buffer =
+ SpGistGetBuffer(index,
+ GBUF_LEAF | (isnull ? GBUF_NULLS : 0),
+ Min(leafSize, SPGIST_PAGE_CAPACITY),
+ &isNew);
current.blkno = BufferGetBlockNumber(current.buffer);
}
- else if (parent.buffer == InvalidBuffer ||
- current.blkno != parent.blkno)
+ else if (parent.buffer == InvalidBuffer)
{
+ /* we hold no parent-page lock, so no deadlock is possible */
current.buffer = ReadBuffer(index, current.blkno);
LockBuffer(current.buffer, BUFFER_LOCK_EXCLUSIVE);
}
+ else if (current.blkno != parent.blkno)
+ {
+ /* descend to a new child page */
+ current.buffer = ReadBuffer(index, current.blkno);
+
+ /*
+ * Attempt to acquire lock on child page. We must beware of
+ * deadlock against another insertion process descending from that
+ * page to our parent page (see README). If we fail to get lock,
+ * abandon the insertion and tell our caller to start over. XXX
+ * this could be improved; perhaps it'd be worth sleeping a bit
+ * before giving up?
+ */
+ if (!ConditionalLockBuffer(current.buffer))
+ {
+ ReleaseBuffer(current.buffer);
+ UnlockReleaseBuffer(parent.buffer);
+ return false;
+ }
+ }
else
{
/* inner tuple can be stored on the same page as parent one */
}
current.page = BufferGetPage(current.buffer);
+ /* should not arrive at a page of the wrong type */
+ if (isnull ? !SpGistPageStoresNulls(current.page) :
+ SpGistPageStoresNulls(current.page))
+ elog(ERROR, "SPGiST index page %u has wrong nulls flag",
+ current.blkno);
+
if (SpGistPageIsLeaf(current.page))
{
SpGistLeafTuple leafTuple;
int nToSplit,
sizeToSplit;
- leafTuple = spgFormLeafTuple(state, heapPtr, leafDatum);
+ leafTuple = spgFormLeafTuple(state, heapPtr, leafDatum, isnull);
if (leafTuple->size + sizeof(ItemIdData) <=
SpGistPageGetFreeSpace(current.page, 1))
{
/* it fits on page, so insert it and we're done */
addLeafTuple(index, state, leafTuple,
- ¤t, &parent, isNew);
+ ¤t, &parent, isnull, isNew);
break;
}
else if ((sizeToSplit =
checkSplitConditions(index, state, ¤t,
- &nToSplit)) < SPGIST_PAGE_CAPACITY / 2 &&
+ &nToSplit)) < SPGIST_PAGE_CAPACITY / 2 &&
nToSplit < 64 &&
leafTuple->size + sizeof(ItemIdData) + sizeToSplit <= SPGIST_PAGE_CAPACITY)
{
* chain to another leaf page rather than splitting it.
*/
Assert(!isNew);
- moveLeafs(index, state, ¤t, &parent, leafTuple);
+ moveLeafs(index, state, ¤t, &parent, leafTuple, isnull);
break; /* we're done */
}
else
{
/* picksplit */
if (doPickSplit(index, state, ¤t, &parent,
- leafTuple, level, isNew))
+ leafTuple, level, isnull, isNew))
break; /* doPickSplit installed new tuples */
/* leaf tuple will not be inserted yet */
SpGistInnerTuple innerTuple;
spgChooseIn in;
spgChooseOut out;
- FmgrInfo *procinfo;
/*
* spgAddNode and spgSplitTuple cases will loop back to here to
memset(&out, 0, sizeof(out));
- procinfo = index_getprocinfo(index, 1, SPGIST_CHOOSE_PROC);
- FunctionCall2Coll(procinfo,
- index->rd_indcollation[0],
- PointerGetDatum(&in),
- PointerGetDatum(&out));
+ if (!isnull)
+ {
+ /* use user-defined choose method */
+ FunctionCall2Coll(procinfo,
+ index->rd_indcollation[0],
+ PointerGetDatum(&in),
+ PointerGetDatum(&out));
+ }
+ else
+ {
+ /* force "match" action (to insert to random subnode) */
+ out.resultType = spgMatchNode;
+ }
if (innerTuple->allTheSame)
{
/* Adjust level as per opclass request */
level += out.result.matchNode.levelAdd;
/* Replace leafDatum and recompute leafSize */
- leafDatum = out.result.matchNode.restDatum;
- leafSize = SGLTHDRSZ + sizeof(ItemIdData) +
- SpGistGetTypeSize(&state->attType, leafDatum);
+ if (!isnull)
+ {
+ leafDatum = out.result.matchNode.restDatum;
+ leafSize = SGLTHDRSZ + sizeof(ItemIdData) +
+ SpGistGetTypeSize(&state->attType, leafDatum);
+ }
/*
- * Loop around and attempt to insert the new leafDatum
- * at "current" (which might reference an existing child
+ * Loop around and attempt to insert the new leafDatum at
+ * "current" (which might reference an existing child
* tuple, or might be invalid to force us to find a new
* page for the tuple).
*
out.result.addNode.nodeLabel);
/*
- * Retry insertion into the enlarged node. We assume
- * that we'll get a MatchNode result this time.
+ * Retry insertion into the enlarged node. We assume that
+ * we'll get a MatchNode result this time.
*/
goto process_inner_tuple;
break;
SpGistSetLastUsedPage(index, parent.buffer);
UnlockReleaseBuffer(parent.buffer);
}
+
+ return true;
}