1 /*-------------------------------------------------------------------------
4 * various support functions for SP-GiST
7 * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
8 * Portions Copyright (c) 1994, Regents of the University of California
11 * src/backend/access/spgist/spgutils.c
13 *-------------------------------------------------------------------------
18 #include "access/amvalidate.h"
19 #include "access/htup_details.h"
20 #include "access/reloptions.h"
21 #include "access/spgist_private.h"
22 #include "access/transam.h"
23 #include "access/xact.h"
24 #include "catalog/pg_amop.h"
25 #include "storage/bufmgr.h"
26 #include "storage/indexfsm.h"
27 #include "storage/lmgr.h"
28 #include "utils/builtins.h"
29 #include "utils/catcache.h"
30 #include "utils/index_selfuncs.h"
31 #include "utils/lsyscache.h"
32 #include "utils/syscache.h"
36 * SP-GiST handler function: return IndexAmRoutine with access method parameters
/*
 * spghandler: allocate an IndexAmRoutine node, fill in the SP-GiST
 * capability flags and callback pointers assigned below, and hand the node
 * back with PG_RETURN_POINTER.
 */
40 spghandler(PG_FUNCTION_ARGS)
42 IndexAmRoutine *amroutine = makeNode(IndexAmRoutine);
/* Capability flags; amsupport takes its value from SPGISTNProc */
44 amroutine->amstrategies = 0;
45 amroutine->amsupport = SPGISTNProc;
46 amroutine->amcanorder = false;
47 amroutine->amcanorderbyop = true;
48 amroutine->amcanbackward = false;
49 amroutine->amcanunique = false;
50 amroutine->amcanmulticol = false;
51 amroutine->amoptionalkey = true;
52 amroutine->amsearcharray = false;
53 amroutine->amsearchnulls = true;
54 amroutine->amstorage = false;
55 amroutine->amclusterable = false;
56 amroutine->ampredlocks = false;
57 amroutine->amcanparallel = false;
58 amroutine->amcaninclude = false;
59 amroutine->amkeytype = InvalidOid;
/* Callback table; entries set to NULL are not provided by this handler */
61 amroutine->ambuild = spgbuild;
62 amroutine->ambuildempty = spgbuildempty;
63 amroutine->aminsert = spginsert;
64 amroutine->ambulkdelete = spgbulkdelete;
65 amroutine->amvacuumcleanup = spgvacuumcleanup;
66 amroutine->amcanreturn = spgcanreturn;
67 amroutine->amcostestimate = spgcostestimate;
68 amroutine->amoptions = spgoptions;
69 amroutine->amproperty = spgproperty;
70 amroutine->ambuildphasename = NULL;
71 amroutine->amvalidate = spgvalidate;
72 amroutine->ambeginscan = spgbeginscan;
73 amroutine->amrescan = spgrescan;
74 amroutine->amgettuple = spggettuple;
75 amroutine->amgetbitmap = spggetbitmap;
76 amroutine->amendscan = spgendscan;
77 amroutine->ammarkpos = NULL;
78 amroutine->amrestrpos = NULL;
79 amroutine->amestimateparallelscan = NULL;
80 amroutine->aminitparallelscan = NULL;
81 amroutine->amparallelrescan = NULL;
83 PG_RETURN_POINTER(amroutine);
86 /* Fill in a SpGistTypeDesc struct with info about the specified data type */
/*
 * desc: descriptor to fill in.
 * type: OID of the data type to describe.
 *
 * get_typlenbyval() receives the addresses of desc->attlen and
 * desc->attbyval as its output arguments.
 */
88 fillTypeDesc(SpGistTypeDesc *desc, Oid type)
91 get_typlenbyval(type, &desc->attlen, &desc->attbyval);
95 * Fetch local cache of AM-specific info about the index, initializing it
/*
 * spgGetCache: obtain the SpGistCache attached to index->rd_amcache.
 *
 * When rd_amcache is NULL the cache is built first: a zeroed SpGistCache is
 * allocated in index->rd_indexcxt, the SPGIST_CONFIG_PROC support function
 * fills cache->config, the type descriptors are filled in, and
 * lastUsedPages is copied from the metapage.  Otherwise the existing
 * rd_amcache pointer is reused as is.
 *
 * NOTE(review): the return statement is not visible in this listing; the
 * description of the result is based on how initSpGistState uses it.
 */
99 spgGetCache(Relation index)
103 if (index->rd_amcache == NULL)
109 SpGistMetaPageData *metadata;
111 cache = MemoryContextAllocZero(index->rd_indexcxt,
112 sizeof(SpGistCache));
114 /* SPGiST doesn't support multi-column indexes */
115 Assert(index->rd_att->natts == 1);
118 * Get the actual data type of the indexed column from the index
119 * tupdesc. We pass this to the opclass config function so that
120 * polymorphic opclasses are possible.
122 atttype = TupleDescAttr(index->rd_att, 0)->atttypid;
124 /* Call the config function to get config info for the opclass */
125 in.attType = atttype;
127 procinfo = index_getprocinfo(index, 1, SPGIST_CONFIG_PROC);
128 FunctionCall2Coll(procinfo,
129 index->rd_indcollation[0],
130 PointerGetDatum(&in),
131 PointerGetDatum(&cache->config));
133 /* Get the information we need about each relevant datatype */
134 fillTypeDesc(&cache->attType, atttype);
/*
 * A valid leafType that differs from the input type requires a
 * SPGIST_COMPRESS_PROC support function.  The plain copy of attType further
 * down is presumably the else branch (the else line is not shown here).
 */
