]> granicus.if.org Git - postgresql/blob - src/backend/access/spgist/spgutils.c
pgindent run for 9.4
[postgresql] / src / backend / access / spgist / spgutils.c
1 /*-------------------------------------------------------------------------
2  *
3  * spgutils.c
4  *        various support functions for SP-GiST
5  *
6  *
7  * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
8  * Portions Copyright (c) 1994, Regents of the University of California
9  *
10  * IDENTIFICATION
11  *                      src/backend/access/spgist/spgutils.c
12  *
13  *-------------------------------------------------------------------------
14  */
15
16 #include "postgres.h"
17
18 #include "access/genam.h"
19 #include "access/reloptions.h"
20 #include "access/spgist_private.h"
21 #include "access/transam.h"
22 #include "access/xact.h"
23 #include "storage/bufmgr.h"
24 #include "storage/indexfsm.h"
25 #include "storage/lmgr.h"
26 #include "utils/lsyscache.h"
27
28
29 /* Fill in a SpGistTypeDesc struct with info about the specified data type */
30 static void
31 fillTypeDesc(SpGistTypeDesc *desc, Oid type)
32 {
33         desc->type = type;
34         get_typlenbyval(type, &desc->attlen, &desc->attbyval);
35 }
36
37 /*
38  * Fetch local cache of AM-specific info about the index, initializing it
39  * if necessary
40  */
41 SpGistCache *
42 spgGetCache(Relation index)
43 {
44         SpGistCache *cache;
45
46         if (index->rd_amcache == NULL)
47         {
48                 Oid                     atttype;
49                 spgConfigIn in;
50                 FmgrInfo   *procinfo;
51                 Buffer          metabuffer;
52                 SpGistMetaPageData *metadata;
53
54                 cache = MemoryContextAllocZero(index->rd_indexcxt,
55                                                                            sizeof(SpGistCache));
56
57                 /* SPGiST doesn't support multi-column indexes */
58                 Assert(index->rd_att->natts == 1);
59
60                 /*
61                  * Get the actual data type of the indexed column from the index
62                  * tupdesc.  We pass this to the opclass config function so that
63                  * polymorphic opclasses are possible.
64                  */
65                 atttype = index->rd_att->attrs[0]->atttypid;
66
67                 /* Call the config function to get config info for the opclass */
68                 in.attType = atttype;
69
70                 procinfo = index_getprocinfo(index, 1, SPGIST_CONFIG_PROC);
71                 FunctionCall2Coll(procinfo,
72                                                   index->rd_indcollation[0],
73                                                   PointerGetDatum(&in),
74                                                   PointerGetDatum(&cache->config));
75
76                 /* Get the information we need about each relevant datatype */
77                 fillTypeDesc(&cache->attType, atttype);
78                 fillTypeDesc(&cache->attPrefixType, cache->config.prefixType);
79                 fillTypeDesc(&cache->attLabelType, cache->config.labelType);
80
81                 /* Last, get the lastUsedPages data from the metapage */
82                 metabuffer = ReadBuffer(index, SPGIST_METAPAGE_BLKNO);
83                 LockBuffer(metabuffer, BUFFER_LOCK_SHARE);
84
85                 metadata = SpGistPageGetMeta(BufferGetPage(metabuffer));
86
87                 if (metadata->magicNumber != SPGIST_MAGIC_NUMBER)
88                         elog(ERROR, "index \"%s\" is not an SP-GiST index",
89                                  RelationGetRelationName(index));
90
91                 cache->lastUsedPages = metadata->lastUsedPages;
92
93                 UnlockReleaseBuffer(metabuffer);
94
95                 index->rd_amcache = (void *) cache;
96         }
97         else
98         {
99                 /* assume it's up to date */
100                 cache = (SpGistCache *) index->rd_amcache;
101         }
102
103         return cache;
104 }
105
106 /* Initialize SpGistState for working with the given index */
107 void
108 initSpGistState(SpGistState *state, Relation index)
109 {
110         SpGistCache *cache;
111
112         /* Get cached static information about index */
113         cache = spgGetCache(index);
114
115         state->config = cache->config;
116         state->attType = cache->attType;
117         state->attPrefixType = cache->attPrefixType;
118         state->attLabelType = cache->attLabelType;
119
120         /* Make workspace for constructing dead tuples */
121         state->deadTupleStorage = palloc0(SGDTSIZE);
122
123         /* Set XID to use in redirection tuples */
124         state->myXid = GetTopTransactionIdIfAny();
125
126         /* Assume we're not in an index build (spgbuild will override) */
127         state->isBuild = false;
128 }
129
130 /*
131  * Allocate a new page (either by recycling, or by extending the index file).
132  *
133  * The returned buffer is already pinned and exclusive-locked.
134  * Caller is responsible for initializing the page by calling SpGistInitBuffer.
135  */
136 Buffer
137 SpGistNewBuffer(Relation index)
138 {
139         Buffer          buffer;
140         bool            needLock;
141
142         /* First, try to get a page from FSM */
143         for (;;)
144         {
145                 BlockNumber blkno = GetFreeIndexPage(index);
146
147                 if (blkno == InvalidBlockNumber)
148                         break;                          /* nothing known to FSM */
149
150                 /*
151                  * The fixed pages shouldn't ever be listed in FSM, but just in case
152                  * one is, ignore it.
153                  */
154                 if (SpGistBlockIsFixed(blkno))
155                         continue;
156
157                 buffer = ReadBuffer(index, blkno);
158
159                 /*
160                  * We have to guard against the possibility that someone else already
161                  * recycled this page; the buffer may be locked if so.
162                  */
163                 if (ConditionalLockBuffer(buffer))
164                 {
165                         Page            page = BufferGetPage(buffer);
166
167                         if (PageIsNew(page))
168                                 return buffer;  /* OK to use, if never initialized */
169
170                         if (SpGistPageIsDeleted(page) || PageIsEmpty(page))
171                                 return buffer;  /* OK to use */
172
173                         LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
174                 }
175
176                 /* Can't use it, so release buffer and try again */
177                 ReleaseBuffer(buffer);
178         }
179
180         /* Must extend the file */
181         needLock = !RELATION_IS_LOCAL(index);
182         if (needLock)
183                 LockRelationForExtension(index, ExclusiveLock);
184
185         buffer = ReadBuffer(index, P_NEW);
186         LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
187
188         if (needLock)
189                 UnlockRelationForExtension(index, ExclusiveLock);
190
191         return buffer;
192 }
193
194 /*
195  * Update index metapage's lastUsedPages info from local cache, if possible
196  *
197  * Updating meta page isn't critical for index working, so
198  * 1 use ConditionalLockBuffer to improve concurrency
199  * 2 don't WAL-log metabuffer changes to decrease WAL traffic
200  */
201 void
202 SpGistUpdateMetaPage(Relation index)
203 {
204         SpGistCache *cache = (SpGistCache *) index->rd_amcache;
205
206         if (cache != NULL)
207         {
208                 Buffer          metabuffer;
209                 SpGistMetaPageData *metadata;
210
211                 metabuffer = ReadBuffer(index, SPGIST_METAPAGE_BLKNO);
212
213                 if (ConditionalLockBuffer(metabuffer))
214                 {
215                         metadata = SpGistPageGetMeta(BufferGetPage(metabuffer));
216                         metadata->lastUsedPages = cache->lastUsedPages;
217
218                         MarkBufferDirty(metabuffer);
219                         UnlockReleaseBuffer(metabuffer);
220                 }
221                 else
222                 {
223                         ReleaseBuffer(metabuffer);
224                 }
225         }
226 }
227
228 /* Macro to select proper element of lastUsedPages cache depending on flags */
229 /* Masking flags with SPGIST_CACHED_PAGES is just for paranoia's sake */
230 #define GET_LUP(c, f)  (&(c)->lastUsedPages.cachedPage[((unsigned int) (f)) % SPGIST_CACHED_PAGES])
231
232 /*
233  * Allocate and initialize a new buffer of the type and parity specified by
234  * flags.  The returned buffer is already pinned and exclusive-locked.
235  *
236  * When requesting an inner page, if we get one with the wrong parity,
237  * we just release the buffer and try again.  We will get a different page
238  * because GetFreeIndexPage will have marked the page used in FSM.  The page
239  * is entered in our local lastUsedPages cache, so there's some hope of
240  * making use of it later in this session, but otherwise we rely on VACUUM
241  * to eventually re-enter the page in FSM, making it available for recycling.
242  * Note that such a page does not get marked dirty here, so unless it's used
243  * fairly soon, the buffer will just get discarded and the page will remain
244  * as it was on disk.
245  *
246  * When we return a buffer to the caller, the page is *not* entered into
247  * the lastUsedPages cache; we expect the caller will do so after it's taken
248  * whatever space it will use.  This is because after the caller has used up
249  * some space, the page might have less space than whatever was cached already
250  * so we'd rather not trash the old cache entry.
251  */
252 static Buffer
253 allocNewBuffer(Relation index, int flags)
254 {
255         SpGistCache *cache = spgGetCache(index);
256         uint16          pageflags = 0;
257
258         if (GBUF_REQ_LEAF(flags))
259                 pageflags |= SPGIST_LEAF;
260         if (GBUF_REQ_NULLS(flags))
261                 pageflags |= SPGIST_NULLS;
262
263         for (;;)
264         {
265                 Buffer          buffer;
266
267                 buffer = SpGistNewBuffer(index);
268                 SpGistInitBuffer(buffer, pageflags);
269
270                 if (pageflags & SPGIST_LEAF)
271                 {
272                         /* Leaf pages have no parity concerns, so just use it */
273                         return buffer;
274                 }
275                 else
276                 {
277                         BlockNumber blkno = BufferGetBlockNumber(buffer);
278                         int                     blkFlags = GBUF_INNER_PARITY(blkno);
279
280                         if ((flags & GBUF_PARITY_MASK) == blkFlags)
281                         {
282                                 /* Page has right parity, use it */
283                                 return buffer;
284                         }
285                         else
286                         {
287                                 /* Page has wrong parity, record it in cache and try again */
288                                 if (pageflags & SPGIST_NULLS)
289                                         blkFlags |= GBUF_NULLS;
290                                 cache->lastUsedPages.cachedPage[blkFlags].blkno = blkno;
291                                 cache->lastUsedPages.cachedPage[blkFlags].freeSpace =
292                                         PageGetExactFreeSpace(BufferGetPage(buffer));
293                                 UnlockReleaseBuffer(buffer);
294                         }
295                 }
296         }
297 }
298
299 /*
300  * Get a buffer of the type and parity specified by flags, having at least
301  * as much free space as indicated by needSpace.  We use the lastUsedPages
302  * cache to assign the same buffer previously requested when possible.
303  * The returned buffer is already pinned and exclusive-locked.
304  *
305  * *isNew is set true if the page was initialized here, false if it was
306  * already valid.
307  */
308 Buffer
309 SpGistGetBuffer(Relation index, int flags, int needSpace, bool *isNew)
310 {
311         SpGistCache *cache = spgGetCache(index);
312         SpGistLastUsedPage *lup;
313
314         /* Bail out if even an empty page wouldn't meet the demand */
315         if (needSpace > SPGIST_PAGE_CAPACITY)
316                 elog(ERROR, "desired SPGiST tuple size is too big");
317
318         /*
319          * If possible, increase the space request to include relation's
320          * fillfactor.  This ensures that when we add unrelated tuples to a page,
321          * we try to keep 100-fillfactor% available for adding tuples that are
322          * related to the ones already on it.  But fillfactor mustn't cause an
323          * error for requests that would otherwise be legal.
324          */
325         needSpace += RelationGetTargetPageFreeSpace(index,
326                                                                                                 SPGIST_DEFAULT_FILLFACTOR);
327         needSpace = Min(needSpace, SPGIST_PAGE_CAPACITY);
328
329         /* Get the cache entry for this flags setting */
330         lup = GET_LUP(cache, flags);
331
332         /* If we have nothing cached, just turn it over to allocNewBuffer */
333         if (lup->blkno == InvalidBlockNumber)
334         {
335                 *isNew = true;
336                 return allocNewBuffer(index, flags);
337         }
338
339         /* fixed pages should never be in cache */
340         Assert(!SpGistBlockIsFixed(lup->blkno));
341
342         /* If cached freeSpace isn't enough, don't bother looking at the page */
343         if (lup->freeSpace >= needSpace)
344         {
345                 Buffer          buffer;
346                 Page            page;
347
348                 buffer = ReadBuffer(index, lup->blkno);
349
350                 if (!ConditionalLockBuffer(buffer))
351                 {
352                         /*
353                          * buffer is locked by another process, so return a new buffer
354                          */
355                         ReleaseBuffer(buffer);
356                         *isNew = true;
357                         return allocNewBuffer(index, flags);
358                 }
359
360                 page = BufferGetPage(buffer);
361
362                 if (PageIsNew(page) || SpGistPageIsDeleted(page) || PageIsEmpty(page))
363                 {
364                         /* OK to initialize the page */
365                         uint16          pageflags = 0;
366
367                         if (GBUF_REQ_LEAF(flags))
368                                 pageflags |= SPGIST_LEAF;
369                         if (GBUF_REQ_NULLS(flags))
370                                 pageflags |= SPGIST_NULLS;
371                         SpGistInitBuffer(buffer, pageflags);
372                         lup->freeSpace = PageGetExactFreeSpace(page) - needSpace;
373                         *isNew = true;
374                         return buffer;
375                 }
376
377                 /*
378                  * Check that page is of right type and has enough space.  We must
379                  * recheck this since our cache isn't necessarily up to date.
380                  */
381                 if ((GBUF_REQ_LEAF(flags) ? SpGistPageIsLeaf(page) : !SpGistPageIsLeaf(page)) &&
382                         (GBUF_REQ_NULLS(flags) ? SpGistPageStoresNulls(page) : !SpGistPageStoresNulls(page)))
383                 {
384                         int                     freeSpace = PageGetExactFreeSpace(page);
385
386                         if (freeSpace >= needSpace)
387                         {
388                                 /* Success, update freespace info and return the buffer */
389                                 lup->freeSpace = freeSpace - needSpace;
390                                 *isNew = false;
391                                 return buffer;
392                         }
393                 }
394
395                 /*
396                  * fallback to allocation of new buffer
397                  */
398                 UnlockReleaseBuffer(buffer);
399         }
400
401         /* No success with cache, so return a new buffer */
402         *isNew = true;
403         return allocNewBuffer(index, flags);
404 }
405
406 /*
407  * Update lastUsedPages cache when done modifying a page.
408  *
409  * We update the appropriate cache entry if it already contained this page
410  * (its freeSpace is likely obsolete), or if this page has more space than
411  * whatever we had cached.
412  */
413 void
414 SpGistSetLastUsedPage(Relation index, Buffer buffer)
415 {
416         SpGistCache *cache = spgGetCache(index);
417         SpGistLastUsedPage *lup;
418         int                     freeSpace;
419         Page            page = BufferGetPage(buffer);
420         BlockNumber blkno = BufferGetBlockNumber(buffer);
421         int                     flags;
422
423         /* Never enter fixed pages (root pages) in cache, though */
424         if (SpGistBlockIsFixed(blkno))
425                 return;
426
427         if (SpGistPageIsLeaf(page))
428                 flags = GBUF_LEAF;
429         else
430                 flags = GBUF_INNER_PARITY(blkno);
431         if (SpGistPageStoresNulls(page))
432                 flags |= GBUF_NULLS;
433
434         lup = GET_LUP(cache, flags);
435
436         freeSpace = PageGetExactFreeSpace(page);
437         if (lup->blkno == InvalidBlockNumber || lup->blkno == blkno ||
438                 lup->freeSpace < freeSpace)
439         {
440                 lup->blkno = blkno;
441                 lup->freeSpace = freeSpace;
442         }
443 }
444
445 /*
446  * Initialize an SPGiST page to empty, with specified flags
447  */
448 void
449 SpGistInitPage(Page page, uint16 f)
450 {
451         SpGistPageOpaque opaque;
452
453         PageInit(page, BLCKSZ, MAXALIGN(sizeof(SpGistPageOpaqueData)));
454         opaque = SpGistPageGetOpaque(page);
455         memset(opaque, 0, sizeof(SpGistPageOpaqueData));
456         opaque->flags = f;
457         opaque->spgist_page_id = SPGIST_PAGE_ID;
458 }
459
460 /*
461  * Initialize a buffer's page to empty, with specified flags
462  */
463 void
464 SpGistInitBuffer(Buffer b, uint16 f)
465 {
466         Assert(BufferGetPageSize(b) == BLCKSZ);
467         SpGistInitPage(BufferGetPage(b), f);
468 }
469
470 /*
471  * Initialize metadata page
472  */
473 void
474 SpGistInitMetapage(Page page)
475 {
476         SpGistMetaPageData *metadata;
477         int                     i;
478
479         SpGistInitPage(page, SPGIST_META);
480         metadata = SpGistPageGetMeta(page);
481         memset(metadata, 0, sizeof(SpGistMetaPageData));
482         metadata->magicNumber = SPGIST_MAGIC_NUMBER;
483
484         /* initialize last-used-page cache to empty */
485         for (i = 0; i < SPGIST_CACHED_PAGES; i++)
486                 metadata->lastUsedPages.cachedPage[i].blkno = InvalidBlockNumber;
487 }
488
489 /*
490  * reloptions processing for SPGiST
491  */
492 Datum
493 spgoptions(PG_FUNCTION_ARGS)
494 {
495         Datum           reloptions = PG_GETARG_DATUM(0);
496         bool            validate = PG_GETARG_BOOL(1);
497         bytea      *result;
498
499         result = default_reloptions(reloptions, validate, RELOPT_KIND_SPGIST);
500
501         if (result)
502                 PG_RETURN_BYTEA_P(result);
503         PG_RETURN_NULL();
504 }
505
506 /*
507  * Get the space needed to store a non-null datum of the indicated type.
508  * Note the result is already rounded up to a MAXALIGN boundary.
509  * Also, we follow the SPGiST convention that pass-by-val types are
510  * just stored in their Datum representation (compare memcpyDatum).
511  */
512 unsigned int
513 SpGistGetTypeSize(SpGistTypeDesc *att, Datum datum)
514 {
515         unsigned int size;
516
517         if (att->attbyval)
518                 size = sizeof(Datum);
519         else if (att->attlen > 0)
520                 size = att->attlen;
521         else
522                 size = VARSIZE_ANY(datum);
523
524         return MAXALIGN(size);
525 }
526
527 /*
528  * Copy the given non-null datum to *target
529  */
530 static void
531 memcpyDatum(void *target, SpGistTypeDesc *att, Datum datum)
532 {
533         unsigned int size;
534
535         if (att->attbyval)
536         {
537                 memcpy(target, &datum, sizeof(Datum));
538         }
539         else
540         {
541                 size = (att->attlen > 0) ? att->attlen : VARSIZE_ANY(datum);
542                 memcpy(target, DatumGetPointer(datum), size);
543         }
544 }
545
546 /*
547  * Construct a leaf tuple containing the given heap TID and datum value
548  */
549 SpGistLeafTuple
550 spgFormLeafTuple(SpGistState *state, ItemPointer heapPtr,
551                                  Datum datum, bool isnull)
552 {
553         SpGistLeafTuple tup;
554         unsigned int size;
555
556         /* compute space needed (note result is already maxaligned) */
557         size = SGLTHDRSZ;
558         if (!isnull)
559                 size += SpGistGetTypeSize(&state->attType, datum);
560
561         /*
562          * Ensure that we can replace the tuple with a dead tuple later.  This
563          * test is unnecessary when !isnull, but let's be safe.
564          */
565         if (size < SGDTSIZE)
566                 size = SGDTSIZE;
567
568         /* OK, form the tuple */
569         tup = (SpGistLeafTuple) palloc0(size);
570
571         tup->size = size;
572         tup->nextOffset = InvalidOffsetNumber;
573         tup->heapPtr = *heapPtr;
574         if (!isnull)
575                 memcpyDatum(SGLTDATAPTR(tup), &state->attType, datum);
576
577         return tup;
578 }
579
580 /*
581  * Construct a node (to go into an inner tuple) containing the given label
582  *
583  * Note that the node's downlink is just set invalid here.  Caller will fill
584  * it in later.
585  */
586 SpGistNodeTuple
587 spgFormNodeTuple(SpGistState *state, Datum label, bool isnull)
588 {
589         SpGistNodeTuple tup;
590         unsigned int size;
591         unsigned short infomask = 0;
592
593         /* compute space needed (note result is already maxaligned) */
594         size = SGNTHDRSZ;
595         if (!isnull)
596                 size += SpGistGetTypeSize(&state->attLabelType, label);
597
598         /*
599          * Here we make sure that the size will fit in the field reserved for it
600          * in t_info.
601          */
602         if ((size & INDEX_SIZE_MASK) != size)
603                 ereport(ERROR,
604                                 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
605                                  errmsg("index row requires %zu bytes, maximum size is %zu",
606                                                 (Size) size, (Size) INDEX_SIZE_MASK)));
607
608         tup = (SpGistNodeTuple) palloc0(size);
609
610         if (isnull)
611                 infomask |= INDEX_NULL_MASK;
612         /* we don't bother setting the INDEX_VAR_MASK bit */
613         infomask |= size;
614         tup->t_info = infomask;
615
616         /* The TID field will be filled in later */
617         ItemPointerSetInvalid(&tup->t_tid);
618
619         if (!isnull)
620                 memcpyDatum(SGNTDATAPTR(tup), &state->attLabelType, label);
621
622         return tup;
623 }
624
625 /*
626  * Construct an inner tuple containing the given prefix and node array
627  */
628 SpGistInnerTuple
629 spgFormInnerTuple(SpGistState *state, bool hasPrefix, Datum prefix,
630                                   int nNodes, SpGistNodeTuple *nodes)
631 {
632         SpGistInnerTuple tup;
633         unsigned int size;
634         unsigned int prefixSize;
635         int                     i;
636         char       *ptr;
637
638         /* Compute size needed */
639         if (hasPrefix)
640                 prefixSize = SpGistGetTypeSize(&state->attPrefixType, prefix);
641         else
642                 prefixSize = 0;
643
644         size = SGITHDRSZ + prefixSize;
645
646         /* Note: we rely on node tuple sizes to be maxaligned already */
647         for (i = 0; i < nNodes; i++)
648                 size += IndexTupleSize(nodes[i]);
649
650         /*
651          * Ensure that we can replace the tuple with a dead tuple later.  This
652          * test is unnecessary given current tuple layouts, but let's be safe.
653          */
654         if (size < SGDTSIZE)
655                 size = SGDTSIZE;
656
657         /*
658          * Inner tuple should be small enough to fit on a page
659          */
660         if (size > SPGIST_PAGE_CAPACITY - sizeof(ItemIdData))
661                 ereport(ERROR,
662                                 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
663                                  errmsg("SP-GiST inner tuple size %zu exceeds maximum %zu",
664                                                 (Size) size,
665                                                 SPGIST_PAGE_CAPACITY - sizeof(ItemIdData)),
666                         errhint("Values larger than a buffer page cannot be indexed.")));
667
668         /*
669          * Check for overflow of header fields --- probably can't fail if the
670          * above succeeded, but let's be paranoid
671          */
672         if (size > SGITMAXSIZE ||
673                 prefixSize > SGITMAXPREFIXSIZE ||
674                 nNodes > SGITMAXNNODES)
675                 elog(ERROR, "SPGiST inner tuple header field is too small");
676
677         /* OK, form the tuple */
678         tup = (SpGistInnerTuple) palloc0(size);
679
680         tup->nNodes = nNodes;
681         tup->prefixSize = prefixSize;
682         tup->size = size;
683
684         if (hasPrefix)
685                 memcpyDatum(SGITDATAPTR(tup), &state->attPrefixType, prefix);
686
687         ptr = (char *) SGITNODEPTR(tup);
688
689         for (i = 0; i < nNodes; i++)
690         {
691                 SpGistNodeTuple node = nodes[i];
692
693                 memcpy(ptr, node, IndexTupleSize(node));
694                 ptr += IndexTupleSize(node);
695         }
696
697         return tup;
698 }
699
700 /*
701  * Construct a "dead" tuple to replace a tuple being deleted.
702  *
703  * The state can be SPGIST_REDIRECT, SPGIST_DEAD, or SPGIST_PLACEHOLDER.
704  * For a REDIRECT tuple, a pointer (blkno+offset) must be supplied, and
705  * the xid field is filled in automatically.
706  *
707  * This is called in critical sections, so we don't use palloc; the tuple
708  * is built in preallocated storage.  It should be copied before another
709  * call with different parameters can occur.
710  */
711 SpGistDeadTuple
712 spgFormDeadTuple(SpGistState *state, int tupstate,
713                                  BlockNumber blkno, OffsetNumber offnum)
714 {
715         SpGistDeadTuple tuple = (SpGistDeadTuple) state->deadTupleStorage;
716
717         tuple->tupstate = tupstate;
718         tuple->size = SGDTSIZE;
719         tuple->nextOffset = InvalidOffsetNumber;
720
721         if (tupstate == SPGIST_REDIRECT)
722         {
723                 ItemPointerSet(&tuple->pointer, blkno, offnum);
724                 Assert(TransactionIdIsValid(state->myXid));
725                 tuple->xid = state->myXid;
726         }
727         else
728         {
729                 ItemPointerSetInvalid(&tuple->pointer);
730                 tuple->xid = InvalidTransactionId;
731         }
732
733         return tuple;
734 }
735
736 /*
737  * Extract the label datums of the nodes within innerTuple
738  *
739  * Returns NULL if label datums are NULLs
740  */
741 Datum *
742 spgExtractNodeLabels(SpGistState *state, SpGistInnerTuple innerTuple)
743 {
744         Datum      *nodeLabels;
745         int                     i;
746         SpGistNodeTuple node;
747
748         /* Either all the labels must be NULL, or none. */
749         node = SGITNODEPTR(innerTuple);
750         if (IndexTupleHasNulls(node))
751         {
752                 SGITITERATE(innerTuple, i, node)
753                 {
754                         if (!IndexTupleHasNulls(node))
755                                 elog(ERROR, "some but not all node labels are null in SPGiST inner tuple");
756                 }
757                 /* They're all null, so just return NULL */
758                 return NULL;
759         }
760         else
761         {
762                 nodeLabels = (Datum *) palloc(sizeof(Datum) * innerTuple->nNodes);
763                 SGITITERATE(innerTuple, i, node)
764                 {
765                         if (IndexTupleHasNulls(node))
766                                 elog(ERROR, "some but not all node labels are null in SPGiST inner tuple");
767                         nodeLabels[i] = SGNTDATUM(node, state);
768                 }
769                 return nodeLabels;
770         }
771 }
772
773 /*
774  * Add a new item to the page, replacing a PLACEHOLDER item if possible.
775  * Return the location it's inserted at, or InvalidOffsetNumber on failure.
776  *
777  * If startOffset isn't NULL, we start searching for placeholders at
778  * *startOffset, and update that to the next place to search.  This is just
779  * an optimization for repeated insertions.
780  *
781  * If errorOK is false, we throw error when there's not enough room,
782  * rather than returning InvalidOffsetNumber.
783  */
784 OffsetNumber
785 SpGistPageAddNewItem(SpGistState *state, Page page, Item item, Size size,
786                                          OffsetNumber *startOffset, bool errorOK)
787 {
788         SpGistPageOpaque opaque = SpGistPageGetOpaque(page);
789         OffsetNumber i,
790                                 maxoff,
791                                 offnum;
792
793         if (opaque->nPlaceholder > 0 &&
794                 PageGetExactFreeSpace(page) + SGDTSIZE >= MAXALIGN(size))
795         {
796                 /* Try to replace a placeholder */
797                 maxoff = PageGetMaxOffsetNumber(page);
798                 offnum = InvalidOffsetNumber;
799
800                 for (;;)
801                 {
802                         if (startOffset && *startOffset != InvalidOffsetNumber)
803                                 i = *startOffset;
804                         else
805                                 i = FirstOffsetNumber;
806                         for (; i <= maxoff; i++)
807                         {
808                                 SpGistDeadTuple it = (SpGistDeadTuple) PageGetItem(page,
809                                                                                                          PageGetItemId(page, i));
810
811                                 if (it->tupstate == SPGIST_PLACEHOLDER)
812                                 {
813                                         offnum = i;
814                                         break;
815                                 }
816                         }
817
818                         /* Done if we found a placeholder */
819                         if (offnum != InvalidOffsetNumber)
820                                 break;
821
822                         if (startOffset && *startOffset != InvalidOffsetNumber)
823                         {
824                                 /* Hint was no good, re-search from beginning */
825                                 *startOffset = InvalidOffsetNumber;
826                                 continue;
827                         }
828
829                         /* Hmm, no placeholder found? */
830                         opaque->nPlaceholder = 0;
831                         break;
832                 }
833
834                 if (offnum != InvalidOffsetNumber)
835                 {
836                         /* Replace the placeholder tuple */
837                         PageIndexTupleDelete(page, offnum);
838
839                         offnum = PageAddItem(page, item, size, offnum, false, false);
840
841                         /*
842                          * We should not have failed given the size check at the top of
843                          * the function, but test anyway.  If we did fail, we must PANIC
844                          * because we've already deleted the placeholder tuple, and
845                          * there's no other way to keep the damage from getting to disk.
846                          */
847                         if (offnum != InvalidOffsetNumber)
848                         {
849                                 Assert(opaque->nPlaceholder > 0);
850                                 opaque->nPlaceholder--;
851                                 if (startOffset)
852                                         *startOffset = offnum + 1;
853                         }
854                         else
855                                 elog(PANIC, "failed to add item of size %u to SPGiST index page",
856                                          (int) size);
857
858                         return offnum;
859                 }
860         }
861
862         /* No luck in replacing a placeholder, so just add it to the page */
863         offnum = PageAddItem(page, item, size,
864                                                  InvalidOffsetNumber, false, false);
865
866         if (offnum == InvalidOffsetNumber && !errorOK)
867                 elog(ERROR, "failed to add item of size %u to SPGiST index page",
868                          (int) size);
869
870         return offnum;
871 }