1 /*-------------------------------------------------------------------------
4 * various support functions for SP-GiST
7 * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
8 * Portions Copyright (c) 1994, Regents of the University of California
11 * src/backend/access/spgist/spgutils.c
13 *-------------------------------------------------------------------------
18 #include "access/genam.h"
19 #include "access/reloptions.h"
20 #include "access/spgist_private.h"
21 #include "access/transam.h"
22 #include "access/xact.h"
23 #include "storage/bufmgr.h"
24 #include "storage/indexfsm.h"
25 #include "storage/lmgr.h"
26 #include "utils/lsyscache.h"
29 /* Fill in a SpGistTypeDesc struct with info about the specified data type */
31 fillTypeDesc(SpGistTypeDesc *desc, Oid type)
34 get_typlenbyval(type, &desc->attlen, &desc->attbyval);
38 * Fetch local cache of AM-specific info about the index, initializing it
42 spgGetCache(Relation index)
46 if (index->rd_amcache == NULL)
52 SpGistMetaPageData *metadata;
54 cache = MemoryContextAllocZero(index->rd_indexcxt,
57 /* SPGiST doesn't support multi-column indexes */
58 Assert(index->rd_att->natts == 1);
61 * Get the actual data type of the indexed column from the index
62 * tupdesc. We pass this to the opclass config function so that
63 * polymorphic opclasses are possible.
65 atttype = index->rd_att->attrs[0]->atttypid;
67 /* Call the config function to get config info for the opclass */
70 procinfo = index_getprocinfo(index, 1, SPGIST_CONFIG_PROC);
71 FunctionCall2Coll(procinfo,
72 index->rd_indcollation[0],
74 PointerGetDatum(&cache->config));
76 /* Get the information we need about each relevant datatype */
77 fillTypeDesc(&cache->attType, atttype);
78 fillTypeDesc(&cache->attPrefixType, cache->config.prefixType);
79 fillTypeDesc(&cache->attLabelType, cache->config.labelType);
81 /* Last, get the lastUsedPages data from the metapage */
82 metabuffer = ReadBuffer(index, SPGIST_METAPAGE_BLKNO);
83 LockBuffer(metabuffer, BUFFER_LOCK_SHARE);
85 metadata = SpGistPageGetMeta(BufferGetPage(metabuffer));
87 if (metadata->magicNumber != SPGIST_MAGIC_NUMBER)
88 elog(ERROR, "index \"%s\" is not an SP-GiST index",
89 RelationGetRelationName(index));
91 cache->lastUsedPages = metadata->lastUsedPages;
93 UnlockReleaseBuffer(metabuffer);
95 index->rd_amcache = (void *) cache;
99 /* assume it's up to date */
100 cache = (SpGistCache *) index->rd_amcache;
106 /* Initialize SpGistState for working with the given index */
108 initSpGistState(SpGistState *state, Relation index)
112 /* Get cached static information about index */
113 cache = spgGetCache(index);
115 state->config = cache->config;
116 state->attType = cache->attType;
117 state->attPrefixType = cache->attPrefixType;
118 state->attLabelType = cache->attLabelType;
120 /* Make workspace for constructing dead tuples */
121 state->deadTupleStorage = palloc0(SGDTSIZE);
123 /* Set XID to use in redirection tuples */
124 state->myXid = GetTopTransactionIdIfAny();
126 /* Assume we're not in an index build (spgbuild will override) */
127 state->isBuild = false;
131 * Allocate a new page (either by recycling, or by extending the index file).
133 * The returned buffer is already pinned and exclusive-locked.
134 * Caller is responsible for initializing the page by calling SpGistInitBuffer.
137 SpGistNewBuffer(Relation index)
142 /* First, try to get a page from FSM */
145 BlockNumber blkno = GetFreeIndexPage(index);
147 if (blkno == InvalidBlockNumber)
148 break; /* nothing known to FSM */
151 * The fixed pages shouldn't ever be listed in FSM, but just in case
154 if (SpGistBlockIsFixed(blkno))
157 buffer = ReadBuffer(index, blkno);
160 * We have to guard against the possibility that someone else already
161 * recycled this page; the buffer may be locked if so.
163 if (ConditionalLockBuffer(buffer))
165 Page page = BufferGetPage(buffer);
168 return buffer; /* OK to use, if never initialized */
170 if (SpGistPageIsDeleted(page) || PageIsEmpty(page))
171 return buffer; /* OK to use */
173 LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
176 /* Can't use it, so release buffer and try again */
177 ReleaseBuffer(buffer);
180 /* Must extend the file */
181 needLock = !RELATION_IS_LOCAL(index);
183 LockRelationForExtension(index, ExclusiveLock);
185 buffer = ReadBuffer(index, P_NEW);
186 LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
189 UnlockRelationForExtension(index, ExclusiveLock);
195 * Update index metapage's lastUsedPages info from local cache, if possible
197 * Updating meta page isn't critical for index working, so
198 * 1 use ConditionalLockBuffer to improve concurrency
199 * 2 don't WAL-log metabuffer changes to decrease WAL traffic
202 SpGistUpdateMetaPage(Relation index)
204 SpGistCache *cache = (SpGistCache *) index->rd_amcache;
209 SpGistMetaPageData *metadata;
211 metabuffer = ReadBuffer(index, SPGIST_METAPAGE_BLKNO);
213 if (ConditionalLockBuffer(metabuffer))
215 metadata = SpGistPageGetMeta(BufferGetPage(metabuffer));
216 metadata->lastUsedPages = cache->lastUsedPages;
218 MarkBufferDirty(metabuffer);
219 UnlockReleaseBuffer(metabuffer);
223 ReleaseBuffer(metabuffer);
228 /* Macro to select proper element of lastUsedPages cache depending on flags */
229 /* Masking flags with SPGIST_CACHED_PAGES is just for paranoia's sake */
230 #define GET_LUP(c, f) (&(c)->lastUsedPages.cachedPage[((unsigned int) (f)) % SPGIST_CACHED_PAGES])
233 * Allocate and initialize a new buffer of the type and parity specified by
234 * flags. The returned buffer is already pinned and exclusive-locked.
236 * When requesting an inner page, if we get one with the wrong parity,
237 * we just release the buffer and try again. We will get a different page
238 * because GetFreeIndexPage will have marked the page used in FSM. The page
239 * is entered in our local lastUsedPages cache, so there's some hope of
240 * making use of it later in this session, but otherwise we rely on VACUUM
241 * to eventually re-enter the page in FSM, making it available for recycling.
242 * Note that such a page does not get marked dirty here, so unless it's used
243 * fairly soon, the buffer will just get discarded and the page will remain
246 * When we return a buffer to the caller, the page is *not* entered into
247 * the lastUsedPages cache; we expect the caller will do so after it's taken
248 * whatever space it will use. This is because after the caller has used up
249 * some space, the page might have less space than whatever was cached already
250 * so we'd rather not trash the old cache entry.
253 allocNewBuffer(Relation index, int flags)
255 SpGistCache *cache = spgGetCache(index);
256 uint16 pageflags = 0;
258 if (GBUF_REQ_LEAF(flags))
259 pageflags |= SPGIST_LEAF;
260 if (GBUF_REQ_NULLS(flags))
261 pageflags |= SPGIST_NULLS;
267 buffer = SpGistNewBuffer(index);
268 SpGistInitBuffer(buffer, pageflags);
270 if (pageflags & SPGIST_LEAF)
272 /* Leaf pages have no parity concerns, so just use it */
277 BlockNumber blkno = BufferGetBlockNumber(buffer);
278 int blkFlags = GBUF_INNER_PARITY(blkno);
280 if ((flags & GBUF_PARITY_MASK) == blkFlags)
282 /* Page has right parity, use it */
287 /* Page has wrong parity, record it in cache and try again */
288 if (pageflags & SPGIST_NULLS)
289 blkFlags |= GBUF_NULLS;
290 cache->lastUsedPages.cachedPage[blkFlags].blkno = blkno;
291 cache->lastUsedPages.cachedPage[blkFlags].freeSpace =
292 PageGetExactFreeSpace(BufferGetPage(buffer));
293 UnlockReleaseBuffer(buffer);
300 * Get a buffer of the type and parity specified by flags, having at least
301 * as much free space as indicated by needSpace. We use the lastUsedPages
302 * cache to assign the same buffer previously requested when possible.
303 * The returned buffer is already pinned and exclusive-locked.
305 * *isNew is set true if the page was initialized here, false if it was
309 SpGistGetBuffer(Relation index, int flags, int needSpace, bool *isNew)
311 SpGistCache *cache = spgGetCache(index);
312 SpGistLastUsedPage *lup;
314 /* Bail out if even an empty page wouldn't meet the demand */
315 if (needSpace > SPGIST_PAGE_CAPACITY)
316 elog(ERROR, "desired SPGiST tuple size is too big");
319 * If possible, increase the space request to include relation's
320 * fillfactor. This ensures that when we add unrelated tuples to a page,
321 * we try to keep 100-fillfactor% available for adding tuples that are
322 * related to the ones already on it. But fillfactor mustn't cause an
323 * error for requests that would otherwise be legal.
325 needSpace += RelationGetTargetPageFreeSpace(index,
326 SPGIST_DEFAULT_FILLFACTOR);
327 needSpace = Min(needSpace, SPGIST_PAGE_CAPACITY);
329 /* Get the cache entry for this flags setting */
330 lup = GET_LUP(cache, flags);
332 /* If we have nothing cached, just turn it over to allocNewBuffer */
333 if (lup->blkno == InvalidBlockNumber)
336 return allocNewBuffer(index, flags);
339 /* fixed pages should never be in cache */
340 Assert(!SpGistBlockIsFixed(lup->blkno));
342 /* If cached freeSpace isn't enough, don't bother looking at the page */
343 if (lup->freeSpace >= needSpace)
348 buffer = ReadBuffer(index, lup->blkno);
350 if (!ConditionalLockBuffer(buffer))
353 * buffer is locked by another process, so return a new buffer
355 ReleaseBuffer(buffer);
357 return allocNewBuffer(index, flags);
360 page = BufferGetPage(buffer);
362 if (PageIsNew(page) || SpGistPageIsDeleted(page) || PageIsEmpty(page))
364 /* OK to initialize the page */
365 uint16 pageflags = 0;
367 if (GBUF_REQ_LEAF(flags))
368 pageflags |= SPGIST_LEAF;
369 if (GBUF_REQ_NULLS(flags))
370 pageflags |= SPGIST_NULLS;
371 SpGistInitBuffer(buffer, pageflags);
372 lup->freeSpace = PageGetExactFreeSpace(page) - needSpace;
378 * Check that page is of right type and has enough space. We must
379 * recheck this since our cache isn't necessarily up to date.
381 if ((GBUF_REQ_LEAF(flags) ? SpGistPageIsLeaf(page) : !SpGistPageIsLeaf(page)) &&
382 (GBUF_REQ_NULLS(flags) ? SpGistPageStoresNulls(page) : !SpGistPageStoresNulls(page)))
384 int freeSpace = PageGetExactFreeSpace(page);
386 if (freeSpace >= needSpace)
388 /* Success, update freespace info and return the buffer */
389 lup->freeSpace = freeSpace - needSpace;
396 * fallback to allocation of new buffer
398 UnlockReleaseBuffer(buffer);
401 /* No success with cache, so return a new buffer */
403 return allocNewBuffer(index, flags);
407 * Update lastUsedPages cache when done modifying a page.
409 * We update the appropriate cache entry if it already contained this page
410 * (its freeSpace is likely obsolete), or if this page has more space than
411 * whatever we had cached.
414 SpGistSetLastUsedPage(Relation index, Buffer buffer)
416 SpGistCache *cache = spgGetCache(index);
417 SpGistLastUsedPage *lup;
419 Page page = BufferGetPage(buffer);
420 BlockNumber blkno = BufferGetBlockNumber(buffer);
423 /* Never enter fixed pages (root pages) in cache, though */
424 if (SpGistBlockIsFixed(blkno))
427 if (SpGistPageIsLeaf(page))
430 flags = GBUF_INNER_PARITY(blkno);
431 if (SpGistPageStoresNulls(page))
434 lup = GET_LUP(cache, flags);
436 freeSpace = PageGetExactFreeSpace(page);
437 if (lup->blkno == InvalidBlockNumber || lup->blkno == blkno ||
438 lup->freeSpace < freeSpace)
441 lup->freeSpace = freeSpace;
446 * Initialize an SPGiST page to empty, with specified flags
449 SpGistInitPage(Page page, uint16 f)
451 SpGistPageOpaque opaque;
453 PageInit(page, BLCKSZ, MAXALIGN(sizeof(SpGistPageOpaqueData)));
454 opaque = SpGistPageGetOpaque(page);
455 memset(opaque, 0, sizeof(SpGistPageOpaqueData));
457 opaque->spgist_page_id = SPGIST_PAGE_ID;
461 * Initialize a buffer's page to empty, with specified flags
464 SpGistInitBuffer(Buffer b, uint16 f)
466 Assert(BufferGetPageSize(b) == BLCKSZ);
467 SpGistInitPage(BufferGetPage(b), f);
471 * Initialize metadata page
474 SpGistInitMetapage(Page page)
476 SpGistMetaPageData *metadata;
479 SpGistInitPage(page, SPGIST_META);
480 metadata = SpGistPageGetMeta(page);
481 memset(metadata, 0, sizeof(SpGistMetaPageData));
482 metadata->magicNumber = SPGIST_MAGIC_NUMBER;
484 /* initialize last-used-page cache to empty */
485 for (i = 0; i < SPGIST_CACHED_PAGES; i++)
486 metadata->lastUsedPages.cachedPage[i].blkno = InvalidBlockNumber;
490 * reloptions processing for SPGiST
493 spgoptions(PG_FUNCTION_ARGS)
495 Datum reloptions = PG_GETARG_DATUM(0);
496 bool validate = PG_GETARG_BOOL(1);
499 result = default_reloptions(reloptions, validate, RELOPT_KIND_SPGIST);
502 PG_RETURN_BYTEA_P(result);
507 * Get the space needed to store a non-null datum of the indicated type.
508 * Note the result is already rounded up to a MAXALIGN boundary.
509 * Also, we follow the SPGiST convention that pass-by-val types are
510 * just stored in their Datum representation (compare memcpyDatum).
513 SpGistGetTypeSize(SpGistTypeDesc *att, Datum datum)
518 size = sizeof(Datum);
519 else if (att->attlen > 0)
522 size = VARSIZE_ANY(datum);
524 return MAXALIGN(size);
528 * Copy the given non-null datum to *target
531 memcpyDatum(void *target, SpGistTypeDesc *att, Datum datum)
537 memcpy(target, &datum, sizeof(Datum));
541 size = (att->attlen > 0) ? att->attlen : VARSIZE_ANY(datum);
542 memcpy(target, DatumGetPointer(datum), size);
547 * Construct a leaf tuple containing the given heap TID and datum value
550 spgFormLeafTuple(SpGistState *state, ItemPointer heapPtr,
551 Datum datum, bool isnull)
556 /* compute space needed (note result is already maxaligned) */
559 size += SpGistGetTypeSize(&state->attType, datum);
562 * Ensure that we can replace the tuple with a dead tuple later. This
563 * test is unnecessary when !isnull, but let's be safe.
568 /* OK, form the tuple */
569 tup = (SpGistLeafTuple) palloc0(size);
572 tup->nextOffset = InvalidOffsetNumber;
573 tup->heapPtr = *heapPtr;
575 memcpyDatum(SGLTDATAPTR(tup), &state->attType, datum);
581 * Construct a node (to go into an inner tuple) containing the given label
583 * Note that the node's downlink is just set invalid here. Caller will fill
587 spgFormNodeTuple(SpGistState *state, Datum label, bool isnull)
591 unsigned short infomask = 0;
593 /* compute space needed (note result is already maxaligned) */
596 size += SpGistGetTypeSize(&state->attLabelType, label);
599 * Here we make sure that the size will fit in the field reserved for it
602 if ((size & INDEX_SIZE_MASK) != size)
604 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
605 errmsg("index row requires %zu bytes, maximum size is %zu",
606 (Size) size, (Size) INDEX_SIZE_MASK)));
608 tup = (SpGistNodeTuple) palloc0(size);
611 infomask |= INDEX_NULL_MASK;
612 /* we don't bother setting the INDEX_VAR_MASK bit */
614 tup->t_info = infomask;
616 /* The TID field will be filled in later */
617 ItemPointerSetInvalid(&tup->t_tid);
620 memcpyDatum(SGNTDATAPTR(tup), &state->attLabelType, label);
626 * Construct an inner tuple containing the given prefix and node array
629 spgFormInnerTuple(SpGistState *state, bool hasPrefix, Datum prefix,
630 int nNodes, SpGistNodeTuple *nodes)
632 SpGistInnerTuple tup;
634 unsigned int prefixSize;
638 /* Compute size needed */
640 prefixSize = SpGistGetTypeSize(&state->attPrefixType, prefix);
644 size = SGITHDRSZ + prefixSize;
646 /* Note: we rely on node tuple sizes to be maxaligned already */
647 for (i = 0; i < nNodes; i++)
648 size += IndexTupleSize(nodes[i]);
651 * Ensure that we can replace the tuple with a dead tuple later. This
652 * test is unnecessary given current tuple layouts, but let's be safe.
658 * Inner tuple should be small enough to fit on a page
660 if (size > SPGIST_PAGE_CAPACITY - sizeof(ItemIdData))
662 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
663 errmsg("SP-GiST inner tuple size %zu exceeds maximum %zu",
665 SPGIST_PAGE_CAPACITY - sizeof(ItemIdData)),
666 errhint("Values larger than a buffer page cannot be indexed.")));
669 * Check for overflow of header fields --- probably can't fail if the
670 * above succeeded, but let's be paranoid
672 if (size > SGITMAXSIZE ||
673 prefixSize > SGITMAXPREFIXSIZE ||
674 nNodes > SGITMAXNNODES)
675 elog(ERROR, "SPGiST inner tuple header field is too small");
677 /* OK, form the tuple */
678 tup = (SpGistInnerTuple) palloc0(size);
680 tup->nNodes = nNodes;
681 tup->prefixSize = prefixSize;
685 memcpyDatum(SGITDATAPTR(tup), &state->attPrefixType, prefix);
687 ptr = (char *) SGITNODEPTR(tup);
689 for (i = 0; i < nNodes; i++)
691 SpGistNodeTuple node = nodes[i];
693 memcpy(ptr, node, IndexTupleSize(node));
694 ptr += IndexTupleSize(node);
701 * Construct a "dead" tuple to replace a tuple being deleted.
703 * The state can be SPGIST_REDIRECT, SPGIST_DEAD, or SPGIST_PLACEHOLDER.
704 * For a REDIRECT tuple, a pointer (blkno+offset) must be supplied, and
705 * the xid field is filled in automatically.
707 * This is called in critical sections, so we don't use palloc; the tuple
708 * is built in preallocated storage. It should be copied before another
709 * call with different parameters can occur.
712 spgFormDeadTuple(SpGistState *state, int tupstate,
713 BlockNumber blkno, OffsetNumber offnum)
715 SpGistDeadTuple tuple = (SpGistDeadTuple) state->deadTupleStorage;
717 tuple->tupstate = tupstate;
718 tuple->size = SGDTSIZE;
719 tuple->nextOffset = InvalidOffsetNumber;
721 if (tupstate == SPGIST_REDIRECT)
723 ItemPointerSet(&tuple->pointer, blkno, offnum);
724 Assert(TransactionIdIsValid(state->myXid));
725 tuple->xid = state->myXid;
729 ItemPointerSetInvalid(&tuple->pointer);
730 tuple->xid = InvalidTransactionId;
737 * Extract the label datums of the nodes within innerTuple
739 * Returns NULL if label datums are NULLs
742 spgExtractNodeLabels(SpGistState *state, SpGistInnerTuple innerTuple)
746 SpGistNodeTuple node;
748 /* Either all the labels must be NULL, or none. */
749 node = SGITNODEPTR(innerTuple);
750 if (IndexTupleHasNulls(node))
752 SGITITERATE(innerTuple, i, node)
754 if (!IndexTupleHasNulls(node))
755 elog(ERROR, "some but not all node labels are null in SPGiST inner tuple");
757 /* They're all null, so just return NULL */
762 nodeLabels = (Datum *) palloc(sizeof(Datum) * innerTuple->nNodes);
763 SGITITERATE(innerTuple, i, node)
765 if (IndexTupleHasNulls(node))
766 elog(ERROR, "some but not all node labels are null in SPGiST inner tuple");
767 nodeLabels[i] = SGNTDATUM(node, state);
774 * Add a new item to the page, replacing a PLACEHOLDER item if possible.
775 * Return the location it's inserted at, or InvalidOffsetNumber on failure.
777 * If startOffset isn't NULL, we start searching for placeholders at
778 * *startOffset, and update that to the next place to search. This is just
779 * an optimization for repeated insertions.
781 * If errorOK is false, we throw error when there's not enough room,
782 * rather than returning InvalidOffsetNumber.
785 SpGistPageAddNewItem(SpGistState *state, Page page, Item item, Size size,
786 OffsetNumber *startOffset, bool errorOK)
788 SpGistPageOpaque opaque = SpGistPageGetOpaque(page);
793 if (opaque->nPlaceholder > 0 &&
794 PageGetExactFreeSpace(page) + SGDTSIZE >= MAXALIGN(size))
796 /* Try to replace a placeholder */
797 maxoff = PageGetMaxOffsetNumber(page);
798 offnum = InvalidOffsetNumber;
802 if (startOffset && *startOffset != InvalidOffsetNumber)
805 i = FirstOffsetNumber;
806 for (; i <= maxoff; i++)
808 SpGistDeadTuple it = (SpGistDeadTuple) PageGetItem(page,
809 PageGetItemId(page, i));
811 if (it->tupstate == SPGIST_PLACEHOLDER)
818 /* Done if we found a placeholder */
819 if (offnum != InvalidOffsetNumber)
822 if (startOffset && *startOffset != InvalidOffsetNumber)
824 /* Hint was no good, re-search from beginning */
825 *startOffset = InvalidOffsetNumber;
829 /* Hmm, no placeholder found? */
830 opaque->nPlaceholder = 0;
834 if (offnum != InvalidOffsetNumber)
836 /* Replace the placeholder tuple */
837 PageIndexTupleDelete(page, offnum);
839 offnum = PageAddItem(page, item, size, offnum, false, false);
842 * We should not have failed given the size check at the top of
843 * the function, but test anyway. If we did fail, we must PANIC
844 * because we've already deleted the placeholder tuple, and
845 * there's no other way to keep the damage from getting to disk.
847 if (offnum != InvalidOffsetNumber)
849 Assert(opaque->nPlaceholder > 0);
850 opaque->nPlaceholder--;
852 *startOffset = offnum + 1;
855 elog(PANIC, "failed to add item of size %u to SPGiST index page",
862 /* No luck in replacing a placeholder, so just add it to the page */
863 offnum = PageAddItem(page, item, size,
864 InvalidOffsetNumber, false, false);
866 if (offnum == InvalidOffsetNumber && !errorOK)
867 elog(ERROR, "failed to add item of size %u to SPGiST index page",