136 if (OidIsValid(cache->config.leafType) &&
137 cache->config.leafType != atttype)
139 if (!OidIsValid(index_getprocid(index, 1, SPGIST_COMPRESS_PROC)))
141 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
142 errmsg("compress method must be defined when leaf type is different from input type")));
144 fillTypeDesc(&cache->attLeafType, cache->config.leafType);
148 cache->attLeafType = cache->attType;
151 fillTypeDesc(&cache->attPrefixType, cache->config.prefixType);
152 fillTypeDesc(&cache->attLabelType, cache->config.labelType);
154 /* Last, get the lastUsedPages data from the metapage */
155 metabuffer = ReadBuffer(index, SPGIST_METAPAGE_BLKNO);
156 LockBuffer(metabuffer, BUFFER_LOCK_SHARE);
158 metadata = SpGistPageGetMeta(BufferGetPage(metabuffer));
/* Reject a metapage whose magic number is not SPGIST_MAGIC_NUMBER */
160 if (metadata->magicNumber != SPGIST_MAGIC_NUMBER)
161 elog(ERROR, "index \"%s\" is not an SP-GiST index",
162 RelationGetRelationName(index));
164 cache->lastUsedPages = metadata->lastUsedPages;
166 UnlockReleaseBuffer(metabuffer);
168 index->rd_amcache = (void *) cache;
172 /* assume it's up to date */
173 cache = (SpGistCache *) index->rd_amcache;
179 /* Initialize SpGistState for working with the given index */
/*
 * initSpGistState: populate *state for the given index.
 *
 * Copies config and the four type descriptors from the cache returned by
 * spgGetCache, allocates SGDTSIZE zeroed bytes as deadTupleStorage, stores
 * the result of GetTopTransactionIdIfAny() in myXid, and clears isBuild.
 */
181 initSpGistState(SpGistState *state, Relation index)
185 /* Get cached static information about index */
186 cache = spgGetCache(index);
188 state->config = cache->config;
189 state->attType = cache->attType;
190 state->attLeafType = cache->attLeafType;
191 state->attPrefixType = cache->attPrefixType;
192 state->attLabelType = cache->attLabelType;
194 /* Make workspace for constructing dead tuples */
195 state->deadTupleStorage = palloc0(SGDTSIZE);
197 /* Set XID to use in redirection tuples */
198 state->myXid = GetTopTransactionIdIfAny();
200 /* Assume we're not in an index build (spgbuild will override) */
201 state->isBuild = false;
205 * Allocate a new page (either by recycling, or by extending the index file).
207 * The returned buffer is already pinned and exclusive-locked.
208 * Caller is responsible for initializing the page by calling SpGistInitBuffer.
/*
 * SpGistNewBuffer: obtain a buffer for a page of index that the caller can
 * (re)initialize.
 *
 * Candidate blocks come from GetFreeIndexPage().  A candidate is returned
 * only if ConditionalLockBuffer() succeeds and the page passes one of the
 * visible checks (SpGistPageIsDeleted or PageIsEmpty, plus one check whose
 * condition is not shown in this listing).  When the FSM has nothing to
 * offer, the relation is extended with ReadBuffer(index, P_NEW) and the new
 * buffer is locked with BUFFER_LOCK_EXCLUSIVE.
 *
 * NOTE(review): the loop header, the conditionals around the extension lock
 * calls, and the final return are not visible in this listing.
 */
211 SpGistNewBuffer(Relation index)
216 /* First, try to get a page from FSM */
219 BlockNumber blkno = GetFreeIndexPage(index);
221 if (blkno == InvalidBlockNumber)
222 break; /* nothing known to FSM */
225 * The fixed pages shouldn't ever be listed in FSM, but just in case
228 if (SpGistBlockIsFixed(blkno))
231 buffer = ReadBuffer(index, blkno);
234 * We have to guard against the possibility that someone else already
235 * recycled this page; the buffer may be locked if so.
237 if (ConditionalLockBuffer(buffer))
239 Page page = BufferGetPage(buffer);
242 return buffer; /* OK to use, if never initialized */
244 if (SpGistPageIsDeleted(page) || PageIsEmpty(page))
245 return buffer; /* OK to use */
247 LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
250 /* Can't use it, so release buffer and try again */
251 ReleaseBuffer(buffer);
254 /* Must extend the file */
/*
 * needLock is the negation of RELATION_IS_LOCAL(index); presumably it
 * gates the two extension-lock calls below (TODO confirm against the
 * full source).
 */
