1 /*-------------------------------------------------------------------------
4 * implementation of insert algorithm
7 * Portions Copyright (c) 1996-2012, PostgreSQL Global Development Group
8 * Portions Copyright (c) 1994, Regents of the University of California
11 * src/backend/access/spgist/spgdoinsert.c
13 *-------------------------------------------------------------------------
18 #include "access/genam.h"
19 #include "access/spgist_private.h"
20 #include "miscadmin.h"
21 #include "storage/bufmgr.h"
25 * SPPageDesc tracks all info about a page we are inserting into. In some
26 * situations it actually identifies a tuple, or even a specific node within
27 * an inner tuple. But any of the fields can be invalid. If the buffer
28 * field is valid, it implies we hold pin and exclusive lock on that buffer.
29 * page pointer should be valid exactly when buffer is.
/*
 * Descriptor for one page that the insertion code is working on; it can
 * additionally name one tuple on that page (offnum) and one node inside an
 * inner tuple (node).  Each member has its own invalid sentinel, given in
 * the per-member comments below.
 *
 * NOTE(review): the brace lines and the closing typedef name are not
 * present in this listing; the member lines are kept exactly as found.
 */
31 typedef struct SPPageDesc
33 BlockNumber blkno; /* block number, or InvalidBlockNumber */
34 Buffer buffer; /* page's buffer number, or InvalidBuffer */
35 Page page; /* pointer to page buffer, or NULL */
36 OffsetNumber offnum; /* offset of tuple, or InvalidOffsetNumber */
37 int node; /* node number within inner tuple, or -1 */
42 * Set the item pointer in the nodeN'th entry in inner tuple tup. This
43 * is used to update the parent inner tuple's downlink after a move or split operation.
/*
 * spgUpdateNodeLink
 *
 *   tup     inner tuple whose nodes are visited with SGITITERATE
 *   nodeN   number of the node whose downlink is to be set
 *   blkno   block number stored into the node t_tid
 *   offset  offset number stored into the node t_tid
 *
 * The pair (blkno, offset) is written into node->t_tid by ItemPointerSet.
 * If control reaches the end of the iteration, elog(ERROR) reports that the
 * requested node was not found (the %d is presumably nodeN; its argument
 * line is not visible here).
 *
 * NOTE(review): the test that compares the loop counter with nodeN and the
 * early return are not present in this listing -- presumably only the node
 * whose index equals nodeN is updated; confirm against the full file.
 */
