]> granicus.if.org Git - postgresql/blob - src/backend/access/gist/gistutil.c
Repair bugs in GiST page splitting code for multi-column indexes.
[postgresql] / src / backend / access / gist / gistutil.c
1 /*-------------------------------------------------------------------------
2  *
3  * gistutil.c
4  *        utility routines for the postgres GiST index access method.
5  *
6  *
7  * Portions Copyright (c) 1996-2013, PostgreSQL Global Development Group
8  * Portions Copyright (c) 1994, Regents of the University of California
9  *
10  * IDENTIFICATION
11  *                      src/backend/access/gist/gistutil.c
12  *-------------------------------------------------------------------------
13  */
14 #include "postgres.h"
15
16 #include <math.h>
17
18 #include "access/gist_private.h"
19 #include "access/reloptions.h"
20 #include "storage/indexfsm.h"
21 #include "storage/lmgr.h"
22 #include "utils/builtins.h"
23
24
25 /*
26  * Write itup vector to page, has no control of free space.
27  */
28 void
29 gistfillbuffer(Page page, IndexTuple *itup, int len, OffsetNumber off)
30 {
31         OffsetNumber l = InvalidOffsetNumber;
32         int                     i;
33
34         if (off == InvalidOffsetNumber)
35                 off = (PageIsEmpty(page)) ? FirstOffsetNumber :
36                         OffsetNumberNext(PageGetMaxOffsetNumber(page));
37
38         for (i = 0; i < len; i++)
39         {
40                 Size            sz = IndexTupleSize(itup[i]);
41
42                 l = PageAddItem(page, (Item) itup[i], sz, off, false, false);
43                 if (l == InvalidOffsetNumber)
44                         elog(ERROR, "failed to add item to GiST index page, item %d out of %d, size %d bytes",
45                                  i, len, (int) sz);
46                 off++;
47         }
48 }
49
50 /*
51  * Check space for itup vector on page
52  */
53 bool
54 gistnospace(Page page, IndexTuple *itvec, int len, OffsetNumber todelete, Size freespace)
55 {
56         unsigned int size = freespace,
57                                 deleted = 0;
58         int                     i;
59
60         for (i = 0; i < len; i++)
61                 size += IndexTupleSize(itvec[i]) + sizeof(ItemIdData);
62
63         if (todelete != InvalidOffsetNumber)
64         {
65                 IndexTuple      itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, todelete));
66
67                 deleted = IndexTupleSize(itup) + sizeof(ItemIdData);
68         }
69
70         return (PageGetFreeSpace(page) + deleted < size);
71 }
72
73 bool
74 gistfitpage(IndexTuple *itvec, int len)
75 {
76         int                     i;
77         Size            size = 0;
78
79         for (i = 0; i < len; i++)
80                 size += IndexTupleSize(itvec[i]) + sizeof(ItemIdData);
81
82         /* TODO: Consider fillfactor */
83         return (size <= GiSTPageSize);
84 }
85
86 /*
87  * Read buffer into itup vector
88  */
89 IndexTuple *
90 gistextractpage(Page page, int *len /* out */ )
91 {
92         OffsetNumber i,
93                                 maxoff;
94         IndexTuple *itvec;
95
96         maxoff = PageGetMaxOffsetNumber(page);
97         *len = maxoff;
98         itvec = palloc(sizeof(IndexTuple) * maxoff);
99         for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
100                 itvec[i - FirstOffsetNumber] = (IndexTuple) PageGetItem(page, PageGetItemId(page, i));
101
102         return itvec;
103 }
104
105 /*
106  * join two vectors into one
107  */
108 IndexTuple *
109 gistjoinvector(IndexTuple *itvec, int *len, IndexTuple *additvec, int addlen)
110 {
111         itvec = (IndexTuple *) repalloc((void *) itvec, sizeof(IndexTuple) * ((*len) + addlen));
112         memmove(&itvec[*len], additvec, sizeof(IndexTuple) * addlen);
113         *len += addlen;
114         return itvec;
115 }
116
117 /*
118  * make plain IndexTupleVector
119  */
120
121 IndexTupleData *
122 gistfillitupvec(IndexTuple *vec, int veclen, int *memlen)
123 {
124         char       *ptr,
125                            *ret;
126         int                     i;
127
128         *memlen = 0;
129
130         for (i = 0; i < veclen; i++)
131                 *memlen += IndexTupleSize(vec[i]);
132
133         ptr = ret = palloc(*memlen);
134
135         for (i = 0; i < veclen; i++)
136         {
137                 memcpy(ptr, vec[i], IndexTupleSize(vec[i]));
138                 ptr += IndexTupleSize(vec[i]);
139         }
140
141         return (IndexTupleData *) ret;
142 }
143
144 /*
145  * Make unions of keys in IndexTuple vector (one union datum per index column).
146  * Union Datums are returned into the attr/isnull arrays.
147  * Resulting Datums aren't compressed.
148  */
149 void
150 gistMakeUnionItVec(GISTSTATE *giststate, IndexTuple *itvec, int len,
151                                    Datum *attr, bool *isnull)
152 {
153         int                     i;
154         GistEntryVector *evec;
155         int                     attrsize;
156
157         evec = (GistEntryVector *) palloc((len + 2) * sizeof(GISTENTRY) + GEVHDRSZ);
158
159         for (i = 0; i < giststate->tupdesc->natts; i++)
160         {
161                 int                     j;
162
163                 /* Collect non-null datums for this column */
164                 evec->n = 0;
165                 for (j = 0; j < len; j++)
166                 {
167                         Datum           datum;
168                         bool            IsNull;
169
170                         datum = index_getattr(itvec[j], i + 1, giststate->tupdesc, &IsNull);
171                         if (IsNull)
172                                 continue;
173
174                         gistdentryinit(giststate, i,
175                                                    evec->vector + evec->n,
176                                                    datum,
177                                                    NULL, NULL, (OffsetNumber) 0,
178                                                    FALSE, IsNull);
179                         evec->n++;
180                 }
181
182                 /* If this column was all NULLs, the union is NULL */
183                 if (evec->n == 0)
184                 {
185                         attr[i] = (Datum) 0;
186                         isnull[i] = TRUE;
187                 }
188                 else
189                 {
190                         if (evec->n == 1)
191                         {
192                                 /* unionFn may expect at least two inputs */
193                                 evec->n = 2;
194                                 evec->vector[1] = evec->vector[0];
195                         }
196
197                         /* Make union and store in attr array */
198                         attr[i] = FunctionCall2Coll(&giststate->unionFn[i],
199                                                                                 giststate->supportCollation[i],
200                                                                                 PointerGetDatum(evec),
201                                                                                 PointerGetDatum(&attrsize));
202
203                         isnull[i] = FALSE;
204                 }
205         }
206 }
207
208 /*
209  * Return an IndexTuple containing the result of applying the "union"
210  * method to the specified IndexTuple vector.
211  */
212 IndexTuple
213 gistunion(Relation r, IndexTuple *itvec, int len, GISTSTATE *giststate)
214 {
215         Datum           attr[INDEX_MAX_KEYS];
216         bool            isnull[INDEX_MAX_KEYS];
217
218         gistMakeUnionItVec(giststate, itvec, len, attr, isnull);
219
220         return gistFormTuple(giststate, r, attr, isnull, false);
221 }
222
223 /*
224  * makes union of two key
225  */
226 void
227 gistMakeUnionKey(GISTSTATE *giststate, int attno,
228                                  GISTENTRY *entry1, bool isnull1,
229                                  GISTENTRY *entry2, bool isnull2,
230                                  Datum *dst, bool *dstisnull)
231 {
232         /* we need a GistEntryVector with room for exactly 2 elements */
233         union
234         {
235                 GistEntryVector gev;
236                 char            padding[2 * sizeof(GISTENTRY) + GEVHDRSZ];
237         }                       storage;
238         GistEntryVector *evec = &storage.gev;
239         int                     dstsize;
240
241         evec->n = 2;
242
243         if (isnull1 && isnull2)
244         {
245                 *dstisnull = TRUE;
246                 *dst = (Datum) 0;
247         }
248         else
249         {
250                 if (isnull1 == FALSE && isnull2 == FALSE)
251                 {
252                         evec->vector[0] = *entry1;
253                         evec->vector[1] = *entry2;
254                 }
255                 else if (isnull1 == FALSE)
256                 {
257                         evec->vector[0] = *entry1;
258                         evec->vector[1] = *entry1;
259                 }
260                 else
261                 {
262                         evec->vector[0] = *entry2;
263                         evec->vector[1] = *entry2;
264                 }
265
266                 *dstisnull = FALSE;
267                 *dst = FunctionCall2Coll(&giststate->unionFn[attno],
268                                                                  giststate->supportCollation[attno],
269                                                                  PointerGetDatum(evec),
270                                                                  PointerGetDatum(&dstsize));
271         }
272 }
273
274 bool
275 gistKeyIsEQ(GISTSTATE *giststate, int attno, Datum a, Datum b)
276 {
277         bool            result;
278
279         FunctionCall3Coll(&giststate->equalFn[attno],
280                                           giststate->supportCollation[attno],
281                                           a, b,
282                                           PointerGetDatum(&result));
283         return result;
284 }
285
286 /*
287  * Decompress all keys in tuple
288  */
289 void
290 gistDeCompressAtt(GISTSTATE *giststate, Relation r, IndexTuple tuple, Page p,
291                                   OffsetNumber o, GISTENTRY *attdata, bool *isnull)
292 {
293         int                     i;
294
295         for (i = 0; i < r->rd_att->natts; i++)
296         {
297                 Datum           datum = index_getattr(tuple, i + 1, giststate->tupdesc, &isnull[i]);
298
299                 gistdentryinit(giststate, i, &attdata[i],
300                                            datum, r, p, o,
301                                            FALSE, isnull[i]);
302         }
303 }
304
305 /*
306  * Forms union of oldtup and addtup, if union == oldtup then return NULL
307  */
308 IndexTuple
309 gistgetadjusted(Relation r, IndexTuple oldtup, IndexTuple addtup, GISTSTATE *giststate)
310 {
311         bool            neednew = FALSE;
312         GISTENTRY       oldentries[INDEX_MAX_KEYS],
313                                 addentries[INDEX_MAX_KEYS];
314         bool            oldisnull[INDEX_MAX_KEYS],
315                                 addisnull[INDEX_MAX_KEYS];
316         Datum           attr[INDEX_MAX_KEYS];
317         bool            isnull[INDEX_MAX_KEYS];
318         IndexTuple      newtup = NULL;
319         int                     i;
320
321         gistDeCompressAtt(giststate, r, oldtup, NULL,
322                                           (OffsetNumber) 0, oldentries, oldisnull);
323
324         gistDeCompressAtt(giststate, r, addtup, NULL,
325                                           (OffsetNumber) 0, addentries, addisnull);
326
327         for (i = 0; i < r->rd_att->natts; i++)
328         {
329                 gistMakeUnionKey(giststate, i,
330                                                  oldentries + i, oldisnull[i],
331                                                  addentries + i, addisnull[i],
332                                                  attr + i, isnull + i);
333
334                 if (neednew)
335                         /* we already need new key, so we can skip check */
336                         continue;
337
338                 if (isnull[i])
339                         /* union of key may be NULL if and only if both keys are NULL */
340                         continue;
341
342                 if (!addisnull[i])
343                 {
344                         if (oldisnull[i] ||
345                                 !gistKeyIsEQ(giststate, i, oldentries[i].key, attr[i]))
346                                 neednew = true;
347                 }
348         }
349
350         if (neednew)
351         {
352                 /* need to update key */
353                 newtup = gistFormTuple(giststate, r, attr, isnull, false);
354                 newtup->t_tid = oldtup->t_tid;
355         }
356
357         return newtup;
358 }
359
360 /*
361  * Search an upper index page for the entry with lowest penalty for insertion
362  * of the new index key contained in "it".
363  *
364  * Returns the index of the page entry to insert into.
365  */
366 OffsetNumber
367 gistchoose(Relation r, Page p, IndexTuple it,   /* it has compressed entry */
368                    GISTSTATE *giststate)
369 {
370         OffsetNumber result;
371         OffsetNumber maxoff;
372         OffsetNumber i;
373         float           best_penalty[INDEX_MAX_KEYS];
374         GISTENTRY       entry,
375                                 identry[INDEX_MAX_KEYS];
376         bool            isnull[INDEX_MAX_KEYS];
377         int                     keep_current_best;
378
379         Assert(!GistPageIsLeaf(p));
380
381         gistDeCompressAtt(giststate, r,
382                                           it, NULL, (OffsetNumber) 0,
383                                           identry, isnull);
384
385         /* we'll return FirstOffsetNumber if page is empty (shouldn't happen) */
386         result = FirstOffsetNumber;
387
388         /*
389          * The index may have multiple columns, and there's a penalty value for
390          * each column.  The penalty associated with a column that appears earlier
391          * in the index definition is strictly more important than the penalty of
392          * a column that appears later in the index definition.
393          *
394          * best_penalty[j] is the best penalty we have seen so far for column j,
395          * or -1 when we haven't yet examined column j.  Array entries to the
396          * right of the first -1 are undefined.
397          */
398         best_penalty[0] = -1;
399
400         /*
401          * If we find a tuple that's exactly as good as the currently best one, we
402          * could use either one.  When inserting a lot of tuples with the same or
403          * similar keys, it's preferable to descend down the same path when
404          * possible, as that's more cache-friendly.  On the other hand, if all
405          * inserts land on the same leaf page after a split, we're never going to
406          * insert anything to the other half of the split, and will end up using
407          * only 50% of the available space.  Distributing the inserts evenly would
408          * lead to better space usage, but that hurts cache-locality during
409          * insertion.  To get the best of both worlds, when we find a tuple that's
410          * exactly as good as the previous best, choose randomly whether to stick
411          * to the old best, or use the new one.  Once we decide to stick to the
412          * old best, we keep sticking to it for any subsequent equally good tuples
413          * we might find.  This favors tuples with low offsets, but still allows
414          * some inserts to go to other equally-good subtrees.
415          *
416          * keep_current_best is -1 if we haven't yet had to make a random choice
417          * whether to keep the current best tuple.  If we have done so, and
418          * decided to keep it, keep_current_best is 1; if we've decided to
419          * replace, keep_current_best is 0.  (This state will be reset to -1 as
420          * soon as we've made the replacement, but sometimes we make the choice in
421          * advance of actually finding a replacement best tuple.)
422          */
423         keep_current_best = -1;
424
425         /*
426          * Loop over tuples on page.
427          */
428         maxoff = PageGetMaxOffsetNumber(p);
429         Assert(maxoff >= FirstOffsetNumber);
430
431         for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
432         {
433                 IndexTuple      itup = (IndexTuple) PageGetItem(p, PageGetItemId(p, i));
434                 bool            zero_penalty;
435                 int                     j;
436
437                 zero_penalty = true;
438
439                 /* Loop over index attributes. */
440                 for (j = 0; j < r->rd_att->natts; j++)
441                 {
442                         Datum           datum;
443                         float           usize;
444                         bool            IsNull;
445
446                         /* Compute penalty for this column. */
447                         datum = index_getattr(itup, j + 1, giststate->tupdesc, &IsNull);
448                         gistdentryinit(giststate, j, &entry, datum, r, p, i,
449                                                    FALSE, IsNull);
450                         usize = gistpenalty(giststate, j, &entry, IsNull,
451                                                                 &identry[j], isnull[j]);
452                         if (usize > 0)
453                                 zero_penalty = false;
454
455                         if (best_penalty[j] < 0 || usize < best_penalty[j])
456                         {
457                                 /*
458                                  * New best penalty for column.  Tentatively select this tuple
459                                  * as the target, and record the best penalty.  Then reset the
460                                  * next column's penalty to "unknown" (and indirectly, the
461                                  * same for all the ones to its right).  This will force us to
462                                  * adopt this tuple's penalty values as the best for all the
463                                  * remaining columns during subsequent loop iterations.
464                                  */
465                                 result = i;
466                                 best_penalty[j] = usize;
467
468                                 if (j < r->rd_att->natts - 1)
469                                         best_penalty[j + 1] = -1;
470
471                                 /* we have new best, so reset keep-it decision */
472                                 keep_current_best = -1;
473                         }
474                         else if (best_penalty[j] == usize)
475                         {
476                                 /*
477                                  * The current tuple is exactly as good for this column as the
478                                  * best tuple seen so far.      The next iteration of this loop
479                                  * will compare the next column.
480                                  */
481                         }
482                         else
483                         {
484                                 /*
485                                  * The current tuple is worse for this column than the best
486                                  * tuple seen so far.  Skip the remaining columns and move on
487                                  * to the next tuple, if any.
488                                  */
489                                 zero_penalty = false;   /* so outer loop won't exit */
490                                 break;
491                         }
492                 }
493
494                 /*
495                  * If we looped past the last column, and did not update "result",
496                  * then this tuple is exactly as good as the prior best tuple.
497                  */
498                 if (j == r->rd_att->natts && result != i)
499                 {
500                         if (keep_current_best == -1)
501                         {
502                                 /* we didn't make the random choice yet for this old best */
503                                 keep_current_best = (random() <= (MAX_RANDOM_VALUE / 2)) ? 1 : 0;
504                         }
505                         if (keep_current_best == 0)
506                         {
507                                 /* we choose to use the new tuple */
508                                 result = i;
509                                 /* choose again if there are even more exactly-as-good ones */
510                                 keep_current_best = -1;
511                         }
512                 }
513
514                 /*
515                  * If we find a tuple with zero penalty for all columns, and we've
516                  * decided we don't want to search for another tuple with equal
517                  * penalty, there's no need to examine remaining tuples; just break
518                  * out of the loop and return it.
519                  */
520                 if (zero_penalty)
521                 {
522                         if (keep_current_best == -1)
523                         {
524                                 /* we didn't make the random choice yet for this old best */
525                                 keep_current_best = (random() <= (MAX_RANDOM_VALUE / 2)) ? 1 : 0;
526                         }
527                         if (keep_current_best == 1)
528                                 break;
529                 }
530         }
531
532         return result;
533 }
534
535 /*
536  * initialize a GiST entry with a decompressed version of key
537  */
538 void
539 gistdentryinit(GISTSTATE *giststate, int nkey, GISTENTRY *e,
540                            Datum k, Relation r, Page pg, OffsetNumber o,
541                            bool l, bool isNull)
542 {
543         if (!isNull)
544         {
545                 GISTENTRY  *dep;
546
547                 gistentryinit(*e, k, r, pg, o, l);
548                 dep = (GISTENTRY *)
549                         DatumGetPointer(FunctionCall1Coll(&giststate->decompressFn[nkey],
550                                                                                    giststate->supportCollation[nkey],
551                                                                                           PointerGetDatum(e)));
552                 /* decompressFn may just return the given pointer */
553                 if (dep != e)
554                         gistentryinit(*e, dep->key, dep->rel, dep->page, dep->offset,
555                                                   dep->leafkey);
556         }
557         else
558                 gistentryinit(*e, (Datum) 0, r, pg, o, l);
559 }
560
561
562 /*
563  * initialize a GiST entry with a compressed version of key
564  */
565 void
566 gistcentryinit(GISTSTATE *giststate, int nkey,
567                            GISTENTRY *e, Datum k, Relation r,
568                            Page pg, OffsetNumber o, bool l, bool isNull)
569 {
570         if (!isNull)
571         {
572                 GISTENTRY  *cep;
573
574                 gistentryinit(*e, k, r, pg, o, l);
575                 cep = (GISTENTRY *)
576                         DatumGetPointer(FunctionCall1Coll(&giststate->compressFn[nkey],
577                                                                                    giststate->supportCollation[nkey],
578                                                                                           PointerGetDatum(e)));
579                 /* compressFn may just return the given pointer */
580                 if (cep != e)
581                         gistentryinit(*e, cep->key, cep->rel, cep->page, cep->offset,
582                                                   cep->leafkey);
583         }
584         else
585                 gistentryinit(*e, (Datum) 0, r, pg, o, l);
586 }
587
588 IndexTuple
589 gistFormTuple(GISTSTATE *giststate, Relation r,
590                           Datum attdata[], bool isnull[], bool newValues)
591 {
592         GISTENTRY       centry[INDEX_MAX_KEYS];
593         Datum           compatt[INDEX_MAX_KEYS];
594         int                     i;
595         IndexTuple      res;
596
597         for (i = 0; i < r->rd_att->natts; i++)
598         {
599                 if (isnull[i])
600                         compatt[i] = (Datum) 0;
601                 else
602                 {
603                         gistcentryinit(giststate, i, &centry[i], attdata[i],
604                                                    r, NULL, (OffsetNumber) 0,
605                                                    newValues,
606                                                    FALSE);
607                         compatt[i] = centry[i].key;
608                 }
609         }
610
611         res = index_form_tuple(giststate->tupdesc, compatt, isnull);
612
613         /*
614          * The offset number on tuples on internal pages is unused. For historical
615          * reasons, it is set 0xffff.
616          */
617         ItemPointerSetOffsetNumber(&(res->t_tid), 0xffff);
618         return res;
619 }
620
621 float
622 gistpenalty(GISTSTATE *giststate, int attno,
623                         GISTENTRY *orig, bool isNullOrig,
624                         GISTENTRY *add, bool isNullAdd)
625 {
626         float           penalty = 0.0;
627
628         if (giststate->penaltyFn[attno].fn_strict == FALSE ||
629                 (isNullOrig == FALSE && isNullAdd == FALSE))
630         {
631                 FunctionCall3Coll(&giststate->penaltyFn[attno],
632                                                   giststate->supportCollation[attno],
633                                                   PointerGetDatum(orig),
634                                                   PointerGetDatum(add),
635                                                   PointerGetDatum(&penalty));
636                 /* disallow negative or NaN penalty */
637                 if (isnan(penalty) || penalty < 0.0)
638                         penalty = 0.0;
639         }
640         else if (isNullOrig && isNullAdd)
641                 penalty = 0.0;
642         else
643         {
644                 /* try to prevent mixing null and non-null values */
645                 penalty = get_float4_infinity();
646         }
647
648         return penalty;
649 }
650
651 /*
652  * Initialize a new index page
653  */
654 void
655 GISTInitBuffer(Buffer b, uint32 f)
656 {
657         GISTPageOpaque opaque;
658         Page            page;
659         Size            pageSize;
660
661         pageSize = BufferGetPageSize(b);
662         page = BufferGetPage(b);
663         PageInit(page, pageSize, sizeof(GISTPageOpaqueData));
664
665         opaque = GistPageGetOpaque(page);
666         /* page was already zeroed by PageInit, so this is not needed: */
667         /* memset(&(opaque->nsn), 0, sizeof(GistNSN)); */
668         opaque->rightlink = InvalidBlockNumber;
669         opaque->flags = f;
670         opaque->gist_page_id = GIST_PAGE_ID;
671 }
672
673 /*
674  * Verify that a freshly-read page looks sane.
675  */
676 void
677 gistcheckpage(Relation rel, Buffer buf)
678 {
679         Page            page = BufferGetPage(buf);
680
681         /*
682          * ReadBuffer verifies that every newly-read page passes
683          * PageHeaderIsValid, which means it either contains a reasonably sane
684          * page header or is all-zero.  We have to defend against the all-zero
685          * case, however.
686          */
687         if (PageIsNew(page))
688                 ereport(ERROR,
689                                 (errcode(ERRCODE_INDEX_CORRUPTED),
690                          errmsg("index \"%s\" contains unexpected zero page at block %u",
691                                         RelationGetRelationName(rel),
692                                         BufferGetBlockNumber(buf)),
693                                  errhint("Please REINDEX it.")));
694
695         /*
696          * Additionally check that the special area looks sane.
697          */
698         if (PageGetSpecialSize(page) != MAXALIGN(sizeof(GISTPageOpaqueData)))
699                 ereport(ERROR,
700                                 (errcode(ERRCODE_INDEX_CORRUPTED),
701                                  errmsg("index \"%s\" contains corrupted page at block %u",
702                                                 RelationGetRelationName(rel),
703                                                 BufferGetBlockNumber(buf)),
704                                  errhint("Please REINDEX it.")));
705 }
706
707
708 /*
709  * Allocate a new page (either by recycling, or by extending the index file)
710  *
711  * The returned buffer is already pinned and exclusive-locked
712  *
713  * Caller is responsible for initializing the page by calling GISTInitBuffer
714  */
715 Buffer
716 gistNewBuffer(Relation r)
717 {
718         Buffer          buffer;
719         bool            needLock;
720
721         /* First, try to get a page from FSM */
722         for (;;)
723         {
724                 BlockNumber blkno = GetFreeIndexPage(r);
725
726                 if (blkno == InvalidBlockNumber)
727                         break;                          /* nothing left in FSM */
728
729                 buffer = ReadBuffer(r, blkno);
730
731                 /*
732                  * We have to guard against the possibility that someone else already
733                  * recycled this page; the buffer may be locked if so.
734                  */
735                 if (ConditionalLockBuffer(buffer))
736                 {
737                         Page            page = BufferGetPage(buffer);
738
739                         if (PageIsNew(page))
740                                 return buffer;  /* OK to use, if never initialized */
741
742                         gistcheckpage(r, buffer);
743
744                         if (GistPageIsDeleted(page))
745                                 return buffer;  /* OK to use */
746
747                         LockBuffer(buffer, GIST_UNLOCK);
748                 }
749
750                 /* Can't use it, so release buffer and try again */
751                 ReleaseBuffer(buffer);
752         }
753
754         /* Must extend the file */
755         needLock = !RELATION_IS_LOCAL(r);
756
757         if (needLock)
758                 LockRelationForExtension(r, ExclusiveLock);
759
760         buffer = ReadBuffer(r, P_NEW);
761         LockBuffer(buffer, GIST_EXCLUSIVE);
762
763         if (needLock)
764                 UnlockRelationForExtension(r, ExclusiveLock);
765
766         return buffer;
767 }
768
769 Datum
770 gistoptions(PG_FUNCTION_ARGS)
771 {
772         Datum           reloptions = PG_GETARG_DATUM(0);
773         bool            validate = PG_GETARG_BOOL(1);
774         relopt_value *options;
775         GiSTOptions *rdopts;
776         int                     numoptions;
777         static const relopt_parse_elt tab[] = {
778                 {"fillfactor", RELOPT_TYPE_INT, offsetof(GiSTOptions, fillfactor)},
779                 {"buffering", RELOPT_TYPE_STRING, offsetof(GiSTOptions, bufferingModeOffset)}
780         };
781
782         options = parseRelOptions(reloptions, validate, RELOPT_KIND_GIST,
783                                                           &numoptions);
784
785         /* if none set, we're done */
786         if (numoptions == 0)
787                 PG_RETURN_NULL();
788
789         rdopts = allocateReloptStruct(sizeof(GiSTOptions), options, numoptions);
790
791         fillRelOptions((void *) rdopts, sizeof(GiSTOptions), options, numoptions,
792                                    validate, tab, lengthof(tab));
793
794         pfree(options);
795
796         PG_RETURN_BYTEA_P(rdopts);
797
798 }
799
800 /*
801  * Temporary GiST indexes are not WAL-logged, but we need LSNs to detect
802  * concurrent page splits anyway. GetXLogRecPtrForTemp() provides a fake
803  * sequence of LSNs for that purpose. Each call generates an LSN that is
804  * greater than any previous value returned by this function in the same
805  * session.
806  */
807 XLogRecPtr
808 GetXLogRecPtrForTemp(void)
809 {
810         static XLogRecPtr counter = 1;
811         counter++;
812         return counter;
813 }