255 needLock = !RELATION_IS_LOCAL(index);
257 LockRelationForExtension(index, ExclusiveLock);
259 buffer = ReadBuffer(index, P_NEW);
260 LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
263 UnlockRelationForExtension(index, ExclusiveLock);
269 * Update index metapage's lastUsedPages info from local cache, if possible
271 * Updating the meta page isn't critical for correct operation of the index, so
272 * 1 use ConditionalLockBuffer to improve concurrency
273 * 2 don't WAL-log metabuffer changes to decrease WAL traffic
/*
 * SpGistUpdateMetaPage: copy the lastUsedPages data held in
 * index->rd_amcache into the metapage.
 *
 * The metapage buffer is read, and the page is modified only if
 * ConditionalLockBuffer() succeeds: lastUsedPages is overwritten, pd_lower
 * is recomputed, and the buffer is marked dirty before being unlocked and
 * released.  The trailing ReleaseBuffer() presumably covers the case where
 * the lock was not obtained (the else line is not shown in this listing).
 *
 * NOTE(review): cache is dereferenced below; whether a NULL rd_amcache is
 * screened out first cannot be seen in this listing.
 */
276 SpGistUpdateMetaPage(Relation index)
278 SpGistCache *cache = (SpGistCache *) index->rd_amcache;
284 metabuffer = ReadBuffer(index, SPGIST_METAPAGE_BLKNO);
286 if (ConditionalLockBuffer(metabuffer))
288 Page metapage = BufferGetPage(metabuffer);
289 SpGistMetaPageData *metadata = SpGistPageGetMeta(metapage);
291 metadata->lastUsedPages = cache->lastUsedPages;
294 * Set pd_lower just past the end of the metadata. This is
295 * essential, because without doing so, metadata will be lost if
296 * xlog.c compresses the page. (We must do this here because
297 * pre-v11 versions of PG did not set the metapage's pd_lower
298 * correctly, so a pg_upgraded index might contain the wrong
301 ((PageHeader) metapage)->pd_lower =
302 ((char *) metadata + sizeof(SpGistMetaPageData)) - (char *) metapage;
304 MarkBufferDirty(metabuffer);
305 UnlockReleaseBuffer(metabuffer);
309 ReleaseBuffer(metabuffer);
314 /* Macro to select proper element of lastUsedPages cache depending on flags */
315 /* Reducing flags modulo SPGIST_CACHED_PAGES is just for paranoia's sake */
/*
 * c: pointer to a struct that has a lastUsedPages member; f: flags value.
 * f is cast to unsigned int and reduced modulo SPGIST_CACHED_PAGES, and the
 * macro yields the address of the selected cachedPage[] element.
 */
316 #define GET_LUP(c, f) (&(c)->lastUsedPages.cachedPage[((unsigned int) (f)) % SPGIST_CACHED_PAGES])
319 * Allocate and initialize a new buffer of the type and parity specified by
320 * flags. The returned buffer is already pinned and exclusive-locked.
322 * When requesting an inner page, if we get one with the wrong parity,
323 * we just release the buffer and try again. We will get a different page
324 * because GetFreeIndexPage will have marked the page used in FSM. The page
325 * is entered in our local lastUsedPages cache, so there's some hope of
326 * making use of it later in this session, but otherwise we rely on VACUUM
327 * to eventually re-enter the page in FSM, making it available for recycling.
328 * Note that such a page does not get marked dirty here, so unless it's used
329 * fairly soon, the buffer will just get discarded and the page will remain
332 * When we return a buffer to the caller, the page is *not* entered into
333 * the lastUsedPages cache; we expect the caller will do so after it's taken
334 * whatever space it will use. This is because after the caller has used up
335 * some space, the page might have less space than whatever was cached already
336 * so we'd rather not trash the old cache entry.
/*
 * allocNewBuffer: get a buffer from SpGistNewBuffer and initialize it with
 * page flags derived from the request flags.
 *
 * index: relation to allocate in.
 * flags: request flags; GBUF_REQ_LEAF / GBUF_REQ_NULLS select SPGIST_LEAF /
 *        SPGIST_NULLS, and (flags & GBUF_PARITY_MASK) is the parity wanted
 *        for a non-leaf page.
 *
 * NOTE(review): the retry loop and the return statements are not visible in
 * this listing.
 */
339 allocNewBuffer(Relation index, int flags)
341 SpGistCache *cache = spgGetCache(index);
342 uint16 pageflags = 0;
344 if (GBUF_REQ_LEAF(flags))
345 pageflags |= SPGIST_LEAF;
346 if (GBUF_REQ_NULLS(flags))
347 pageflags |= SPGIST_NULLS;
353 buffer = SpGistNewBuffer(index);
354 SpGistInitBuffer(buffer, pageflags);
356 if (pageflags & SPGIST_LEAF)
358 /* Leaf pages have no parity concerns, so just use it */
363 BlockNumber blkno = BufferGetBlockNumber(buffer);
364 int blkFlags = GBUF_INNER_PARITY(blkno);
366 if ((flags & GBUF_PARITY_MASK) == blkFlags)
368 /* Page has right parity, use it */
373 /* Page has wrong parity, record it in cache and try again */
374 if (pageflags & SPGIST_NULLS)
375 blkFlags |= GBUF_NULLS;
/*
 * blkFlags (the page parity, plus GBUF_NULLS for a nulls page) is used
 * directly as the cachedPage[] index for the rejected page.
 */
376 cache->lastUsedPages.cachedPage[blkFlags].blkno = blkno;
377 cache->lastUsedPages.cachedPage[blkFlags].freeSpace =
378 PageGetExactFreeSpace(BufferGetPage(buffer));
379 UnlockReleaseBuffer(buffer);
386 * Get a buffer of the type and parity specified by flags, having at least
387 * as much free space as indicated by needSpace. We use the lastUsedPages
388 * cache to assign the same buffer previously requested when possible.
389 * The returned buffer is already pinned and exclusive-locked.
391 * *isNew is set true if the page was initialized here, false if it was
/*
 * SpGistGetBuffer: find a buffer for a page matching flags with room for
 * needSpace bytes, preferring the page remembered in the lastUsedPages
 * cache slot selected by GET_LUP(cache, flags).
 *
 * index:     relation to search or extend.
 * flags:     request flags, tested with GBUF_REQ_LEAF and GBUF_REQ_NULLS.
 * needSpace: bytes required; ERROR is raised when it is larger than
 *            SPGIST_PAGE_CAPACITY.
 * isNew:     output flag; the assignments to it are not visible in this
 *            listing.
 *
 * Every fallback path visible here ends in allocNewBuffer(index, flags).
 */
395 SpGistGetBuffer(Relation index, int flags, int needSpace, bool *isNew)
397 SpGistCache *cache = spgGetCache(index);
398 SpGistLastUsedPage *lup;
400 /* Bail out if even an empty page wouldn't meet the demand */
401 if (needSpace > SPGIST_PAGE_CAPACITY)
402 elog(ERROR, "desired SPGiST tuple size is too big");
405 * If possible, increase the space request to include relation's
406 * fillfactor. This ensures that when we add unrelated tuples to a page,
407 * we try to keep 100-fillfactor% available for adding tuples that are
408 * related to the ones already on it. But fillfactor mustn't cause an
409 * error for requests that would otherwise be legal.
/* Add the fillfactor reserve, then clamp to SPGIST_PAGE_CAPACITY */
411 needSpace += RelationGetTargetPageFreeSpace(index,
412 SPGIST_DEFAULT_FILLFACTOR);
413 needSpace = Min(needSpace, SPGIST_PAGE_CAPACITY);
415 /* Get the cache entry for this flags setting */
416 lup = GET_LUP(cache, flags);
418 /* If we have nothing cached, just turn it over to allocNewBuffer */
419 if (lup->blkno == InvalidBlockNumber)
422 return allocNewBuffer(index, flags);
425 /* fixed pages should never be in cache */
426 Assert(!SpGistBlockIsFixed(lup->blkno));
428 /* If cached freeSpace isn't enough, don't bother looking at the page */
429 if (lup->freeSpace >= needSpace)
434 buffer = ReadBuffer(index, lup->blkno);
436 if (!ConditionalLockBuffer(buffer))
439 * buffer is locked by another process, so return a new buffer
441 ReleaseBuffer(buffer);
443 return allocNewBuffer(index, flags);
446 page = BufferGetPage(buffer);
448 if (PageIsNew(page) || SpGistPageIsDeleted(page) || PageIsEmpty(page))
450 /* OK to initialize the page */
451 uint16 pageflags = 0;
453 if (GBUF_REQ_LEAF(flags))
454 pageflags |= SPGIST_LEAF;
455 if (GBUF_REQ_NULLS(flags))
456 pageflags |= SPGIST_NULLS;
457 SpGistInitBuffer(buffer, pageflags);
/* Cache the free space of the fresh page minus the padded request */
458 lup->freeSpace = PageGetExactFreeSpace(page) - needSpace;
464 * Check that page is of right type and has enough space. We must
465 * recheck this since our cache isn't necessarily up to date.
467 if ((GBUF_REQ_LEAF(flags) ? SpGistPageIsLeaf(page) : !SpGistPageIsLeaf(page)) &&
468 (GBUF_REQ_NULLS(flags) ? SpGistPageStoresNulls(page) : !SpGistPageStoresNulls(page)))
470 int freeSpace = PageGetExactFreeSpace(page);
472 if (freeSpace >= needSpace)
474 /* Success, update freespace info and return the buffer */
475 lup->freeSpace = freeSpace - needSpace;
482 * fallback to allocation of new buffer
484 UnlockReleaseBuffer(buffer);
487 /* No success with cache, so return a new buffer */
489 return allocNewBuffer(index, flags);
493 * Update lastUsedPages cache when done modifying a page.
495 * We update the appropriate cache entry if it already contained this page
496 * (its freeSpace is likely obsolete), or if this page has more space than
497 * whatever we had cached.
/*
 * SpGistSetLastUsedPage: remember the page held in buffer in the
 * lastUsedPages cache of index.
 *
 * The slot is chosen with GET_LUP from flags derived from the page (the
 * visible assignment uses GBUF_INNER_PARITY(blkno)).  The slot is updated
 * when it is empty, already refers to this block, or records less free
 * space than PageGetExactFreeSpace() reports now.
 *
 * NOTE(review): the statements controlled by the SpGistBlockIsFixed,
 * SpGistPageIsLeaf and SpGistPageStoresNulls tests are not visible in this
 * listing.
 */
500 SpGistSetLastUsedPage(Relation index, Buffer buffer)
502 SpGistCache *cache = spgGetCache(index);
503 SpGistLastUsedPage *lup;
505 Page page = BufferGetPage(buffer);
506 BlockNumber blkno = BufferGetBlockNumber(buffer);
509 /* Never enter fixed pages (root pages) in cache, though */
510 if (SpGistBlockIsFixed(blkno))
513 if (SpGistPageIsLeaf(page))
516 flags = GBUF_INNER_PARITY(blkno);
517 if (SpGistPageStoresNulls(page))
520 lup = GET_LUP(cache, flags);
522 freeSpace = PageGetExactFreeSpace(page);
523 if (lup->blkno == InvalidBlockNumber || lup->blkno == blkno ||
524 lup->freeSpace < freeSpace)
527 lup->freeSpace = freeSpace;
532 * Initialize an SPGiST page to empty, with specified flags
/*
 * SpGistInitPage: initialize page as an empty SP-GiST page.
 *
 * PageInit() is called with BLCKSZ and a special area of
 * MAXALIGN(sizeof(SpGistPageOpaqueData)) bytes; the opaque area is then
 * zeroed and stamped with SPGIST_PAGE_ID.
 *
 * f: page flags.  NOTE(review): the statement that stores f is not visible
 * in this listing.
 */
535 SpGistInitPage(Page page, uint16 f)
537 SpGistPageOpaque opaque;
539 PageInit(page, BLCKSZ, MAXALIGN(sizeof(SpGistPageOpaqueData)));
540 opaque = SpGistPageGetOpaque(page);
541 memset(opaque, 0, sizeof(SpGistPageOpaqueData));
543 opaque->spgist_page_id = SPGIST_PAGE_ID;
547 * Initialize a buffer's page to empty, with specified flags
/*
 * SpGistInitBuffer: initialize the page held in buffer b by calling
 * SpGistInitPage with the same flags f.  Asserts that the buffer page size
 * equals BLCKSZ.
 */
550 SpGistInitBuffer(Buffer b, uint16 f)
552 Assert(BufferGetPageSize(b) == BLCKSZ);
553 SpGistInitPage(BufferGetPage(b), f);
557 * Initialize metadata page
/*
 * SpGistInitMetapage: format page as the index metapage.
 *
 * The page is initialized with the SPGIST_META flag, the metadata area is
 * zeroed, the magic number is set, every lastUsedPages slot gets
 * InvalidBlockNumber as its blkno, and pd_lower is set to the end of the
 * metadata.
 */
560 SpGistInitMetapage(Page page)
562 SpGistMetaPageData *metadata;
565 SpGistInitPage(page, SPGIST_META);
566 metadata = SpGistPageGetMeta(page);
567 memset(metadata, 0, sizeof(SpGistMetaPageData));
568 metadata->magicNumber = SPGIST_MAGIC_NUMBER;
570 /* initialize last-used-page cache to empty */
571 for (i = 0; i < SPGIST_CACHED_PAGES; i++)
572 metadata->lastUsedPages.cachedPage[i].blkno = InvalidBlockNumber;
575 * Set pd_lower just past the end of the metadata. This is essential,
576 * because without doing so, metadata will be lost if xlog.c compresses
579 ((PageHeader) page)->pd_lower =
580 ((char *) metadata + sizeof(SpGistMetaPageData)) - (char *) page;
584 * reloptions processing for SPGiST
/*
 * spgoptions: delegate reloptions parsing to default_reloptions() with
 * RELOPT_KIND_SPGIST and return whatever that call returns.
 */
587 spgoptions(Datum reloptions, bool validate)
589 return default_reloptions(reloptions, validate, RELOPT_KIND_SPGIST);
593 * Get the space needed to store a non-null datum of the indicated type.
594 * Note the result is already rounded up to a MAXALIGN boundary.
595 * Also, we follow the SPGiST convention that pass-by-val types are
596 * just stored in their Datum representation (compare memcpyDatum).
/*
 * SpGistGetTypeSize: size in bytes needed for datum, rounded up with
 * MAXALIGN.
 *
 * att:   type descriptor; attlen > 0 selects the second branch.
 * datum: the value; VARSIZE_ANY(datum) supplies the size in the last
 *        branch.
 *
 * NOTE(review): the first condition and the assignment under the
 * attlen > 0 test are not visible in this listing.
 */
599 SpGistGetTypeSize(SpGistTypeDesc *att, Datum datum)
604 size = sizeof(Datum);
605 else if (att->attlen > 0)
608 size = VARSIZE_ANY(datum);
610 return MAXALIGN(size);
614 * Copy the given non-null datum to *target
/*
 * memcpyDatum: copy datum into target.
 *
 * One path copies the Datum word itself (sizeof(Datum) bytes from &datum).
 * The other copies from DatumGetPointer(datum), using att->attlen when it
 * is positive and VARSIZE_ANY(datum) otherwise.
 *
 * NOTE(review): the condition choosing between the two paths is not visible
 * in this listing.
 */
617 memcpyDatum(void *target, SpGistTypeDesc *att, Datum datum)
623 memcpy(target, &datum, sizeof(Datum));
627 size = (att->attlen > 0) ? att->attlen : VARSIZE_ANY(datum);
628 memcpy(target, DatumGetPointer(datum), size);
633 * Construct a leaf tuple containing the given heap TID and datum value
/*
 * spgFormLeafTuple: build a leaf tuple allocated with palloc0.
 *
 * state:   supplies attLeafType, used to size and copy the datum.
 * heapPtr: heap TID, copied into tup->heapPtr.
 * datum:   leaf value, copied to SGLTDATAPTR(tup).
 * isnull:  NOTE(review): the tests on isnull are not visible in this
 *          listing; presumably they guard the datum sizing and copy.
 *
 * nextOffset starts out as InvalidOffsetNumber.
 */
636 spgFormLeafTuple(SpGistState *state, ItemPointer heapPtr,
637 Datum datum, bool isnull)
642 /* compute space needed (note result is already maxaligned) */
645 size += SpGistGetTypeSize(&state->attLeafType, datum);
648 * Ensure that we can replace the tuple with a dead tuple later. This
649 * test is unnecessary when !isnull, but let's be safe.
654 /* OK, form the tuple */
655 tup = (SpGistLeafTuple) palloc0(size);
658 tup->nextOffset = InvalidOffsetNumber;
659 tup->heapPtr = *heapPtr;
661 memcpyDatum(SGLTDATAPTR(tup), &state->attLeafType, datum);
667 * Construct a node (to go into an inner tuple) containing the given label
669 * Note that the node's downlink is just set invalid here. Caller will fill
/*
 * spgFormNodeTuple: build a node tuple allocated with palloc0.
 *
 * state:  supplies attLabelType, used to size and copy the label.
 * label:  node label, copied to SGNTDATAPTR(tup).
 * isnull: NOTE(review): the tests on isnull are not visible in this
 *         listing; presumably they guard the label handling and the
 *         INDEX_NULL_MASK bit.
 *
 * Raises ERRCODE_PROGRAM_LIMIT_EXCEEDED when size has bits outside
 * INDEX_SIZE_MASK.  t_info is set from infomask and t_tid is set invalid.
 */
673 spgFormNodeTuple(SpGistState *state, Datum label, bool isnull)
677 unsigned short infomask = 0;
679 /* compute space needed (note result is already maxaligned) */
682 size += SpGistGetTypeSize(&state->attLabelType, label);
685 * Here we make sure that the size will fit in the field reserved for it
688 if ((size & INDEX_SIZE_MASK) != size)
690 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
691 errmsg("index row requires %zu bytes, maximum size is %zu",
692 (Size) size, (Size) INDEX_SIZE_MASK)));
694 tup = (SpGistNodeTuple) palloc0(size);
697 infomask |= INDEX_NULL_MASK;
698 /* we don't bother setting the INDEX_VAR_MASK bit */
700 tup->t_info = infomask;
702 /* The TID field will be filled in later */
703 ItemPointerSetInvalid(&tup->t_tid);
706 memcpyDatum(SGNTDATAPTR(tup), &state->attLabelType, label);
712 * Construct an inner tuple containing the given prefix and node array
/*
 * spgFormInnerTuple: build an inner tuple, allocated with palloc0, from an
 * optional prefix and an array of node tuples.
 *
 * state:     supplies attPrefixType, used to size and copy the prefix.
 * hasPrefix: NOTE(review): the tests on hasPrefix are not visible in this
 *            listing; presumably they guard the prefix sizing and copy.
 * prefix:    prefix value, copied to SGITDATAPTR(tup).
 * nNodes:    number of entries in nodes.
 * nodes:     node tuples; each is copied verbatim, IndexTupleSize(node)
 *            bytes at a time, starting at SGITNODEPTR(tup).
 *
 * Errors: ERRCODE_PROGRAM_LIMIT_EXCEEDED when size exceeds
 * SPGIST_PAGE_CAPACITY - sizeof(ItemIdData); a plain ERROR when size,
 * prefixSize or nNodes exceed SGITMAXSIZE, SGITMAXPREFIXSIZE or
 * SGITMAXNNODES respectively.
 */