47 spgUpdateNodeLink(SpGistInnerTuple tup, int nodeN,
48 BlockNumber blkno, OffsetNumber offset)
53 SGITITERATE(tup, i, node)
57 ItemPointerSet(&node->t_tid, blkno, offset);
62 elog(ERROR, "failed to find requested node %d in SPGiST inner tuple",
67 * Form a new inner tuple containing one more node than the given one, with
68 * the specified label datum, inserted at offset "offset" in the node array.
69 * The new tuple's prefix is the same as the old one's.
71 * Note that the new node initially has an invalid downlink. We'll find a
72 * page to point it to later.
/*
 * addNode
 *
 *   state   passed through to spgFormNodeTuple, SGITDATUM and
 *           spgFormInnerTuple
 *   tuple   existing inner tuple; the visible lines only read it
 *           (nNodes, prefixSize, its prefix datum and its nodes)
 *   label   label datum for the node being added
 *   offset  position of the new node in the node array; a negative value
 *           selects the end of the array, and a value greater than
 *           tuple->nNodes raises elog(ERROR)
 *
 * Returns the inner tuple produced by spgFormInnerTuple.
 *
 * NOTE(review): the body of the SGITITERATE loop that fills nodes[] around
 * the insertion point, and the last arguments of spgFormInnerTuple, are not
 * present in this listing.
 */
74 static SpGistInnerTuple
75 addNode(SpGistState *state, SpGistInnerTuple tuple, Datum label, int offset)
81 /* if offset is negative, insert at end */
83 offset = tuple->nNodes;
84 else if (offset > tuple->nNodes)
85 elog(ERROR, "invalid offset for adding node to SPGiST inner tuple");
/* array sized for every existing node plus the one being added */
87 nodes = palloc(sizeof(SpGistNodeTuple) * (tuple->nNodes + 1));
88 SGITITERATE(tuple, i, node)
/*
 * The new node carries the supplied label; the third argument is the
 * is-null flag (compare the labelisnull argument passed in doPickSplit).
 */
96 nodes[offset] = spgFormNodeTuple(state, label, false);
/*
 * The has-prefix flag is derived from prefixSize > 0 and the prefix datum
 * is fetched from the old tuple with SGITDATUM.
 */
98 return spgFormInnerTuple(state,
99 (tuple->prefixSize > 0),
100 SGITDATUM(tuple, state),
105 /* qsort comparator for sorting OffsetNumbers */
/*
 * Both arguments are dereferenced as const OffsetNumber pointers.  For
 * unequal values the result is 1 when *a is greater than *b and -1
 * otherwise, which gives ascending order.  It is handed to qsort() in
 * spgPageIndexMultiDelete.
 *
 * NOTE(review): the statement under the equality test is not present in
 * this listing; presumably it returns 0.
 */
107 cmpOffsetNumbers(const void *a, const void *b)
109 if (*(const OffsetNumber *) a == *(const OffsetNumber *) b)
111 return (*(const OffsetNumber *) a > *(const OffsetNumber *) b) ? 1 : -1;
115 * Delete multiple tuples from an index page, preserving tuple offset numbers.
117 * The first tuple in the given list is replaced with a dead tuple of type
118 * "firststate" (REDIRECT/DEAD/PLACEHOLDER); the remaining tuples are replaced
119 * with dead tuples of type "reststate". If either firststate or reststate
120 * is REDIRECT, blkno/offnum specify where to link to.
122 * NB: this is used during WAL replay, so beware of trying to make it too
123 * smart. In particular, it shouldn't use "state" except for calling
124 * spgFormDeadTuple().
/*
 * spgPageIndexMultiDelete
 *
 *   state       only handed to spgFormDeadTuple
 *   page        page that is modified in place
 *   itemnos     offsets of the tuples to replace; itemnos[0] is the entry
 *               that receives firststate.  The array itself is not written:
 *               a palloc-ed copy is sorted instead.
 *   nitems      number of entries in itemnos
 *   firststate  tuple state for the replacement of itemnos[0]
 *   reststate   tuple state for all other replacements
 *   blkno       passed to spgFormDeadTuple together with offnum
 *   offnum      see blkno
 *
 * Steps, as far as the visible lines show:
 *   1. copy itemnos into sortednos and sort the copy with cmpOffsetNumbers;
 *   2. remove all listed items with PageIndexMultiDelete;
 *   3. for every sorted offset, put a dead tuple back at that same offset
 *      with PageAddItem.  The dead tuple is formed again only when the
 *      wanted state differs from the state of the one formed last;
 *   4. bump nRedirection or nPlaceholder in the page opaque area for each
 *      REDIRECT or PLACEHOLDER tuple added.
 *
 * elog(ERROR) is raised when PageAddItem does not return the requested
 * offset.
 *
 * NOTE(review): the guard in front of the early return (presumably a test
 * for nitems being zero or less) is not present in this listing.
 */
127 spgPageIndexMultiDelete(SpGistState *state, Page page,
128 OffsetNumber *itemnos, int nitems,
129 int firststate, int reststate,
130 BlockNumber blkno, OffsetNumber offnum)
132 OffsetNumber firstItem; /* itemnos[0], the entry that gets firststate */
133 OffsetNumber *sortednos; /* private sorted copy of itemnos */
134 SpGistDeadTuple tuple = NULL; /* reused until the wanted state changes */
138 return; /* nothing to do */
141 * For efficiency we want to use PageIndexMultiDelete, which requires the
142 * targets to be listed in sorted order, so we have to sort the itemnos
143 * array. (This also greatly simplifies the math for reinserting the
144 * replacement tuples.) However, we must not scribble on the caller's
145 * array, so we have to make a copy.
147 sortednos = (OffsetNumber *) palloc(sizeof(OffsetNumber) * nitems);
148 memcpy(sortednos, itemnos, sizeof(OffsetNumber) * nitems);
150 qsort(sortednos, nitems, sizeof(OffsetNumber), cmpOffsetNumbers);
152 PageIndexMultiDelete(page, sortednos, nitems);
154 firstItem = itemnos[0];
156 for (i = 0; i < nitems; i++)
158 OffsetNumber itemno = sortednos[i];
161 tupstate = (itemno == firstItem) ? firststate : reststate;
162 if (tuple == NULL || tuple->tupstate != tupstate)
163 tuple = spgFormDeadTuple(state, tupstate, blkno, offnum);
165 if (PageAddItem(page, (Item) tuple, tuple->size,
166 itemno, false, false) != itemno)
167 elog(ERROR, "failed to add item of size %u to SPGiST index page",
170 if (tupstate == SPGIST_REDIRECT)
171 SpGistPageGetOpaque(page)->nRedirection++;
172 else if (tupstate == SPGIST_PLACEHOLDER)
173 SpGistPageGetOpaque(page)->nPlaceholder++;
180 * Update the parent inner tuple's downlink, and mark the parent buffer
181 * dirty (this must be the last change to the parent page in the current WAL action).
/*
 * saveNodeLink
 *
 *   index   not referenced by any visible line of this function
 *   parent  supplies the page and offnum used to fetch the inner tuple, the
 *           node number to update, and the buffer that is marked dirty
 *   blkno   new downlink block number
 *   offnum  new downlink offset number
 *
 * The inner tuple is looked up on parent->page at parent->offnum, node
 * parent->node is pointed at (blkno, offnum) through spgUpdateNodeLink, and
 * parent->buffer is then passed to MarkBufferDirty.
 */
185 saveNodeLink(Relation index, SPPageDesc *parent,
186 BlockNumber blkno, OffsetNumber offnum)
188 SpGistInnerTuple innerTuple;
190 innerTuple = (SpGistInnerTuple) PageGetItem(parent->page,
191 PageGetItemId(parent->page, parent->offnum));
193 spgUpdateNodeLink(innerTuple, parent->node, blkno, offnum);
195 MarkBufferDirty(parent->buffer);
199 * Add a leaf tuple to a leaf page where there is known to be room for it
/*
 * addLeafTuple
 *
 *   index      relation; supplies rd_node for the WAL record and is tested
 *              with RelationNeedsWAL
 *   state      passed to SpGistPageAddNewItem
 *   leafTuple  tuple to add; its nextOffset field is overwritten here
 *   current    target leaf page.  current->offnum names the chain head, or
 *              is InvalidOffsetNumber when there is no chain; in the
 *              no-chain case it is set to the offset of the new tuple
 *   parent     parent descriptor; its downlink is updated only in the
 *              no-chain case and only when parent->buffer is valid
 *   isNulls    copied into xlrec.storesNulls
 *   isNew      copied into xlrec.newPage
 *
 * Three placements are visible:
 *   - no chain, or the page is the root: the tuple is added as a standalone
 *     item and the parent downlink (if any) is pointed at it;
 *   - chain whose head is LIVE: the tuple is linked in as second element,
 *     so the head offset stays the same;
 *   - chain whose head is DEAD: the DEAD tuple is deleted and the new tuple
 *     is added at the very same offset.
 * Any other head state raises elog(ERROR).
 *
 * Afterwards the current buffer is marked dirty and, when the relation
 * needs WAL, an XLOG_SPGIST_ADD_LEAF record is inserted and the page LSN
 * and TLI are set on the current page (and on the parent page when its
 * downlink was changed).
 *
 * NOTE(review): the closing of the critical section opened by
 * START_CRIT_SECTION is not present in this listing.
 */
202 addLeafTuple(Relation index, SpGistState *state, SpGistLeafTuple leafTuple,
203 SPPageDesc *current, SPPageDesc *parent, bool isNulls, bool isNew)
205 XLogRecData rdata[4];
206 spgxlogAddLeaf xlrec;
/* WAL record fields that are known before the page is touched */
208 xlrec.node = index->rd_node;
209 xlrec.blknoLeaf = current->blkno;
210 xlrec.newPage = isNew;
211 xlrec.storesNulls = isNulls;
213 /* these will be filled below as needed */
214 xlrec.offnumLeaf = InvalidOffsetNumber;
215 xlrec.offnumHeadLeaf = InvalidOffsetNumber;
216 xlrec.blknoParent = InvalidBlockNumber;
217 xlrec.offnumParent = InvalidOffsetNumber;
/*
 * rdata slots: 0 = xlrec, 1 = the leaf tuple, 2 = current buffer,
 * 3 = parent buffer (filled further down, only when the downlink changes)
 */
220 ACCEPT_RDATA_DATA(&xlrec, sizeof(xlrec), 0);
221 /* we assume sizeof(xlrec) is at least int-aligned */
222 ACCEPT_RDATA_DATA(leafTuple, leafTuple->size, 1);
223 ACCEPT_RDATA_BUFFER(current->buffer, 2);
225 START_CRIT_SECTION();
227 if (current->offnum == InvalidOffsetNumber ||
228 SpGistBlockIsRoot(current->blkno))
230 /* Tuple is not part of a chain */
231 leafTuple->nextOffset = InvalidOffsetNumber;
232 current->offnum = SpGistPageAddNewItem(state, current->page,
233 (Item) leafTuple, leafTuple->size,
236 xlrec.offnumLeaf = current->offnum;
238 /* Must update parent's downlink if any */
239 if (parent->buffer != InvalidBuffer)
241 xlrec.blknoParent = parent->blkno;
242 xlrec.offnumParent = parent->offnum;
243 xlrec.nodeI = parent->node;
245 saveNodeLink(index, parent, current->blkno, current->offnum);
247 ACCEPT_RDATA_BUFFER(parent->buffer, 3);
253 * Tuple must be inserted into existing chain. We mustn't change
254 * the chain's head address, but we don't need to chase the entire
255 * chain to put the tuple at the end; we can insert it second.
257 * Also, it's possible that the "chain" consists only of a DEAD tuple,
258 * in which case we should replace the DEAD tuple in-place.
260 SpGistLeafTuple head;
263 head = (SpGistLeafTuple) PageGetItem(current->page,
264 PageGetItemId(current->page, current->offnum));
265 if (head->tupstate == SPGIST_LIVE)
/* the new tuple takes over the old successor of the head */
267 leafTuple->nextOffset = head->nextOffset;
268 offnum = SpGistPageAddNewItem(state, current->page,
269 (Item) leafTuple, leafTuple->size,
273 * re-get head of list because it could have been moved on page,
274 * and set new second element
276 head = (SpGistLeafTuple) PageGetItem(current->page,
277 PageGetItemId(current->page, current->offnum));
278 head->nextOffset = offnum;
280 xlrec.offnumLeaf = offnum;
281 xlrec.offnumHeadLeaf = current->offnum;
283 else if (head->tupstate == SPGIST_DEAD)
285 leafTuple->nextOffset = InvalidOffsetNumber;
/* swap the DEAD tuple for the new one at the very same offset */
286 PageIndexTupleDelete(current->page, current->offnum);
287 if (PageAddItem(current->page,
288 (Item) leafTuple, leafTuple->size,
289 current->offnum, false, false) != current->offnum)
290 elog(ERROR, "failed to add item of size %u to SPGiST index page",
293 /* WAL replay distinguishes this case by equal offnums */
294 xlrec.offnumLeaf = current->offnum;
295 xlrec.offnumHeadLeaf = current->offnum;
298 elog(ERROR, "unexpected SPGiST tuple state: %d", head->tupstate);
301 MarkBufferDirty(current->buffer);
303 if (RelationNeedsWAL(index))
307 recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_LEAF, rdata);
309 PageSetLSN(current->page, recptr);
310 PageSetTLI(current->page, ThisTimeLineID);
312 /* update parent only if we actually changed it */
313 if (xlrec.blknoParent != InvalidBlockNumber)
315 PageSetLSN(parent->page, recptr);
316 PageSetTLI(parent->page, ThisTimeLineID);
324 * Count the number and total size of leaf tuples in the chain starting at
325 * current->offnum. Return number into *nToSplit and total size as function result.
328 * Klugy special case when considering the root page (i.e., root is a leaf
329 * page, but we're about to split for the first time): return fake large
330 * values to force spgdoinsert() to take the doPickSplit rather than
331 * moveLeafs code path. moveLeafs is not prepared to deal with root page.
/*
 * checkSplitConditions
 *
 *   index     not referenced by any visible line of this function
 *   state     not referenced by any visible line of this function
 *   current   page descriptor; blkno is tested for being the root and
 *             offnum is the expected position of a DEAD chain item
 *   nToSplit  output pointer (its assignments are not visible here)
 *
 * The chain is followed until InvalidOffsetNumber is reached.  Every LIVE
 * tuple adds its size plus sizeof(ItemIdData) to totalSize.  A DEAD tuple is
 * asserted to be the chain item at current->offnum with no successor, and
 * is not counted.  Any other tuple state raises elog(ERROR).
 *
 * NOTE(review): the initial value of the chain cursor, the step to
 * it->nextOffset, the values returned for the root page, and the final
 * result assignments are not present in this listing.
 */
334 checkSplitConditions(Relation index, SpGistState *state,
335 SPPageDesc *current, int *nToSplit)
341 if (SpGistBlockIsRoot(current->blkno))
343 /* return impossible values to force split */
349 while (i != InvalidOffsetNumber)
353 Assert(i >= FirstOffsetNumber &&
354 i <= PageGetMaxOffsetNumber(current->page));
355 it = (SpGistLeafTuple) PageGetItem(current->page,
356 PageGetItemId(current->page, i));
357 if (it->tupstate == SPGIST_LIVE)
/* the line pointer is counted along with the tuple body */
360 totalSize += it->size + sizeof(ItemIdData);
362 else if (it->tupstate == SPGIST_DEAD)
364 /* We could see a DEAD tuple as first/only chain item */
365 Assert(i == current->offnum);
366 Assert(it->nextOffset == InvalidOffsetNumber);
367 /* Don't count it in result, because it won't go to other page */
370 elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
381 * current points to a leaf-tuple chain that we wanted to add newLeafTuple to,
382 * but the chain has to be moved because there's not enough room to add
383 * newLeafTuple to its page. We use this method when the chain contains
384 * very little data so a split would be inefficient. We are sure we can
385 * fit the chain plus newLeafTuple on one other page.
/*
 * moveLeafs
 *
 *   index         relation; used for buffer lookup, WAL and the
 *                 last-used-page cache
 *   state         passed to the tuple and page helpers; state->isBuild
 *                 selects PLACEHOLDER instead of REDIRECT for the first
 *                 replaced tuple
 *   current       source leaf page and chain head
 *   parent        parent descriptor; asserted to have a valid buffer that
 *                 differs from the current one.  Its downlink is repointed
 *                 at the new chain
 *   newLeafTuple  tuple to add; its nextOffset field is overwritten
 *   isNulls       selects GBUF_NULLS and is copied into xlrec.storesNulls
 *
 * Phases, as far as the visible lines show:
 *   1. follow the chain, recording every member offset in toDelete and
 *      summing size (tuple size plus line pointer) for the LIVE members;
 *   2. obtain a leaf buffer with at least that much room from
 *      SpGistGetBuffer; it is asserted to be a different block;
 *   3. inside the critical section, add each old LIVE tuple and then
 *      newLeafTuple to the new page, recording the new offsets in toInsert
 *      and a copy of each tuple in leafdata;
 *   4. replace the old tuples through spgPageIndexMultiDelete;
 *   5. point the parent downlink at (nblkno, r), mark buffers dirty, emit an
 *      XLOG_SPGIST_MOVE_LEAFS record when WAL is needed and stamp the three
 *      pages with its LSN and TLI;
 *   6. record nbuf with SpGistSetLastUsedPage and unlock/release it.
 *
 * NOTE(review): several short lines are not present in this listing,
 * including the chain cursor setup and step, the counter increments, the
 * test that skips the copy loop for a DEAD chain, the trailing arguments of
 * spgPageIndexMultiDelete, and the end of the critical section.
 */
388 moveLeafs(Relation index, SpGistState *state,
389 SPPageDesc *current, SPPageDesc *parent,
390 SpGistLeafTuple newLeafTuple, bool isNulls)
399 OffsetNumber r = InvalidOffsetNumber,
400 startOffset = InvalidOffsetNumber;
401 bool replaceDead = false;
402 OffsetNumber *toDelete;
403 OffsetNumber *toInsert;
405 XLogRecData rdata[7];
406 spgxlogMoveLeafs xlrec;
410 /* This doesn't work on root page */
411 Assert(parent->buffer != InvalidBuffer);
412 Assert(parent->buffer != current->buffer);
414 /* Locate the tuples to be moved, and count up the space needed */
/*
 * Both arrays are sized from the highest offset on the page; toInsert has
 * one more slot, which is used for the new tuple further down.
 */
415 i = PageGetMaxOffsetNumber(current->page);
416 toDelete = (OffsetNumber *) palloc(sizeof(OffsetNumber) * i);
417 toInsert = (OffsetNumber *) palloc(sizeof(OffsetNumber) * (i + 1));
/* start with the space the new tuple needs; chain members are added below */
419 size = newLeafTuple->size + sizeof(ItemIdData);
423 while (i != InvalidOffsetNumber)
427 Assert(i >= FirstOffsetNumber &&
428 i <= PageGetMaxOffsetNumber(current->page));
429 it = (SpGistLeafTuple) PageGetItem(current->page,
430 PageGetItemId(current->page, i));
432 if (it->tupstate == SPGIST_LIVE)
434 toDelete[nDelete] = i;
435 size += it->size + sizeof(ItemIdData);
438 else if (it->tupstate == SPGIST_DEAD)
440 /* We could see a DEAD tuple as first/only chain item */
441 Assert(i == current->offnum);
442 Assert(it->nextOffset == InvalidOffsetNumber);
443 /* We don't want to move it, so don't count it in size */
444 toDelete[nDelete] = i;
449 elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
454 /* Find a leaf page that will hold them */
455 nbuf = SpGistGetBuffer(index, GBUF_LEAF | (isNulls ? GBUF_NULLS : 0),
456 size, &xlrec.newPage);
457 npage = BufferGetPage(nbuf);
458 nblkno = BufferGetBlockNumber(nbuf);
459 Assert(nblkno != current->blkno);
461 /* prepare WAL info */
462 xlrec.node = index->rd_node;
463 STORE_STATE(state, xlrec.stateSrc);
465 xlrec.blknoSrc = current->blkno;
466 xlrec.blknoDst = nblkno;
467 xlrec.nMoves = nDelete;
468 xlrec.replaceDead = replaceDead;
469 xlrec.storesNulls = isNulls;
471 xlrec.blknoParent = parent->blkno;
472 xlrec.offnumParent = parent->offnum;
473 xlrec.nodeI = parent->node;
/* scratch area that collects a copy of every tuple put on the new page */
475 leafdata = leafptr = palloc(size);
477 START_CRIT_SECTION();
479 /* copy all the old tuples to new page, unless they're dead */
483 for (i = 0; i < nDelete; i++)
485 it = (SpGistLeafTuple) PageGetItem(current->page,
486 PageGetItemId(current->page, toDelete[i]));
487 Assert(it->tupstate == SPGIST_LIVE);
490 * Update chain link (notice the chain order gets reversed, but we
491 * don't care). We're modifying the tuple on the source page
492 * here, but it's okay since we're about to delete it.
496 r = SpGistPageAddNewItem(state, npage, (Item) it, it->size,
497 &startOffset, false);
499 toInsert[nInsert] = r;
502 /* save modified tuple into leafdata as well */
503 memcpy(leafptr, it, it->size);
508 /* add the new tuple as well */
/*
 * The new tuple links to the tuple added last; r then becomes the offset
 * of the new tuple, which is what the parent downlink is set to below.
 */
509 newLeafTuple->nextOffset = r;
510 r = SpGistPageAddNewItem(state, npage,
511 (Item) newLeafTuple, newLeafTuple->size,
512 &startOffset, false);
513 toInsert[nInsert] = r;
515 memcpy(leafptr, newLeafTuple, newLeafTuple->size);
516 leafptr += newLeafTuple->size;
519 * Now delete the old tuples, leaving a redirection pointer behind for
520 * the first one, unless we're doing an index build; in which case there
521 * can't be any concurrent scan so we need not provide a redirect.
523 spgPageIndexMultiDelete(state, current->page, toDelete, nDelete,
524 state->isBuild ? SPGIST_PLACEHOLDER : SPGIST_REDIRECT,
528 /* Update parent's downlink and mark parent page dirty */
529 saveNodeLink(index, parent, nblkno, r);
531 /* Mark the leaf pages too */
532 MarkBufferDirty(current->buffer);
533 MarkBufferDirty(nbuf);
535 if (RelationNeedsWAL(index))
/*
 * rdata slots: 0 = xlrec, 1 = toDelete, 2 = toInsert, 3 = tuple copies,
 * 4 = source buffer, 5 = new buffer, 6 = parent buffer
 */
539 ACCEPT_RDATA_DATA(&xlrec, MAXALIGN(sizeof(xlrec)), 0);
540 ACCEPT_RDATA_DATA(toDelete, MAXALIGN(sizeof(OffsetNumber) * nDelete), 1);
541 ACCEPT_RDATA_DATA(toInsert, MAXALIGN(sizeof(OffsetNumber) * nInsert), 2);
542 ACCEPT_RDATA_DATA(leafdata, leafptr - leafdata, 3);
543 ACCEPT_RDATA_BUFFER(current->buffer, 4);
544 ACCEPT_RDATA_BUFFER(nbuf, 5);
545 ACCEPT_RDATA_BUFFER(parent->buffer, 6);
547 recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_MOVE_LEAFS, rdata);
549 PageSetLSN(current->page, recptr);
550 PageSetTLI(current->page, ThisTimeLineID);
551 PageSetLSN(npage, recptr);
552 PageSetTLI(npage, ThisTimeLineID);
553 PageSetLSN(parent->page, recptr);
554 PageSetTLI(parent->page, ThisTimeLineID);
559 /* Update local free-space cache and release new buffer */
560 SpGistSetLastUsedPage(index, nbuf);
561 UnlockReleaseBuffer(nbuf);
565 * Update previously-created redirection tuple with appropriate destination
567 * We use this when it's not convenient to know the destination first.
568 * The tuple should have been made with the "impossible" destination of the metapage.
/*
 * setRedirectionTuple
 *
 *   current   descriptor of the page that holds the redirection tuple; only
 *             current->page is used
 *   position  offset of the redirection tuple on that page
 *   blkno     destination block number to store
 *   offnum    destination offset number to store
 *
 * The tuple at position is asserted to be in SPGIST_REDIRECT state and to
 * still point at SPGIST_METAPAGE_BLKNO; its pointer is then overwritten with
 * (blkno, offnum).  No buffer is marked dirty by the visible lines, so that
 * is presumably left to the caller -- confirm against the call site.
 */
572 setRedirectionTuple(SPPageDesc *current, OffsetNumber position,
573 BlockNumber blkno, OffsetNumber offnum)
577 dt = (SpGistDeadTuple) PageGetItem(current->page,
578 PageGetItemId(current->page, position));
579 Assert(dt->tupstate == SPGIST_REDIRECT);
580 Assert(ItemPointerGetBlockNumber(&dt->pointer) == SPGIST_METAPAGE_BLKNO);
581 ItemPointerSet(&dt->pointer, blkno, offnum);
585 * Test to see if the user-defined picksplit function failed to do its job,
586 * ie, it put all the leaf tuples into the same node.
587 * If so, randomly divide the tuples into several nodes (all with the same
588 * label) and return TRUE to select allTheSame mode for this inner tuple.
590 * (This code is also used to forcibly select allTheSame mode for nulls.)
592 * If we know that the leaf tuples wouldn't all fit on one page, then we
593 * exclude the last tuple (which is the incoming new tuple that forced a split)
594 * from the check to see if more than one node is used. The reason for this
595 * is that if the existing tuples are put into only one chain, then even if
596 * we move them all to an empty page, there would still not be room for the
597 * new tuple, so we'd get into an infinite loop of picksplit attempts.
598 * Forcing allTheSame mode dodges this problem by ensuring the old tuples will
599 * be split across pages. (Exercise for the reader: figure out why this
600 * fixes the problem even when there is only one old tuple.)
/*
 * checkAllTheSame
 *
 *   in      picksplit input; only in->nTuples is read in the visible lines
 *   out     picksplit output; nNodes, mapTuplesToNodes and (when present)
 *           nodeLabels are overwritten when the override is applied
 *   tooBig  when true, the last tuple (the incoming one) is left out of the
 *           all-in-one-node test
 *
 * The test compares mapTuplesToNodes[i] for i below limit against the node
 * of tuple 0.  When the override is applied, out->nNodes becomes 8, every
 * tuple i is mapped to node i % nNodes, and any node label array is
 * replaced by nNodes copies of the label of the single node that was used.
 *
 * NOTE(review): the remainder of the parameter list, the return statements
 * and the statements under several of the tests are not present in this
 * listing; the call site in doPickSplit suggests one further output
 * argument related to including the new tuple -- confirm against the full
 * file.
 */
603 checkAllTheSame(spgPickSplitIn *in, spgPickSplitOut *out, bool tooBig,
610 /* For the moment, assume we can include the new leaf tuple */
613 /* If there's only the new leaf tuple, don't select allTheSame mode */
614 if (in->nTuples <= 1)
617 /* If tuple set doesn't fit on one page, ignore the new tuple in test */
618 limit = tooBig ? in->nTuples - 1 : in->nTuples;
620 /* Check to see if more than one node is populated */
621 theNode = out->mapTuplesToNodes[0];
622 for (i = 1; i < limit; i++)
624 if (out->mapTuplesToNodes[i] != theNode)
628 /* Nope, so override the picksplit function's decisions */
630 /* If the new tuple is in its own node, it can't be included in split */
631 if (tooBig && out->mapTuplesToNodes[in->nTuples - 1] != theNode)
634 out->nNodes = 8; /* arbitrary number of child nodes */
636 /* Round-robin (i % nNodes) assignment of tuples to nodes (note we include new tuple) */
637 for (i = 0; i < in->nTuples; i++)
638 out->mapTuplesToNodes[i] = i % out->nNodes;
640 /* The opclass may not use node labels, but if it does, duplicate 'em */
/* fetch the old label before the label array is replaced */
643 Datum theLabel = out->nodeLabels[theNode];
645 out->nodeLabels = (Datum *) palloc(sizeof(Datum) * out->nNodes);
646 for (i = 0; i < out->nNodes; i++)
647 out->nodeLabels[i] = theLabel;
650 /* We don't touch the prefix or the leaf tuple datum assignments */
656 * current points to a leaf-tuple chain that we wanted to add newLeafTuple to,
657 * but the chain has to be split because there's not enough room to add
658 * newLeafTuple to its page.
660 * This function splits the leaf tuple set according to picksplit's rules,
661 * creating one or more new chains that are spread across the current page
662 * and an additional leaf page (we assume that two leaf pages will be
663 * sufficient). A new inner tuple is created, and the parent downlink
664 * pointer is updated to point to that inner tuple instead of the leaf chain.
666 * On exit, current contains the address of the new inner tuple.
668 * Returns true if we successfully inserted newLeafTuple during this function,
669 * false if caller still has to do it (meaning another picksplit operation is
670 * probably needed). Failure could occur if the picksplit result is fairly
671 * unbalanced, or if newLeafTuple is just plain too big to fit on a page.
672 * Because we force the picksplit result to be at least two chains, each
673 * cycle will get rid of at least one leaf tuple from the chain, so the loop
674 * will eventually terminate if lack of balance is the issue. If the tuple
675 * is too big, we assume that repeated picksplit operations will eventually
676 * make it small enough by repeated prefix-stripping. A broken opclass could
677 * make this an infinite loop, though.
680 doPickSplit(Relation index, SpGistState *state,
681 SPPageDesc *current, SPPageDesc *parent,
682 SpGistLeafTuple newLeafTuple,
683 int level, bool isNulls, bool isNew)
685 bool insertedNew = false;
693 SpGistInnerTuple innerTuple;
694 SpGistNodeTuple node,
696 Buffer newInnerBuffer,
698 ItemPointerData *heapPtrs;
699 uint8 *leafPageSelect;
701 OffsetNumber *toDelete;
702 OffsetNumber *toInsert;
703 OffsetNumber redirectTuplePos = InvalidOffsetNumber;
704 OffsetNumber startOffsets[2];
705 SpGistLeafTuple *newLeafs;
707 int currentFreeSpace;
710 XLogRecData rdata[10];
712 spgxlogPickSplit xlrec;
715 SPPageDesc saveCurrent;
723 * Allocate per-leaf-tuple work arrays with max possible size
725 max = PageGetMaxOffsetNumber(current->page);
727 in.datums = (Datum *) palloc(sizeof(Datum) * n);
728 heapPtrs = (ItemPointerData *) palloc(sizeof(ItemPointerData) * n);
729 toDelete = (OffsetNumber *) palloc(sizeof(OffsetNumber) * n);
730 toInsert = (OffsetNumber *) palloc(sizeof(OffsetNumber) * n);
731 newLeafs = (SpGistLeafTuple *) palloc(sizeof(SpGistLeafTuple) * n);
732 leafPageSelect = (uint8 *) palloc(sizeof(uint8) * n);
734 xlrec.node = index->rd_node;
735 STORE_STATE(state, xlrec.stateSrc);
738 * Form list of leaf tuples which will be distributed as split result;
739 * also, count up the amount of space that will be freed from current.
740 * (Note that in the non-root case, we won't actually delete the old
741 * tuples, only replace them with redirects or placeholders.)
743 * Note: the SGLTDATUM calls here are safe even when dealing with a nulls
744 * page. For a pass-by-value data type we will fetch a word that must
745 * exist even though it may contain garbage (because of the fact that leaf
746 * tuples must have size at least SGDTSIZE). For a pass-by-reference type
747 * we are just computing a pointer that isn't going to get dereferenced.
748 * So it's not worth guarding the calls with isNulls checks.
753 if (SpGistBlockIsRoot(current->blkno))
756 * We are splitting the root (which up to now is also a leaf page).
757 * Its tuples are not linked, so scan sequentially to get them all.
758 * We ignore the original value of current->offnum.
760 for (i = FirstOffsetNumber; i <= max; i++)
764 it = (SpGistLeafTuple) PageGetItem(current->page,
765 PageGetItemId(current->page, i));
766 if (it->tupstate == SPGIST_LIVE)
768 in.datums[nToInsert] = SGLTDATUM(it, state);
769 heapPtrs[nToInsert] = it->heapPtr;
771 toDelete[nToDelete] = i;
773 /* we will delete the tuple altogether, so count full space */
774 spaceToDelete += it->size + sizeof(ItemIdData);
776 else /* tuples on root should be live */
777 elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
782 /* Normal case, just collect the leaf tuples in the chain */
784 while (i != InvalidOffsetNumber)
788 Assert(i >= FirstOffsetNumber && i <= max);
789 it = (SpGistLeafTuple) PageGetItem(current->page,
790 PageGetItemId(current->page, i));
791 if (it->tupstate == SPGIST_LIVE)
793 in.datums[nToInsert] = SGLTDATUM(it, state);
794 heapPtrs[nToInsert] = it->heapPtr;
796 toDelete[nToDelete] = i;
798 /* we will not delete the tuple, only replace with dead */
799 Assert(it->size >= SGDTSIZE);
800 spaceToDelete += it->size - SGDTSIZE;
802 else if (it->tupstate == SPGIST_DEAD)
804 /* We could see a DEAD tuple as first/only chain item */
805 Assert(i == current->offnum);
806 Assert(it->nextOffset == InvalidOffsetNumber);
807 toDelete[nToDelete] = i;
809 /* replacing it with redirect will save no space */
812 elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
817 in.nTuples = nToInsert;
820 * We may not actually insert new tuple because another picksplit may be
821 * necessary due to too large value, but we will try to allocate enough
822 * space to include it; and in any case it has to be included in the input
823 * for the picksplit function. So don't increment nToInsert yet.
825 in.datums[in.nTuples] = SGLTDATUM(newLeafTuple, state);
826 heapPtrs[in.nTuples] = newLeafTuple->heapPtr;
829 memset(&out, 0, sizeof(out));
834 * Perform split using user-defined method.
836 procinfo = index_getprocinfo(index, 1, SPGIST_PICKSPLIT_PROC);
837 FunctionCall2Coll(procinfo,
838 index->rd_indcollation[0],
839 PointerGetDatum(&in),
840 PointerGetDatum(&out));
843 * Form new leaf tuples and count up the total space needed.
846 for (i = 0; i < in.nTuples; i++)
848 newLeafs[i] = spgFormLeafTuple(state, heapPtrs + i,
849 out.leafTupleDatums[i],
851 totalLeafSizes += newLeafs[i]->size + sizeof(ItemIdData);
857 * Perform dummy split that puts all tuples into one node.
858 * checkAllTheSame will override this and force allTheSame mode.
860 out.hasPrefix = false;
862 out.nodeLabels = NULL;
863 out.mapTuplesToNodes = palloc0(sizeof(int) * in.nTuples);
866 * Form new leaf tuples and count up the total space needed.
869 for (i = 0; i < in.nTuples; i++)
871 newLeafs[i] = spgFormLeafTuple(state, heapPtrs + i,
874 totalLeafSizes += newLeafs[i]->size + sizeof(ItemIdData);
879 * Check to see if the picksplit function failed to separate the values,
880 * ie, it put them all into the same child node. If so, select allTheSame
881 * mode and create a random split instead. See comments for
882 * checkAllTheSame as to why we need to know if the new leaf tuples could fit on one page.
885 allTheSame = checkAllTheSame(&in, &out,
886 totalLeafSizes > SPGIST_PAGE_CAPACITY,
890 * If checkAllTheSame decided we must exclude the new tuple, don't
891 * consider it any further.
894 maxToInclude = in.nTuples;
897 maxToInclude = in.nTuples - 1;
898 totalLeafSizes -= newLeafs[in.nTuples - 1]->size + sizeof(ItemIdData);
902 * Allocate per-node work arrays. Since checkAllTheSame could replace
903 * out.nNodes with a value larger than the number of tuples on the input
904 * page, we can't allocate these arrays before here.
906 nodes = (SpGistNodeTuple *) palloc(sizeof(SpGistNodeTuple) * out.nNodes);
907 leafSizes = (int *) palloc0(sizeof(int) * out.nNodes);
910 * Form nodes of inner tuple and inner tuple itself
912 for (i = 0; i < out.nNodes; i++)
914 Datum label = (Datum) 0;
915 bool labelisnull = (out.nodeLabels == NULL);
918 label = out.nodeLabels[i];
919 nodes[i] = spgFormNodeTuple(state, label, labelisnull);
921 innerTuple = spgFormInnerTuple(state,
922 out.hasPrefix, out.prefixDatum,
924 innerTuple->allTheSame = allTheSame;
927 * Update nodes[] array to point into the newly formed innerTuple, so
928 * that we can adjust their downlinks below.
930 SGITITERATE(innerTuple, i, node)
936 * Re-scan new leaf tuples and count up the space needed under each node.
938 for (i = 0; i < maxToInclude; i++)
940 n = out.mapTuplesToNodes[i];
941 if (n < 0 || n >= out.nNodes)
942 elog(ERROR, "inconsistent result of SPGiST picksplit function");
943 leafSizes[n] += newLeafs[i]->size + sizeof(ItemIdData);
947 * To perform the split, we must insert a new inner tuple, which can't
948 * go on a leaf page; and unless we are splitting the root page, we
949 * must then update the parent tuple's downlink to point to the inner
950 * tuple. If there is room, we'll put the new inner tuple on the same
951 * page as the parent tuple, otherwise we need another non-leaf buffer.
952 * But if the parent page is the root, we can't add the new inner tuple
953 * there, because the root page must have only one inner tuple.
955 xlrec.initInner = false;
956 if (parent->buffer != InvalidBuffer &&
957 !SpGistBlockIsRoot(parent->blkno) &&
958 (SpGistPageGetFreeSpace(parent->page, 1) >=
959 innerTuple->size + sizeof(ItemIdData)))
961 /* New inner tuple will fit on parent page */
962 newInnerBuffer = parent->buffer;
964 else if (parent->buffer != InvalidBuffer)
966 /* Send tuple to page with next triple parity (see README) */
967 newInnerBuffer = SpGistGetBuffer(index,
968 GBUF_INNER_PARITY(parent->blkno + 1) |
969 (isNulls ? GBUF_NULLS : 0),
970 innerTuple->size + sizeof(ItemIdData),
975 /* Root page split ... inner tuple will go to root page */
976 newInnerBuffer = InvalidBuffer;
980 * Because a WAL record can't involve more than four buffers, we can
981 * only afford to deal with two leaf pages in each picksplit action,
982 * ie the current page and at most one other.
984 * The new leaf tuples converted from the existing ones should require
985 * the same or less space, and therefore should all fit onto one page
986 * (although that's not necessarily the current page, since we can't
987 * delete the old tuples but only replace them with placeholders).
988 * However, the incoming new tuple might not also fit, in which case
989 * we might need another picksplit cycle to reduce it some more.
991 * If there's not room to put everything back onto the current page,
992 * then we decide on a per-node basis which tuples go to the new page.
993 * (We do it like that because leaf tuple chains can't cross pages,
994 * so we must place all leaf tuples belonging to the same parent node on the same page.)
997 * If we are splitting the root page (turning it from a leaf page into an
998 * inner page), then no leaf tuples can go back to the current page; they
999 * must all go somewhere else.
1001 if (!SpGistBlockIsRoot(current->blkno))
1002 currentFreeSpace = PageGetExactFreeSpace(current->page) + spaceToDelete;
1004 currentFreeSpace = 0; /* prevent assigning any tuples to current */
1006 xlrec.initDest = false;
1008 if (totalLeafSizes <= currentFreeSpace)
1010 /* All the leaf tuples will fit on current page */
1011 newLeafBuffer = InvalidBuffer;
1012 /* mark new leaf tuple as included in insertions, if allowed */
1018 for (i = 0; i < nToInsert; i++)
1019 leafPageSelect[i] = 0; /* signifies current page */
1021 else if (in.nTuples == 1 && totalLeafSizes > SPGIST_PAGE_CAPACITY)
1024 * We're trying to split up a long value by repeated suffixing, but
1025 * it's not going to fit yet. Don't bother allocating a second leaf
1026 * buffer that we won't be able to use.
1028 newLeafBuffer = InvalidBuffer;
1030 Assert(nToInsert == 0);
1034 /* We will need another leaf page */
1035 uint8 *nodePageSelect;
1039 newLeafBuffer = SpGistGetBuffer(index,
1040 GBUF_LEAF | (isNulls ? GBUF_NULLS : 0),
1042 SPGIST_PAGE_CAPACITY),
1045 * Attempt to assign node groups to the two pages. We might fail to
1046 * do so, even if totalLeafSizes is less than the available space,
1047 * because we can't split a group across pages.
1049 nodePageSelect = (uint8 *) palloc(sizeof(uint8) * out.nNodes);
1051 curspace = currentFreeSpace;
1052 newspace = PageGetExactFreeSpace(BufferGetPage(newLeafBuffer));
1053 for (i = 0; i < out.nNodes; i++)
1055 if (leafSizes[i] <= curspace)
1057 nodePageSelect[i] = 0; /* signifies current page */
1058 curspace -= leafSizes[i];
1062 nodePageSelect[i] = 1; /* signifies new leaf page */
1063 newspace -= leafSizes[i];
1066 if (curspace >= 0 && newspace >= 0)
1068 /* Successful assignment, so we can include the new leaf tuple */
1075 else if (includeNew)
1077 /* We must exclude the new leaf tuple from the split */
1078 int nodeOfNewTuple = out.mapTuplesToNodes[in.nTuples - 1];
1080 leafSizes[nodeOfNewTuple] -=
1081 newLeafs[in.nTuples - 1]->size + sizeof(ItemIdData);
1083 /* Repeat the node assignment process --- should succeed now */
1084 curspace = currentFreeSpace;
1085 newspace = PageGetExactFreeSpace(BufferGetPage(newLeafBuffer));
1086 for (i = 0; i < out.nNodes; i++)
1088 if (leafSizes[i] <= curspace)
1090 nodePageSelect[i] = 0; /* signifies current page */
1091 curspace -= leafSizes[i];
1095 nodePageSelect[i] = 1; /* signifies new leaf page */
1096 newspace -= leafSizes[i];
1099 if (curspace < 0 || newspace < 0)
1100 elog(ERROR, "failed to divide leaf tuple groups across pages");
1104 /* oops, we already excluded new tuple ... should not get here */
1105 elog(ERROR, "failed to divide leaf tuple groups across pages");
1107 /* Expand the per-node assignments to be shown per leaf tuple */
1108 for (i = 0; i < nToInsert; i++)
1110 n = out.mapTuplesToNodes[i];
1111 leafPageSelect[i] = nodePageSelect[n];
1115 /* Start preparing WAL record */
1116 xlrec.blknoSrc = current->blkno;
1117 xlrec.blknoDest = InvalidBlockNumber;
1119 xlrec.initSrc = isNew;
1120 xlrec.storesNulls = isNulls;
1122 leafdata = leafptr = (char *) palloc(totalLeafSizes);
1124 ACCEPT_RDATA_DATA(&xlrec, MAXALIGN(sizeof(xlrec)), 0);
1125 ACCEPT_RDATA_DATA(innerTuple, innerTuple->size, 1);
1128 /* Here we begin making the changes to the target pages */
1129 START_CRIT_SECTION();
1132 * Delete old leaf tuples from current buffer, except when we're splitting
1133 * the root; in that case there's no need because we'll re-init the page
1134 * below. We do this first to make room for reinserting new leaf tuples.
1136 if (!SpGistBlockIsRoot(current->blkno))
1139 * Init buffer instead of deleting individual tuples, but only if
1140 * there aren't any other live tuples and only during build; otherwise
1141 * we need to set a redirection tuple for concurrent scans.
1143 if (state->isBuild &&
1144 nToDelete + SpGistPageGetOpaque(current->page)->nPlaceholder ==
1145 PageGetMaxOffsetNumber(current->page))
1147 SpGistInitBuffer(current->buffer,
1148 SPGIST_LEAF | (isNulls ? SPGIST_NULLS : 0));
1149 xlrec.initSrc = true;
1153 /* don't expose the freshly init'd buffer as a backup block */
1154 Assert(nToDelete == 0);
1158 xlrec.nDelete = nToDelete;
1159 ACCEPT_RDATA_DATA(toDelete,
1160 MAXALIGN(sizeof(OffsetNumber) * nToDelete),
1163 ACCEPT_RDATA_BUFFER(current->buffer, nRdata);
1166 if (!state->isBuild)
1169 * Need to create redirect tuple (it will point to new inner
1170 * tuple) but right now the new tuple's location is not known
1171 * yet. So, set the redirection pointer to "impossible" value
1172 * and remember its position to update tuple later.
1175 redirectTuplePos = toDelete[0];
1176 spgPageIndexMultiDelete(state, current->page,
1177 toDelete, nToDelete,
1180 SPGIST_METAPAGE_BLKNO,
1186 * During index build there is not concurrent searches, so we
1187 * don't need to create redirection tuple.
1189 spgPageIndexMultiDelete(state, current->page,
1190 toDelete, nToDelete,
1194 InvalidOffsetNumber);
1200 * Put leaf tuples on proper pages, and update downlinks in innerTuple's
1203 startOffsets[0] = startOffsets[1] = InvalidOffsetNumber;
1204 for (i = 0; i < nToInsert; i++)
1206 SpGistLeafTuple it = newLeafs[i];
1208 BlockNumber leafBlock;
1209 OffsetNumber newoffset;
1211 /* Which page is it going to? */
1212 leafBuffer = leafPageSelect[i] ? newLeafBuffer : current->buffer;
1213 leafBlock = BufferGetBlockNumber(leafBuffer);
1215 /* Link tuple into correct chain for its node */
1216 n = out.mapTuplesToNodes[i];
1218 if (ItemPointerIsValid(&nodes[n]->t_tid))
1220 Assert(ItemPointerGetBlockNumber(&nodes[n]->t_tid) == leafBlock);
1221 it->nextOffset = ItemPointerGetOffsetNumber(&nodes[n]->t_tid);
1224 it->nextOffset = InvalidOffsetNumber;
1226 /* Insert it on page */
1227 newoffset = SpGistPageAddNewItem(state, BufferGetPage(leafBuffer),
1228 (Item) it, it->size,
1229 &startOffsets[leafPageSelect[i]],
1231 toInsert[i] = newoffset;
1233 /* ... and complete the chain linking */
1234 ItemPointerSet(&nodes[n]->t_tid, leafBlock, newoffset);
1236 /* Also copy leaf tuple into WAL data */
1237 memcpy(leafptr, newLeafs[i], newLeafs[i]->size);
1238 leafptr += newLeafs[i]->size;
1242 * We're done modifying the other leaf buffer (if any), so mark it dirty.
1243 * current->buffer will be marked below, after we're entirely done
1246 if (newLeafBuffer != InvalidBuffer)
1248 MarkBufferDirty(newLeafBuffer);
1249 /* also save block number for WAL */
1250 xlrec.blknoDest = BufferGetBlockNumber(newLeafBuffer);
1251 if (!xlrec.initDest)
1253 ACCEPT_RDATA_BUFFER(newLeafBuffer, nRdata);
1258 xlrec.nInsert = nToInsert;
1259 ACCEPT_RDATA_DATA(toInsert,
1260 MAXALIGN(sizeof(OffsetNumber) * nToInsert),
1263 ACCEPT_RDATA_DATA(leafPageSelect,
1264 MAXALIGN(sizeof(uint8) * nToInsert),
1267 ACCEPT_RDATA_DATA(leafdata, leafptr - leafdata, nRdata);
1270 /* Remember current buffer, since we're about to change "current" */
1271 saveCurrent = *current;
1274 * Store the new innerTuple
1276 if (newInnerBuffer == parent->buffer && newInnerBuffer != InvalidBuffer)
1279 * new inner tuple goes to parent page
1281 Assert(current->buffer != parent->buffer);
1283 /* Repoint "current" at the new inner tuple */
1284 current->blkno = parent->blkno;
1285 current->buffer = parent->buffer;
1286 current->page = parent->page;
1287 xlrec.blknoInner = current->blkno;
1288 xlrec.offnumInner = current->offnum =
1289 SpGistPageAddNewItem(state, current->page,
1290 (Item) innerTuple, innerTuple->size,
1294 * Update parent node link and mark parent page dirty
1296 xlrec.blknoParent = parent->blkno;
1297 xlrec.offnumParent = parent->offnum;
1298 xlrec.nodeI = parent->node;
1299 saveNodeLink(index, parent, current->blkno, current->offnum);
1301 ACCEPT_RDATA_BUFFER(parent->buffer, nRdata);
1305 * Update redirection link (in old current buffer)
1307 if (redirectTuplePos != InvalidOffsetNumber)
1308 setRedirectionTuple(&saveCurrent, redirectTuplePos,
1309 current->blkno, current->offnum);
1311 /* Done modifying old current buffer, mark it dirty */
1312 MarkBufferDirty(saveCurrent.buffer);
1314 else if (parent->buffer != InvalidBuffer)
1317 * new inner tuple will be stored on a new page
1319 Assert(newInnerBuffer != InvalidBuffer);
1321 /* Repoint "current" at the new inner tuple */
1322 current->buffer = newInnerBuffer;
1323 current->blkno = BufferGetBlockNumber(current->buffer);
1324 current->page = BufferGetPage(current->buffer);
1325 xlrec.blknoInner = current->blkno;
1326 xlrec.offnumInner = current->offnum =
1327 SpGistPageAddNewItem(state, current->page,
1328 (Item) innerTuple, innerTuple->size,
1331 /* Done modifying new current buffer, mark it dirty */
1332 MarkBufferDirty(current->buffer);
1335 * Update parent node link and mark parent page dirty
1337 xlrec.blknoParent = parent->blkno;
1338 xlrec.offnumParent = parent->offnum;
1339 xlrec.nodeI = parent->node;
1340 saveNodeLink(index, parent, current->blkno, current->offnum);
1342 ACCEPT_RDATA_BUFFER(current->buffer, nRdata);
1344 ACCEPT_RDATA_BUFFER(parent->buffer, nRdata);
1348 * Update redirection link (in old current buffer)
1350 if (redirectTuplePos != InvalidOffsetNumber)
1351 setRedirectionTuple(&saveCurrent, redirectTuplePos,
1352 current->blkno, current->offnum);
1354 /* Done modifying old current buffer, mark it dirty */
1355 MarkBufferDirty(saveCurrent.buffer);
1360 * Splitting root page, which was a leaf but now becomes inner page
1361 * (and so "current" continues to point at it)
1363 Assert(SpGistBlockIsRoot(current->blkno));
1364 Assert(redirectTuplePos == InvalidOffsetNumber);
1366 SpGistInitBuffer(current->buffer, (isNulls ? SPGIST_NULLS : 0));
1367 xlrec.initInner = true;
1369 xlrec.blknoInner = current->blkno;
1370 xlrec.offnumInner = current->offnum =
1371 PageAddItem(current->page, (Item) innerTuple, innerTuple->size,
1372 InvalidOffsetNumber, false, false);
1373 if (current->offnum != FirstOffsetNumber)
1374 elog(ERROR, "failed to add item of size %u to SPGiST index page",
1377 /* No parent link to update, nor redirection to do */
1378 xlrec.blknoParent = InvalidBlockNumber;
1379 xlrec.offnumParent = InvalidOffsetNumber;
1382 /* Done modifying new current buffer, mark it dirty */
1383 MarkBufferDirty(current->buffer);
1385 /* saveCurrent doesn't represent a different buffer */
1386 saveCurrent.buffer = InvalidBuffer;
1389 if (RelationNeedsWAL(index))
1393 /* Issue the WAL record */
1394 recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_PICKSPLIT, rdata);
1396 /* Update page LSNs on all affected pages */
1397 if (newLeafBuffer != InvalidBuffer)
1399 Page page = BufferGetPage(newLeafBuffer);
1401 PageSetLSN(page, recptr);
1402 PageSetTLI(page, ThisTimeLineID);
1405 if (saveCurrent.buffer != InvalidBuffer)
1407 Page page = BufferGetPage(saveCurrent.buffer);
1409 PageSetLSN(page, recptr);
1410 PageSetTLI(page, ThisTimeLineID);
1413 PageSetLSN(current->page, recptr);
1414 PageSetTLI(current->page, ThisTimeLineID);
1416 if (parent->buffer != InvalidBuffer)
1418 PageSetLSN(parent->page, recptr);
1419 PageSetTLI(parent->page, ThisTimeLineID);
1425 /* Update local free-space cache and unlock buffers */
1426 if (newLeafBuffer != InvalidBuffer)
1428 SpGistSetLastUsedPage(index, newLeafBuffer);
1429 UnlockReleaseBuffer(newLeafBuffer);
1431 if (saveCurrent.buffer != InvalidBuffer)
1433 SpGistSetLastUsedPage(index, saveCurrent.buffer);
1434 UnlockReleaseBuffer(saveCurrent.buffer);
1441 * spgMatchNode action: descend to N'th child node of current inner tuple
1444 spgMatchNodeAction(Relation index, SpGistState *state,
1445 SpGistInnerTuple innerTuple,
1446 SPPageDesc *current, SPPageDesc *parent, int nodeN)
1449 SpGistNodeTuple node;
1451 /* Release previous parent buffer if any */
1452 if (parent->buffer != InvalidBuffer &&
1453 parent->buffer != current->buffer)
1455 SpGistSetLastUsedPage(index, parent->buffer);
1456 UnlockReleaseBuffer(parent->buffer);
1459 /* Repoint parent to specified node of current inner tuple */
1460 parent->blkno = current->blkno;
1461 parent->buffer = current->buffer;
1462 parent->page = current->page;
1463 parent->offnum = current->offnum;
1464 parent->node = nodeN;
1466 /* Locate that node */
1467 SGITITERATE(innerTuple, i, node)
1474 elog(ERROR, "failed to find requested node %d in SPGiST inner tuple",
1477 /* Point current to the downlink location, if any */
1478 if (ItemPointerIsValid(&node->t_tid))
1480 current->blkno = ItemPointerGetBlockNumber(&node->t_tid);
1481 current->offnum = ItemPointerGetOffsetNumber(&node->t_tid);
1485 /* Downlink is empty, so we'll need to find a new page */
1486 current->blkno = InvalidBlockNumber;
1487 current->offnum = InvalidOffsetNumber;
1490 current->buffer = InvalidBuffer;
1491 current->page = NULL;
1495 * spgAddNode action: add a node to the inner tuple at current
1498 spgAddNodeAction(Relation index, SpGistState *state,
1499 SpGistInnerTuple innerTuple,
1500 SPPageDesc *current, SPPageDesc *parent,
1501 int nodeN, Datum nodeLabel)
1503 SpGistInnerTuple newInnerTuple;
1504 XLogRecData rdata[5];
1505 spgxlogAddNode xlrec;
1507 /* Should not be applied to nulls */
1508 Assert(!SpGistPageStoresNulls(current->page));
1510 /* Construct new inner tuple with additional node */
1511 newInnerTuple = addNode(state, innerTuple, nodeLabel, nodeN);
1513 /* Prepare WAL record */
1514 xlrec.node = index->rd_node;
1515 STORE_STATE(state, xlrec.stateSrc);
1516 xlrec.blkno = current->blkno;
1517 xlrec.offnum = current->offnum;
1519 /* we don't fill these unless we need to change the parent downlink */
1520 xlrec.blknoParent = InvalidBlockNumber;
1521 xlrec.offnumParent = InvalidOffsetNumber;
1524 /* we don't fill these unless tuple has to be moved */
1525 xlrec.blknoNew = InvalidBlockNumber;
1526 xlrec.offnumNew = InvalidOffsetNumber;
1527 xlrec.newPage = false;
1529 ACCEPT_RDATA_DATA(&xlrec, sizeof(xlrec), 0);
1530 /* we assume sizeof(xlrec) is at least int-aligned */
1531 ACCEPT_RDATA_DATA(newInnerTuple, newInnerTuple->size, 1);
1532 ACCEPT_RDATA_BUFFER(current->buffer, 2);
1534 if (PageGetExactFreeSpace(current->page) >=
1535 newInnerTuple->size - innerTuple->size)
1538 * We can replace the inner tuple by new version in-place
1540 START_CRIT_SECTION();
1542 PageIndexTupleDelete(current->page, current->offnum);
1543 if (PageAddItem(current->page,
1544 (Item) newInnerTuple, newInnerTuple->size,
1545 current->offnum, false, false) != current->offnum)
1546 elog(ERROR, "failed to add item of size %u to SPGiST index page",
1547 newInnerTuple->size);
1549 MarkBufferDirty(current->buffer);
1551 if (RelationNeedsWAL(index))
1555 recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_NODE, rdata);
1557 PageSetLSN(current->page, recptr);
1558 PageSetTLI(current->page, ThisTimeLineID);
1566 * move inner tuple to another page, and update parent
1569 SPPageDesc saveCurrent;
1572 * It should not be possible to get here for the root page, since we
1573 * allow only one inner tuple on the root page, and spgFormInnerTuple
1574 * always checks that inner tuples don't exceed the size of a page.
1576 if (SpGistBlockIsRoot(current->blkno))
1577 elog(ERROR, "cannot enlarge root tuple any more");
1578 Assert(parent->buffer != InvalidBuffer);
1580 saveCurrent = *current;
1582 xlrec.blknoParent = parent->blkno;
1583 xlrec.offnumParent = parent->offnum;
1584 xlrec.nodeI = parent->node;
1587 * obtain new buffer with the same parity as current, since it will
1588 * be a child of same parent tuple
1590 current->buffer = SpGistGetBuffer(index,
1591 GBUF_INNER_PARITY(current->blkno),
1592 newInnerTuple->size + sizeof(ItemIdData),
1594 current->blkno = BufferGetBlockNumber(current->buffer);
1595 current->page = BufferGetPage(current->buffer);
1597 xlrec.blknoNew = current->blkno;
1600 * Let's just make real sure new current isn't same as old. Right
1601 * now that's impossible, but if SpGistGetBuffer ever got smart enough
1602 * to delete placeholder tuples before checking space, maybe it
1603 * wouldn't be impossible. The case would appear to work except that
1604 * WAL replay would be subtly wrong, so I think a mere assert isn't
1607 if (xlrec.blknoNew == xlrec.blkno)
1608 elog(ERROR, "SPGiST new buffer shouldn't be same as old buffer");
1611 * New current and parent buffer will both be modified; but note that
1612 * parent buffer could be same as either new or old current.
1614 ACCEPT_RDATA_BUFFER(current->buffer, 3);
1615 if (parent->buffer != current->buffer &&
1616 parent->buffer != saveCurrent.buffer)
1617 ACCEPT_RDATA_BUFFER(parent->buffer, 4);
1619 START_CRIT_SECTION();
1621 /* insert new ... */
1622 xlrec.offnumNew = current->offnum =
1623 SpGistPageAddNewItem(state, current->page,
1624 (Item) newInnerTuple, newInnerTuple->size,
1627 MarkBufferDirty(current->buffer);
1629 /* update parent's downlink and mark parent page dirty */
1630 saveNodeLink(index, parent, current->blkno, current->offnum);
1633 * Replace old tuple with a placeholder or redirection tuple. Unless
1634 * doing an index build, we have to insert a redirection tuple for
1635 * possible concurrent scans. We can't just delete it in any case,
1636 * because that could change the offsets of other tuples on the page,
1637 * breaking downlinks from their parents.
1640 dt = spgFormDeadTuple(state, SPGIST_PLACEHOLDER,
1641 InvalidBlockNumber, InvalidOffsetNumber);
1643 dt = spgFormDeadTuple(state, SPGIST_REDIRECT,
1644 current->blkno, current->offnum);
1646 PageIndexTupleDelete(saveCurrent.page, saveCurrent.offnum);
1647 if (PageAddItem(saveCurrent.page, (Item) dt, dt->size,
1649 false, false) != saveCurrent.offnum)
1650 elog(ERROR, "failed to add item of size %u to SPGiST index page",
1654 SpGistPageGetOpaque(saveCurrent.page)->nPlaceholder++;
1656 SpGistPageGetOpaque(saveCurrent.page)->nRedirection++;
1658 MarkBufferDirty(saveCurrent.buffer);
1660 if (RelationNeedsWAL(index))
1664 recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_NODE, rdata);
1666 /* we don't bother to check if any of these are redundant */
1667 PageSetLSN(current->page, recptr);
1668 PageSetTLI(current->page, ThisTimeLineID);
1669 PageSetLSN(parent->page, recptr);
1670 PageSetTLI(parent->page, ThisTimeLineID);
1671 PageSetLSN(saveCurrent.page, recptr);
1672 PageSetTLI(saveCurrent.page, ThisTimeLineID);
1677 /* Release saveCurrent if it's not same as current or parent */
1678 if (saveCurrent.buffer != current->buffer &&
1679 saveCurrent.buffer != parent->buffer)
1681 SpGistSetLastUsedPage(index, saveCurrent.buffer);
1682 UnlockReleaseBuffer(saveCurrent.buffer);
1688 * spgSplitNode action: split inner tuple at current into prefix and postfix
1691 spgSplitNodeAction(Relation index, SpGistState *state,
1692 SpGistInnerTuple innerTuple,
1693 SPPageDesc *current, spgChooseOut *out)
1695 SpGistInnerTuple prefixTuple,
1697 SpGistNodeTuple node,
1699 BlockNumber postfixBlkno;
1700 OffsetNumber postfixOffset;
1702 XLogRecData rdata[5];
1703 spgxlogSplitTuple xlrec;
1704 Buffer newBuffer = InvalidBuffer;
1706 /* Should not be applied to nulls */
1707 Assert(!SpGistPageStoresNulls(current->page));
1710 * Construct new prefix tuple, containing a single node with the
1711 * specified label. (We'll update the node's downlink to point to the
1712 * new postfix tuple, below.)
1714 node = spgFormNodeTuple(state, out->result.splitTuple.nodeLabel, false);
1716 prefixTuple = spgFormInnerTuple(state,
1717 out->result.splitTuple.prefixHasPrefix,
1718 out->result.splitTuple.prefixPrefixDatum,
1721 /* it must fit in the space that innerTuple now occupies */
1722 if (prefixTuple->size > innerTuple->size)
1723 elog(ERROR, "SPGiST inner-tuple split must not produce longer prefix");
1726 * Construct new postfix tuple, containing all nodes of innerTuple with
1727 * same node datums, but with the prefix specified by the picksplit
1730 nodes = palloc(sizeof(SpGistNodeTuple) * innerTuple->nNodes);
1731 SGITITERATE(innerTuple, i, node)
1736 postfixTuple = spgFormInnerTuple(state,
1737 out->result.splitTuple.postfixHasPrefix,
1738 out->result.splitTuple.postfixPrefixDatum,
1739 innerTuple->nNodes, nodes);
1741 /* Postfix tuple is allTheSame if original tuple was */
1742 postfixTuple->allTheSame = innerTuple->allTheSame;
1744 /* prep data for WAL record */
1745 xlrec.node = index->rd_node;
1746 xlrec.newPage = false;
1748 ACCEPT_RDATA_DATA(&xlrec, sizeof(xlrec), 0);
1749 /* we assume sizeof(xlrec) is at least int-aligned */
1750 ACCEPT_RDATA_DATA(prefixTuple, prefixTuple->size, 1);
1751 ACCEPT_RDATA_DATA(postfixTuple, postfixTuple->size, 2);
1752 ACCEPT_RDATA_BUFFER(current->buffer, 3);
1755 * If we can't fit both tuples on the current page, get a new page for the
1756 * postfix tuple. In particular, can't split to the root page.
1758 * For the space calculation, note that prefixTuple replaces innerTuple
1759 * but postfixTuple will be a new entry.
1761 if (SpGistBlockIsRoot(current->blkno) ||
1762 SpGistPageGetFreeSpace(current->page, 1) + innerTuple->size <
1763 prefixTuple->size + postfixTuple->size + sizeof(ItemIdData))
1766 * Choose page with next triple parity, because postfix tuple is a
1767 * child of prefix one
1769 newBuffer = SpGistGetBuffer(index,
1770 GBUF_INNER_PARITY(current->blkno + 1),
1771 postfixTuple->size + sizeof(ItemIdData),
1773 ACCEPT_RDATA_BUFFER(newBuffer, 4);
1776 START_CRIT_SECTION();
1779 * Replace old tuple by prefix tuple
1781 PageIndexTupleDelete(current->page, current->offnum);
1782 xlrec.offnumPrefix = PageAddItem(current->page,
1783 (Item) prefixTuple, prefixTuple->size,
1784 current->offnum, false, false);
1785 if (xlrec.offnumPrefix != current->offnum)
1786 elog(ERROR, "failed to add item of size %u to SPGiST index page",
1788 xlrec.blknoPrefix = current->blkno;
1791 * put postfix tuple into appropriate page
1793 if (newBuffer == InvalidBuffer)
1795 xlrec.blknoPostfix = postfixBlkno = current->blkno;
1796 xlrec.offnumPostfix = postfixOffset =
1797 SpGistPageAddNewItem(state, current->page,
1798 (Item) postfixTuple, postfixTuple->size,
1803 xlrec.blknoPostfix = postfixBlkno = BufferGetBlockNumber(newBuffer);
1804 xlrec.offnumPostfix = postfixOffset =
1805 SpGistPageAddNewItem(state, BufferGetPage(newBuffer),
1806 (Item) postfixTuple, postfixTuple->size,
1808 MarkBufferDirty(newBuffer);
1812 * And set downlink pointer in the prefix tuple to point to postfix tuple.
1813 * (We can't avoid this step by doing the above two steps in opposite
1814 * order, because there might not be enough space on the page to insert
1815 * the postfix tuple first.) We have to update the local copy of the
1816 * prefixTuple too, because that's what will be written to WAL.
1818 spgUpdateNodeLink(prefixTuple, 0, postfixBlkno, postfixOffset);
1819 prefixTuple = (SpGistInnerTuple) PageGetItem(current->page,
1820 PageGetItemId(current->page, current->offnum));
1821 spgUpdateNodeLink(prefixTuple, 0, postfixBlkno, postfixOffset);
1823 MarkBufferDirty(current->buffer);
1825 if (RelationNeedsWAL(index))
1829 recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_SPLIT_TUPLE, rdata);
1831 PageSetLSN(current->page, recptr);
1832 PageSetTLI(current->page, ThisTimeLineID);
1834 if (newBuffer != InvalidBuffer)
1836 PageSetLSN(BufferGetPage(newBuffer), recptr);
1837 PageSetTLI(BufferGetPage(newBuffer), ThisTimeLineID);
1843 /* Update local free-space cache and release buffer */
1844 if (newBuffer != InvalidBuffer)
1846 SpGistSetLastUsedPage(index, newBuffer);
1847 UnlockReleaseBuffer(newBuffer);
1852 * Insert one item into the index
1855 spgdoinsert(Relation index, SpGistState *state,
1856 ItemPointer heapPtr, Datum datum, bool isnull)
1865 * Since we don't use index_form_tuple in this AM, we have to make sure
1866 * value to be inserted is not toasted; FormIndexDatum doesn't guarantee
1869 if (!isnull && state->attType.attlen == -1)
1870 datum = PointerGetDatum(PG_DETOAST_DATUM(datum));
1875 * Compute space needed for a leaf tuple containing the given datum.
1877 * If it isn't gonna fit, and the opclass can't reduce the datum size by
1878 * suffixing, bail out now rather than getting into an endless loop.
1881 leafSize = SGLTHDRSZ + sizeof(ItemIdData) +
1882 SpGistGetTypeSize(&state->attType, leafDatum);
1884 leafSize = SGDTSIZE + sizeof(ItemIdData);
1886 if (leafSize > SPGIST_PAGE_CAPACITY && !state->config.longValuesOK)
1888 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
1889 errmsg("index row size %lu exceeds maximum %lu for index \"%s\"",
1890 (unsigned long) (leafSize - sizeof(ItemIdData)),
1891 (unsigned long) (SPGIST_PAGE_CAPACITY - sizeof(ItemIdData)),
1892 RelationGetRelationName(index)),
1893 errhint("Values larger than a buffer page cannot be indexed.")));
1895 /* Initialize "current" to the appropriate root page */
1896 current.blkno = isnull ? SPGIST_NULL_BLKNO : SPGIST_ROOT_BLKNO;
1897 current.buffer = InvalidBuffer;
1898 current.page = NULL;
1899 current.offnum = FirstOffsetNumber;
1902 /* "parent" is invalid for the moment */
1903 parent.blkno = InvalidBlockNumber;
1904 parent.buffer = InvalidBuffer;
1906 parent.offnum = InvalidOffsetNumber;
1914 * Bail out if query cancel is pending. We must have this somewhere
1915 * in the loop since a broken opclass could produce an infinite
1918 CHECK_FOR_INTERRUPTS();
1920 if (current.blkno == InvalidBlockNumber)
1923 * Create a leaf page. If leafSize is too large to fit on a page,
1924 * we won't actually use the page yet, but it simplifies the API
1925 * for doPickSplit to always have a leaf page at hand; so just
1926 * quietly limit our request to a page size.
1929 SpGistGetBuffer(index,
1930 GBUF_LEAF | (isnull ? GBUF_NULLS : 0),
1931 Min(leafSize, SPGIST_PAGE_CAPACITY),
1933 current.blkno = BufferGetBlockNumber(current.buffer);
1935 else if (parent.buffer == InvalidBuffer ||
1936 current.blkno != parent.blkno)
1938 current.buffer = ReadBuffer(index, current.blkno);
1939 LockBuffer(current.buffer, BUFFER_LOCK_EXCLUSIVE);
1943 /* inner tuple can be stored on the same page as parent one */
1944 current.buffer = parent.buffer;
1946 current.page = BufferGetPage(current.buffer);
1948 /* should not arrive at a page of the wrong type */
1949 if (isnull ? !SpGistPageStoresNulls(current.page) :
1950 SpGistPageStoresNulls(current.page))
1951 elog(ERROR, "SPGiST index page %u has wrong nulls flag",
1954 if (SpGistPageIsLeaf(current.page))
1956 SpGistLeafTuple leafTuple;
1960 leafTuple = spgFormLeafTuple(state, heapPtr, leafDatum, isnull);
1961 if (leafTuple->size + sizeof(ItemIdData) <=
1962 SpGistPageGetFreeSpace(current.page, 1))
1964 /* it fits on page, so insert it and we're done */
1965 addLeafTuple(index, state, leafTuple,
1966 ¤t, &parent, isnull, isNew);
1969 else if ((sizeToSplit =
1970 checkSplitConditions(index, state, ¤t,
1971 &nToSplit)) < SPGIST_PAGE_CAPACITY / 2 &&
1973 leafTuple->size + sizeof(ItemIdData) + sizeToSplit <= SPGIST_PAGE_CAPACITY)
1976 * the amount of data is pretty small, so just move the whole
1977 * chain to another leaf page rather than splitting it.
1980 moveLeafs(index, state, ¤t, &parent, leafTuple, isnull);
1981 break; /* we're done */
1986 if (doPickSplit(index, state, ¤t, &parent,
1987 leafTuple, level, isnull, isNew))
1988 break; /* doPickSplit installed new tuples */
1990 /* leaf tuple will not be inserted yet */
1994 * current now describes new inner tuple, go insert into it
1996 Assert(!SpGistPageIsLeaf(current.page));
1997 goto process_inner_tuple;
2000 else /* non-leaf page */
2003 * Apply the opclass choose function to figure out how to insert
2004 * the given datum into the current inner tuple.
2006 SpGistInnerTuple innerTuple;
2012 * spgAddNode and spgSplitTuple cases will loop back to here to
2013 * complete the insertion operation. Just in case the choose
2014 * function is broken and produces add or split requests
2015 * repeatedly, check for query cancel.
2017 process_inner_tuple:
2018 CHECK_FOR_INTERRUPTS();
2020 innerTuple = (SpGistInnerTuple) PageGetItem(current.page,
2021 PageGetItemId(current.page, current.offnum));
2024 in.leafDatum = leafDatum;
2026 in.allTheSame = innerTuple->allTheSame;
2027 in.hasPrefix = (innerTuple->prefixSize > 0);
2028 in.prefixDatum = SGITDATUM(innerTuple, state);
2029 in.nNodes = innerTuple->nNodes;
2030 in.nodeLabels = spgExtractNodeLabels(state, innerTuple);
2032 memset(&out, 0, sizeof(out));
2036 /* use user-defined choose method */
2037 procinfo = index_getprocinfo(index, 1, SPGIST_CHOOSE_PROC);
2038 FunctionCall2Coll(procinfo,
2039 index->rd_indcollation[0],
2040 PointerGetDatum(&in),
2041 PointerGetDatum(&out));
2045 /* force "match" action (to insert to random subnode) */
2046 out.resultType = spgMatchNode;
2049 if (innerTuple->allTheSame)
2052 * It's not allowed to do an AddNode at an allTheSame tuple.
2053 * Opclass must say "match", in which case we choose a random
2054 * one of the nodes to descend into, or "split".
2056 if (out.resultType == spgAddNode)
2057 elog(ERROR, "cannot add a node to an allTheSame inner tuple");
2058 else if (out.resultType == spgMatchNode)
2059 out.result.matchNode.nodeN = random() % innerTuple->nNodes;
2062 switch (out.resultType)
2065 /* Descend to N'th child node */
2066 spgMatchNodeAction(index, state, innerTuple,
2068 out.result.matchNode.nodeN);
2069 /* Adjust level as per opclass request */
2070 level += out.result.matchNode.levelAdd;
2071 /* Replace leafDatum and recompute leafSize */
2074 leafDatum = out.result.matchNode.restDatum;
2075 leafSize = SGLTHDRSZ + sizeof(ItemIdData) +
2076 SpGistGetTypeSize(&state->attType, leafDatum);
2080 * Loop around and attempt to insert the new leafDatum
2081 * at "current" (which might reference an existing child
2082 * tuple, or might be invalid to force us to find a new
2083 * page for the tuple).
2085 * Note: if the opclass sets longValuesOK, we rely on the
2086 * choose function to eventually shorten the leafDatum
2087 * enough to fit on a page. We could add a test here to
2088 * complain if the datum doesn't get visibly shorter each
2089 * time, but that could get in the way of opclasses that
2090 * "simplify" datums in a way that doesn't necessarily
2091 * lead to physical shortening on every cycle.
2095 /* AddNode is not sensible if nodes don't have labels */
2096 if (in.nodeLabels == NULL)
2097 elog(ERROR, "cannot add a node to an inner tuple without node labels");
2098 /* Add node to inner tuple, per request */
2099 spgAddNodeAction(index, state, innerTuple,
2101 out.result.addNode.nodeN,
2102 out.result.addNode.nodeLabel);
2105 * Retry insertion into the enlarged node. We assume
2106 * that we'll get a MatchNode result this time.
2108 goto process_inner_tuple;
2111 /* Split inner tuple, per request */
2112 spgSplitNodeAction(index, state, innerTuple,
2115 /* Retry insertion into the split node */
2116 goto process_inner_tuple;
2119 elog(ERROR, "unrecognized SPGiST choose result: %d",
2120 (int) out.resultType);
2127 * Release any buffers we're still holding. Beware of possibility that
2128 * current and parent reference same buffer.
2130 if (current.buffer != InvalidBuffer)
2132 SpGistSetLastUsedPage(index, current.buffer);
2133 UnlockReleaseBuffer(current.buffer);
2135 if (parent.buffer != InvalidBuffer &&
2136 parent.buffer != current.buffer)
2138 SpGistSetLastUsedPage(index, parent.buffer);
2139 UnlockReleaseBuffer(parent.buffer);