715 spgFormInnerTuple(SpGistState *state, bool hasPrefix, Datum prefix,
716 int nNodes, SpGistNodeTuple *nodes)
718 SpGistInnerTuple tup;
720 unsigned int prefixSize;
724 /* Compute size needed */
726 prefixSize = SpGistGetTypeSize(&state->attPrefixType, prefix);
730 size = SGITHDRSZ + prefixSize;
732 /* Note: we rely on node tuple sizes to be maxaligned already */
733 for (i = 0; i < nNodes; i++)
734 size += IndexTupleSize(nodes[i]);
737 * Ensure that we can replace the tuple with a dead tuple later. This
738 * test is unnecessary given current tuple layouts, but let's be safe.
744 * Inner tuple should be small enough to fit on a page
746 if (size > SPGIST_PAGE_CAPACITY - sizeof(ItemIdData))
748 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
749 errmsg("SP-GiST inner tuple size %zu exceeds maximum %zu",
751 SPGIST_PAGE_CAPACITY - sizeof(ItemIdData)),
752 errhint("Values larger than a buffer page cannot be indexed.")));
755 * Check for overflow of header fields --- probably can't fail if the
756 * above succeeded, but let's be paranoid
758 if (size > SGITMAXSIZE ||
759 prefixSize > SGITMAXPREFIXSIZE ||
760 nNodes > SGITMAXNNODES)
761 elog(ERROR, "SPGiST inner tuple header field is too small");
763 /* OK, form the tuple */
764 tup = (SpGistInnerTuple) palloc0(size);
766 tup->nNodes = nNodes;
767 tup->prefixSize = prefixSize;
771 memcpyDatum(SGITDATAPTR(tup), &state->attPrefixType, prefix);
773 ptr = (char *) SGITNODEPTR(tup);
775 for (i = 0; i < nNodes; i++)
777 SpGistNodeTuple node = nodes[i];
779 memcpy(ptr, node, IndexTupleSize(node));
780 ptr += IndexTupleSize(node);
787 * Construct a "dead" tuple to replace a tuple being deleted.
789 * The state can be SPGIST_REDIRECT, SPGIST_DEAD, or SPGIST_PLACEHOLDER.
790 * For a REDIRECT tuple, a pointer (blkno+offset) must be supplied, and
791 * the xid field is filled in automatically.
793 * This is called in critical sections, so we don't use palloc; the tuple
794 * is built in preallocated storage. It should be copied before another
795 * call with different parameters can occur.
/*
 * spgFormDeadTuple: fill state->deadTupleStorage with a dead tuple of the
 * given tupstate.
 *
 * size is always SGDTSIZE and nextOffset is InvalidOffsetNumber.  For
 * SPGIST_REDIRECT the pointer is set to (blkno, offnum) and xid to
 * state->myXid, which is asserted to be a valid transaction id.  The
 * remaining assignments set an invalid pointer and InvalidTransactionId
 * (presumably the else branch; the else line is not shown in this listing).
 *
 * The tuple lives in storage owned by state, so each call overwrites the
 * result of the previous one.
 */
798 spgFormDeadTuple(SpGistState *state, int tupstate,
799 BlockNumber blkno, OffsetNumber offnum)
801 SpGistDeadTuple tuple = (SpGistDeadTuple) state->deadTupleStorage;
803 tuple->tupstate = tupstate;
804 tuple->size = SGDTSIZE;
805 tuple->nextOffset = InvalidOffsetNumber;
807 if (tupstate == SPGIST_REDIRECT)
809 ItemPointerSet(&tuple->pointer, blkno, offnum);
810 Assert(TransactionIdIsValid(state->myXid));
811 tuple->xid = state->myXid;
815 ItemPointerSetInvalid(&tuple->pointer);
816 tuple->xid = InvalidTransactionId;
823 * Extract the label datums of the nodes within innerTuple
825 * Returns NULL if label datums are NULLs
/*
 * spgExtractNodeLabels: collect the label datum of every node in
 * innerTuple.
 *
 * The first node decides the mode.  If it has nulls, every node must have
 * nulls.  Otherwise a Datum array of innerTuple->nNodes entries is
 * allocated with palloc and filled with SGNTDATUM(node, state), and no node
 * may have nulls.  A mix of the two raises ERROR.
 *
 * NOTE(review): the return statements are not visible in this listing.
 */
828 spgExtractNodeLabels(SpGistState *state, SpGistInnerTuple innerTuple)
832 SpGistNodeTuple node;
834 /* Either all the labels must be NULL, or none. */
835 node = SGITNODEPTR(innerTuple);
836 if (IndexTupleHasNulls(node))
838 SGITITERATE(innerTuple, i, node)
840 if (!IndexTupleHasNulls(node))
841 elog(ERROR, "some but not all node labels are null in SPGiST inner tuple");
843 /* They're all null, so just return NULL */
848 nodeLabels = (Datum *) palloc(sizeof(Datum) * innerTuple->nNodes);
849 SGITITERATE(innerTuple, i, node)
851 if (IndexTupleHasNulls(node))
852 elog(ERROR, "some but not all node labels are null in SPGiST inner tuple");
853 nodeLabels[i] = SGNTDATUM(node, state);
860 * Add a new item to the page, replacing a PLACEHOLDER item if possible.
861 * Return the location it's inserted at, or InvalidOffsetNumber on failure.
863 * If startOffset isn't NULL, we start searching for placeholders at
864 * *startOffset, and update that to the next place to search. This is just
865 * an optimization for repeated insertions.
867 * If errorOK is false, we throw error when there's not enough room,
868 * rather than returning InvalidOffsetNumber.
/*
 * SpGistPageAddNewItem: add item (size bytes) to page.
 *
 * state:       not referenced in the lines visible in this listing.
 * startOffset: optional hint; when non-NULL and not InvalidOffsetNumber it
 *              affects where the placeholder search starts, and it is set
 *              to offnum + 1 after a successful replacement.
 * errorOK:     when false, a failed plain PageAddItem() raises ERROR.
 *
 * A placeholder is looked for only when opaque->nPlaceholder > 0 and the
 * exact free space plus SGDTSIZE is at least MAXALIGN(size).  If the search
 * finds none, the counter is reset to 0.
 *
 * NOTE(review): the return statements and the arguments completing the two
 * elog() calls are not visible in this listing.
 */
871 SpGistPageAddNewItem(SpGistState *state, Page page, Item item, Size size,
872 OffsetNumber *startOffset, bool errorOK)
874 SpGistPageOpaque opaque = SpGistPageGetOpaque(page);
879 if (opaque->nPlaceholder > 0 &&
880 PageGetExactFreeSpace(page) + SGDTSIZE >= MAXALIGN(size))
882 /* Try to replace a placeholder */
883 maxoff = PageGetMaxOffsetNumber(page);
884 offnum = InvalidOffsetNumber;
888 if (startOffset && *startOffset != InvalidOffsetNumber)
891 i = FirstOffsetNumber;
892 for (; i <= maxoff; i++)
894 SpGistDeadTuple it = (SpGistDeadTuple) PageGetItem(page,
895 PageGetItemId(page, i));
897 if (it->tupstate == SPGIST_PLACEHOLDER)
904 /* Done if we found a placeholder */
905 if (offnum != InvalidOffsetNumber)
908 if (startOffset && *startOffset != InvalidOffsetNumber)
910 /* Hint was no good, re-search from beginning */
911 *startOffset = InvalidOffsetNumber;
915 /* Hmm, no placeholder found? */
916 opaque->nPlaceholder = 0;
920 if (offnum != InvalidOffsetNumber)
922 /* Replace the placeholder tuple */
923 PageIndexTupleDelete(page, offnum);
925 offnum = PageAddItem(page, item, size, offnum, false, false);
928 * We should not have failed given the size check at the top of
929 * the function, but test anyway. If we did fail, we must PANIC
930 * because we've already deleted the placeholder tuple, and
931 * there's no other way to keep the damage from getting to disk.
933 if (offnum != InvalidOffsetNumber)
935 Assert(opaque->nPlaceholder > 0);
936 opaque->nPlaceholder--;
938 *startOffset = offnum + 1;
941 elog(PANIC, "failed to add item of size %u to SPGiST index page",
948 /* No luck in replacing a placeholder, so just add it to the page */
949 offnum = PageAddItem(page, item, size,
950 InvalidOffsetNumber, false, false);
952 if (offnum == InvalidOffsetNumber && !errorOK)
953 elog(ERROR, "failed to add item of size %u to SPGiST index page",
960 * spgproperty() -- Check boolean properties of indexes.
962 * This is optional for most AMs, but is required for SP-GiST because the core
963 * property code doesn't support AMPROP_DISTANCE_ORDERABLE.
966 spgproperty(Oid index_oid, int attno,
967 IndexAMProperty prop, const char *propname,
968 bool *res, bool *isnull)
976 /* Only answer column-level inquiries */
982 case AMPROP_DISTANCE_ORDERABLE:
989 * Currently, SP-GiST distance-ordered scans require that there be a
990 * distance operator in the opclass with the default types. So we assume
991 * that if such an operator exists, then there's a reason for it.
994 /* First we need to know the column's opclass. */
995 opclass = get_index_column_opclass(index_oid, attno);
996 if (!OidIsValid(opclass))
1002 /* Now look up the opclass family and input datatype. */
1003 if (!get_opclass_opfamily_and_input_type(opclass, &opfamily, &opcintype))
1009 /* And now we can check whether the operator is provided. */
1010 catlist = SearchSysCacheList1(AMOPSTRATEGY,
1011 ObjectIdGetDatum(opfamily));
1015 for (i = 0; i < catlist->n_members; i++)
1017 HeapTuple amoptup = &catlist->members[i]->tuple;
1018 Form_pg_amop amopform = (Form_pg_amop) GETSTRUCT(amoptup);
1020 if (amopform->amoppurpose == AMOP_ORDER &&
1021 (amopform->amoplefttype == opcintype ||
1022 amopform->amoprighttype == opcintype) &&
1023 opfamily_can_sort_type(amopform->amopsortfamily,
1024 get_op_rettype(amopform->amopopr)))
1031 ReleaseSysCacheList(catlist);