/*-------------------------------------------------------------------------
 *
 * gistutil.c
 *	  utility routines for the postgres GiST index access method.
 *
 *
 * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *	  src/backend/access/gist/gistutil.c
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include <math.h>

#include "access/gist_private.h"
#include "access/htup_details.h"
#include "access/reloptions.h"
#include "catalog/pg_opclass.h"
#include "storage/indexfsm.h"
#include "storage/lmgr.h"
#include "utils/float.h"
#include "utils/syscache.h"
#include "utils/snapmgr.h"
#include "utils/lsyscache.h"


/*
 * Write an itup vector to a page; does not check for free space.
 */
void
gistfillbuffer(Page page, IndexTuple *itup, int len, OffsetNumber off)
{
    OffsetNumber l = InvalidOffsetNumber;
    int         i;

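    /* If no target position was given, append after the page's last tuple */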
    if (off == InvalidOffsetNumber)
        off = (PageIsEmpty(page)) ? FirstOffsetNumber :
            OffsetNumberNext(PageGetMaxOffsetNumber(page));

    for (i = 0; i < len; i++)
    {
        Size        sz = IndexTupleSize(itup[i]);

        l = PageAddItem(page, (Item) itup[i], sz, off, false, false);
        if (l == InvalidOffsetNumber)
            elog(ERROR, "failed to add item to GiST index page, item %d out of %d, size %d bytes",
                 i, len, (int) sz);
        off++;
    }
}

/*
 * Check whether an itup vector would overflow the page, keeping 'freespace'
 * bytes in reserve; returns true if there is NOT enough room.  If 'todelete'
 * is a valid offset, that existing tuple is assumed to be removed first.
 */
bool
gistnospace(Page page, IndexTuple *itvec, int len, OffsetNumber todelete, Size freespace)
{
    unsigned int size = freespace,
                deleted = 0;
    int         i;

    for (i = 0; i < len; i++)
        size += IndexTupleSize(itvec[i]) + sizeof(ItemIdData);

    if (todelete != InvalidOffsetNumber)
    {
        IndexTuple  itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, todelete));

        deleted = IndexTupleSize(itup) + sizeof(ItemIdData);
    }

    return (PageGetFreeSpace(page) + deleted < size);
}

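/*
 * Would the given itup vector fit on a single, otherwise-empty GiST page?
 */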
bool
gistfitpage(IndexTuple *itvec, int len)
{
    int         i;
    Size        size = 0;

    for (i = 0; i < len; i++)
        size += IndexTupleSize(itvec[i]) + sizeof(ItemIdData);

    /* TODO: Consider fillfactor */
    return (size <= GiSTPageSize);
}

/*
 * Read the tuples on a page into an itup vector.  The returned vector
 * contains pointers into the page, not copies.
 */
IndexTuple *
gistextractpage(Page page, int *len /* out */ )
{
    OffsetNumber i,
                maxoff;
    IndexTuple *itvec;

    maxoff = PageGetMaxOffsetNumber(page);
    *len = maxoff;
    itvec = palloc(sizeof(IndexTuple) * maxoff);
    for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
        itvec[i - FirstOffsetNumber] = (IndexTuple) PageGetItem(page, PageGetItemId(page, i));

    return itvec;
}

/*
 * Join two IndexTuple vectors into one; *len is updated, and the (possibly
 * repalloc'd, and therefore possibly moved) result vector is returned.
 */
IndexTuple *
gistjoinvector(IndexTuple *itvec, int *len, IndexTuple *additvec, int addlen)
{
    itvec = (IndexTuple *) repalloc((void *) itvec, sizeof(IndexTuple) * ((*len) + addlen));
    memmove(&itvec[*len], additvec, sizeof(IndexTuple) * addlen);
    *len += addlen;
    return itvec;
}

/*
 * Pack a vector of IndexTuples into a single palloc'd chunk; *memlen is set
 * to the total size of the result.
 */
IndexTupleData *
gistfillitupvec(IndexTuple *vec, int veclen, int *memlen)
{
    char       *ptr,
               *ret;
    int         i;

    *memlen = 0;

    for (i = 0; i < veclen; i++)
        *memlen += IndexTupleSize(vec[i]);

    ptr = ret = palloc(*memlen);

    for (i = 0; i < veclen; i++)
    {
        memcpy(ptr, vec[i], IndexTupleSize(vec[i]));
        ptr += IndexTupleSize(vec[i]);
    }

    return (IndexTupleData *) ret;
}

/*
 * Make unions of keys in IndexTuple vector (one union datum per index column).
 * Union Datums are returned into the attr/isnull arrays.
 * Resulting Datums aren't compressed.
 */
void
gistMakeUnionItVec(GISTSTATE *giststate, IndexTuple *itvec, int len,
                   Datum *attr, bool *isnull)
{
    int         i;
    GistEntryVector *evec;
    int         attrsize;

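    /*
     * Allocate workspace with headroom beyond 'len' entries: the
     * single-input case below duplicates its entry so that the union
     * function always sees at least two inputs.
     */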
    evec = (GistEntryVector *) palloc((len + 2) * sizeof(GISTENTRY) + GEVHDRSZ);

    for (i = 0; i < giststate->nonLeafTupdesc->natts; i++)
    {
        int         j;

        /* Collect non-null datums for this column */
        evec->n = 0;
        for (j = 0; j < len; j++)
        {
            Datum       datum;
            bool        IsNull;

            datum = index_getattr(itvec[j], i + 1, giststate->leafTupdesc,
                                  &IsNull);
            if (IsNull)
                continue;

            gistdentryinit(giststate, i,
                           evec->vector + evec->n,
                           datum,
                           NULL, NULL, (OffsetNumber) 0,
                           false, IsNull);
            evec->n++;
        }

        /* If this column was all NULLs, the union is NULL */
        if (evec->n == 0)
        {
            attr[i] = (Datum) 0;
            isnull[i] = true;
        }
        else
        {
            if (evec->n == 1)
            {
                /* unionFn may expect at least two inputs */
                evec->n = 2;
                evec->vector[1] = evec->vector[0];
            }

            /* Make union and store in attr array */
            attr[i] = FunctionCall2Coll(&giststate->unionFn[i],
                                        giststate->supportCollation[i],
                                        PointerGetDatum(evec),
                                        PointerGetDatum(&attrsize));

            isnull[i] = false;
        }
    }
}

/*
 * Return an IndexTuple containing the result of applying the "union"
 * method to the specified IndexTuple vector.
 */
IndexTuple
gistunion(Relation r, IndexTuple *itvec, int len, GISTSTATE *giststate)
{
    Datum       attr[INDEX_MAX_KEYS];
    bool        isnull[INDEX_MAX_KEYS];

    gistMakeUnionItVec(giststate, itvec, len, attr, isnull);

    return gistFormTuple(giststate, r, attr, isnull, false);
}

/*
 * Make the union of two keys.
 */
void
gistMakeUnionKey(GISTSTATE *giststate, int attno,
                 GISTENTRY *entry1, bool isnull1,
                 GISTENTRY *entry2, bool isnull2,
                 Datum *dst, bool *dstisnull)
{
    /* we need a GistEntryVector with room for exactly 2 elements */
    union
    {
        GistEntryVector gev;
        char        padding[2 * sizeof(GISTENTRY) + GEVHDRSZ];
    }           storage;
    GistEntryVector *evec = &storage.gev;
    int         dstsize;

    evec->n = 2;

    if (isnull1 && isnull2)
    {
        *dstisnull = true;
        *dst = (Datum) 0;
    }
    else
    {
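        /*
         * unionFn cannot cope with NULL inputs, so when exactly one key is
         * NULL we take the union of the non-null key with itself.
         */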
        if (isnull1 == false && isnull2 == false)
        {
            evec->vector[0] = *entry1;
            evec->vector[1] = *entry2;
        }
        else if (isnull1 == false)
        {
            evec->vector[0] = *entry1;
            evec->vector[1] = *entry1;
        }
        else
        {
            evec->vector[0] = *entry2;
            evec->vector[1] = *entry2;
        }

        *dstisnull = false;
        *dst = FunctionCall2Coll(&giststate->unionFn[attno],
                                 giststate->supportCollation[attno],
                                 PointerGetDatum(evec),
                                 PointerGetDatum(&dstsize));
    }
}

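/*
 * Test two keys for equality using the opclass's "same" support function.
 * The support function reports its result through the third argument.
 */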
bool
gistKeyIsEQ(GISTSTATE *giststate, int attno, Datum a, Datum b)
{
    bool        result;

    FunctionCall3Coll(&giststate->equalFn[attno],
                      giststate->supportCollation[attno],
                      a, b,
                      PointerGetDatum(&result));
    return result;
}

/*
 * Decompress all keys in tuple
 */
void
gistDeCompressAtt(GISTSTATE *giststate, Relation r, IndexTuple tuple, Page p,
                  OffsetNumber o, GISTENTRY *attdata, bool *isnull)
{
    int         i;

    for (i = 0; i < IndexRelationGetNumberOfKeyAttributes(r); i++)
    {
        Datum       datum;

        datum = index_getattr(tuple, i + 1, giststate->leafTupdesc, &isnull[i]);
        gistdentryinit(giststate, i, &attdata[i],
                       datum, r, p, o,
                       false, isnull[i]);
    }
}

/*
 * Form the union of oldtup and addtup; if the union is no different from
 * oldtup, return NULL (no replacement tuple is needed).
 */
IndexTuple
gistgetadjusted(Relation r, IndexTuple oldtup, IndexTuple addtup, GISTSTATE *giststate)
{
    bool        neednew = false;
    GISTENTRY   oldentries[INDEX_MAX_KEYS],
                addentries[INDEX_MAX_KEYS];
    bool        oldisnull[INDEX_MAX_KEYS],
                addisnull[INDEX_MAX_KEYS];
    Datum       attr[INDEX_MAX_KEYS];
    bool        isnull[INDEX_MAX_KEYS];
    IndexTuple  newtup = NULL;
    int         i;

    gistDeCompressAtt(giststate, r, oldtup, NULL,
                      (OffsetNumber) 0, oldentries, oldisnull);

    gistDeCompressAtt(giststate, r, addtup, NULL,
                      (OffsetNumber) 0, addentries, addisnull);

    for (i = 0; i < IndexRelationGetNumberOfKeyAttributes(r); i++)
    {
        gistMakeUnionKey(giststate, i,
                         oldentries + i, oldisnull[i],
                         addentries + i, addisnull[i],
                         attr + i, isnull + i);

        if (neednew)
            /* we already need new key, so we can skip check */
            continue;

        if (isnull[i])
            /* union of key may be NULL if and only if both keys are NULL */
            continue;

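        /*
         * The new tuple brings a non-null key for this column: we need a
         * replacement tuple if the old key was null, or if the union
         * differs from the old key.
         */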
        if (!addisnull[i])
        {
            if (oldisnull[i] ||
                !gistKeyIsEQ(giststate, i, oldentries[i].key, attr[i]))
                neednew = true;
        }
    }

    if (neednew)
    {
        /* need to update key */
        newtup = gistFormTuple(giststate, r, attr, isnull, false);
        newtup->t_tid = oldtup->t_tid;
    }

    return newtup;
}

/*
 * Search an upper index page for the entry with lowest penalty for insertion
 * of the new index key contained in "it".
 *
 * Returns the index of the page entry to insert into.
 */
OffsetNumber
gistchoose(Relation r, Page p, IndexTuple it,   /* it has compressed entry */
           GISTSTATE *giststate)
{
    OffsetNumber result;
    OffsetNumber maxoff;
    OffsetNumber i;
    float       best_penalty[INDEX_MAX_KEYS];
    GISTENTRY   entry,
                identry[INDEX_MAX_KEYS];
    bool        isnull[INDEX_MAX_KEYS];
    int         keep_current_best;

    Assert(!GistPageIsLeaf(p));

    gistDeCompressAtt(giststate, r,
                      it, NULL, (OffsetNumber) 0,
                      identry, isnull);

    /* we'll return FirstOffsetNumber if page is empty (shouldn't happen) */
    result = FirstOffsetNumber;

    /*
     * The index may have multiple columns, and there's a penalty value for
     * each column.  The penalty associated with a column that appears earlier
     * in the index definition is strictly more important than the penalty of
     * a column that appears later in the index definition.
     *
     * best_penalty[j] is the best penalty we have seen so far for column j,
     * or -1 when we haven't yet examined column j.  Array entries to the
     * right of the first -1 are undefined.
     */
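    /*
     * For illustration only: with a two-column index, a candidate whose
     * penalties are (0.5, 9.0) beats one with (0.7, 0.0), because column 1
     * is compared first and 0.5 < 0.7; later columns only break ties.
     */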
    best_penalty[0] = -1;

    /*
     * If we find a tuple that's exactly as good as the currently best one, we
     * could use either one.  When inserting a lot of tuples with the same or
     * similar keys, it's preferable to descend down the same path when
     * possible, as that's more cache-friendly.  On the other hand, if all
     * inserts land on the same leaf page after a split, we're never going to
     * insert anything to the other half of the split, and will end up using
     * only 50% of the available space.  Distributing the inserts evenly would
     * lead to better space usage, but that hurts cache-locality during
     * insertion.  To get the best of both worlds, when we find a tuple that's
     * exactly as good as the previous best, choose randomly whether to stick
     * to the old best, or use the new one.  Once we decide to stick to the
     * old best, we keep sticking to it for any subsequent equally good tuples
     * we might find.  This favors tuples with low offsets, but still allows
     * some inserts to go to other equally-good subtrees.
     *
     * keep_current_best is -1 if we haven't yet had to make a random choice
     * whether to keep the current best tuple.  If we have done so, and
     * decided to keep it, keep_current_best is 1; if we've decided to
     * replace, keep_current_best is 0.  (This state will be reset to -1 as
     * soon as we've made the replacement, but sometimes we make the choice in
     * advance of actually finding a replacement best tuple.)
     */
    keep_current_best = -1;

    /*
     * Loop over tuples on page.
     */
    maxoff = PageGetMaxOffsetNumber(p);
    Assert(maxoff >= FirstOffsetNumber);

    for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
    {
        IndexTuple  itup = (IndexTuple) PageGetItem(p, PageGetItemId(p, i));
        bool        zero_penalty;
        int         j;

        zero_penalty = true;

        /* Loop over index attributes. */
        for (j = 0; j < IndexRelationGetNumberOfKeyAttributes(r); j++)
        {
            Datum       datum;
            float       usize;
            bool        IsNull;

            /* Compute penalty for this column. */
            datum = index_getattr(itup, j + 1, giststate->leafTupdesc,
                                  &IsNull);
            gistdentryinit(giststate, j, &entry, datum, r, p, i,
                           false, IsNull);
            usize = gistpenalty(giststate, j, &entry, IsNull,
                                &identry[j], isnull[j]);
            if (usize > 0)
                zero_penalty = false;

            if (best_penalty[j] < 0 || usize < best_penalty[j])
            {
                /*
                 * New best penalty for column.  Tentatively select this tuple
                 * as the target, and record the best penalty.  Then reset the
                 * next column's penalty to "unknown" (and indirectly, the
                 * same for all the ones to its right).  This will force us to
                 * adopt this tuple's penalty values as the best for all the
                 * remaining columns during subsequent loop iterations.
                 */
                result = i;
                best_penalty[j] = usize;

                if (j < IndexRelationGetNumberOfKeyAttributes(r) - 1)
                    best_penalty[j + 1] = -1;

                /* we have new best, so reset keep-it decision */
                keep_current_best = -1;
            }
            else if (best_penalty[j] == usize)
            {
                /*
                 * The current tuple is exactly as good for this column as the
                 * best tuple seen so far.  The next iteration of this loop
                 * will compare the next column.
                 */
            }
            else
            {
                /*
                 * The current tuple is worse for this column than the best
                 * tuple seen so far.  Skip the remaining columns and move on
                 * to the next tuple, if any.
                 */
                zero_penalty = false;   /* so outer loop won't exit */
                break;
            }
        }

        /*
         * If we looped past the last column, and did not update "result",
         * then this tuple is exactly as good as the prior best tuple.
         */
        if (j == IndexRelationGetNumberOfKeyAttributes(r) && result != i)
        {
            if (keep_current_best == -1)
            {
                /* we didn't make the random choice yet for this old best */
                keep_current_best = (random() <= (MAX_RANDOM_VALUE / 2)) ? 1 : 0;
            }
            if (keep_current_best == 0)
            {
                /* we choose to use the new tuple */
                result = i;
                /* choose again if there are even more exactly-as-good ones */
                keep_current_best = -1;
            }
        }

        /*
         * If we find a tuple with zero penalty for all columns, and we've
         * decided we don't want to search for another tuple with equal
         * penalty, there's no need to examine remaining tuples; just break
         * out of the loop and return it.
         */
        if (zero_penalty)
        {
            if (keep_current_best == -1)
            {
                /* we didn't make the random choice yet for this old best */
                keep_current_best = (random() <= (MAX_RANDOM_VALUE / 2)) ? 1 : 0;
            }
            if (keep_current_best == 1)
                break;
        }
    }

    return result;
}

/*
 * Initialize a GiST entry with a decompressed version of the key.
 */
void
gistdentryinit(GISTSTATE *giststate, int nkey, GISTENTRY *e,
               Datum k, Relation r, Page pg, OffsetNumber o,
               bool l, bool isNull)
{
    if (!isNull)
    {
        GISTENTRY  *dep;

        gistentryinit(*e, k, r, pg, o, l);

        /* there may not be a decompress function in opclass */
        if (!OidIsValid(giststate->decompressFn[nkey].fn_oid))
            return;

        dep = (GISTENTRY *)
            DatumGetPointer(FunctionCall1Coll(&giststate->decompressFn[nkey],
                                              giststate->supportCollation[nkey],
                                              PointerGetDatum(e)));
        /* decompressFn may just return the given pointer */
        if (dep != e)
            gistentryinit(*e, dep->key, dep->rel, dep->page, dep->offset,
                          dep->leafkey);
    }
    else
        gistentryinit(*e, (Datum) 0, r, pg, o, l);
}

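/*
 * Build an index tuple from the given per-column datums.  Key columns are
 * passed through the opclass compress method (if one is provided); for leaf
 * tuples, any INCLUDE columns are stored as-is.
 */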
IndexTuple
gistFormTuple(GISTSTATE *giststate, Relation r,
              Datum attdata[], bool isnull[], bool isleaf)
{
    Datum       compatt[INDEX_MAX_KEYS];
    int         i;
    IndexTuple  res;

    /*
     * Call the compress method on each attribute.
     */
    for (i = 0; i < IndexRelationGetNumberOfKeyAttributes(r); i++)
    {
        if (isnull[i])
            compatt[i] = (Datum) 0;
        else
        {
            GISTENTRY   centry;
            GISTENTRY  *cep;

            gistentryinit(centry, attdata[i], r, NULL, (OffsetNumber) 0,
                          isleaf);
            /* there may not be a compress function in opclass */
            if (OidIsValid(giststate->compressFn[i].fn_oid))
                cep = (GISTENTRY *)
                    DatumGetPointer(FunctionCall1Coll(&giststate->compressFn[i],
                                                      giststate->supportCollation[i],
                                                      PointerGetDatum(&centry)));
            else
                cep = &centry;
            compatt[i] = cep->key;
        }
    }

    if (isleaf)
    {
        /*
         * Copy each INCLUDE attribute, if any; they are stored as-is.
         */
        for (; i < r->rd_att->natts; i++)
        {
            if (isnull[i])
                compatt[i] = (Datum) 0;
            else
                compatt[i] = attdata[i];
        }
    }

    res = index_form_tuple(isleaf ? giststate->leafTupdesc :
                           giststate->nonLeafTupdesc,
                           compatt, isnull);

    /*
     * The offset number on tuples on internal pages is unused. For historical
     * reasons, it is set to 0xffff.
     */
    ItemPointerSetOffsetNumber(&(res->t_tid), 0xffff);
    return res;
}

/*
 * Initialize a GiST entry with the key and run the opclass fetch function on
 * it, returning the fetched (original-form) value.
 */
static Datum
gistFetchAtt(GISTSTATE *giststate, int nkey, Datum k, Relation r)
{
    GISTENTRY   fentry;
    GISTENTRY  *fep;

    gistentryinit(fentry, k, r, NULL, (OffsetNumber) 0, false);

    fep = (GISTENTRY *)
        DatumGetPointer(FunctionCall1Coll(&giststate->fetchFn[nkey],
                                          giststate->supportCollation[nkey],
                                          PointerGetDatum(&fentry)));

    /* fetchFn sets 'key'; return it to the caller */
    return fep->key;
}

/*
 * Fetch all keys in tuple.
 * Returns a new HeapTuple containing the originally-indexed data.
 */
HeapTuple
gistFetchTuple(GISTSTATE *giststate, Relation r, IndexTuple tuple)
{
    MemoryContext oldcxt = MemoryContextSwitchTo(giststate->tempCxt);
    Datum       fetchatt[INDEX_MAX_KEYS];
    bool        isnull[INDEX_MAX_KEYS];
    int         i;

    for (i = 0; i < IndexRelationGetNumberOfKeyAttributes(r); i++)
    {
        Datum       datum;

        datum = index_getattr(tuple, i + 1, giststate->leafTupdesc, &isnull[i]);

        if (giststate->fetchFn[i].fn_oid != InvalidOid)
        {
            if (!isnull[i])
                fetchatt[i] = gistFetchAtt(giststate, i, datum, r);
            else
                fetchatt[i] = (Datum) 0;
        }
        else if (giststate->compressFn[i].fn_oid == InvalidOid)
        {
            /*
             * If the opclass does not provide a compress method that could
             * alter the original value, the attribute is necessarily stored
             * in its original form.
             */
            if (!isnull[i])
                fetchatt[i] = datum;
            else
                fetchatt[i] = (Datum) 0;
        }
        else
        {
            /*
             * Index-only scans not supported for this column. Since the
             * planner chose an index-only scan anyway, it is not interested
             * in this column, and we can replace it with a NULL.
             */
            isnull[i] = true;
            fetchatt[i] = (Datum) 0;
        }
    }

    /*
     * Get each included attribute.
     */
    for (; i < r->rd_att->natts; i++)
    {
        fetchatt[i] = index_getattr(tuple, i + 1, giststate->leafTupdesc,
                                    &isnull[i]);
    }
    MemoryContextSwitchTo(oldcxt);

    return heap_form_tuple(giststate->fetchTupdesc, fetchatt, isnull);
}

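/*
 * Compute the penalty for inserting "add" into "orig" using the opclass
 * penalty function.  Negative or NaN results are clamped to zero; mixing a
 * null with a non-null key yields +infinity to discourage that choice.
 */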
float
gistpenalty(GISTSTATE *giststate, int attno,
            GISTENTRY *orig, bool isNullOrig,
            GISTENTRY *add, bool isNullAdd)
{
    float       penalty = 0.0;

    if (giststate->penaltyFn[attno].fn_strict == false ||
        (isNullOrig == false && isNullAdd == false))
    {
        FunctionCall3Coll(&giststate->penaltyFn[attno],
                          giststate->supportCollation[attno],
                          PointerGetDatum(orig),
                          PointerGetDatum(add),
                          PointerGetDatum(&penalty));
        /* disallow negative or NaN penalty */
        if (isnan(penalty) || penalty < 0.0)
            penalty = 0.0;
    }
    else if (isNullOrig && isNullAdd)
        penalty = 0.0;
    else
    {
        /* try to prevent mixing null and non-null values */
        penalty = get_float4_infinity();
    }

    return penalty;
}

746  * Initialize a new index page
747  */
748 void
749 GISTInitBuffer(Buffer b, uint32 f)
750 {
751         GISTPageOpaque opaque;
752         Page            page;
753         Size            pageSize;
754
755         pageSize = BufferGetPageSize(b);
756         page = BufferGetPage(b);
757         PageInit(page, pageSize, sizeof(GISTPageOpaqueData));
758
759         opaque = GistPageGetOpaque(page);
760         /* page was already zeroed by PageInit, so this is not needed: */
761         /* memset(&(opaque->nsn), 0, sizeof(GistNSN)); */
762         opaque->rightlink = InvalidBlockNumber;
763         opaque->flags = f;
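    /* the page ID lets tools such as pg_filedump tell index types apart */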
    opaque->gist_page_id = GIST_PAGE_ID;
}

/*
 * Verify that a freshly-read page looks sane.
 */
void
gistcheckpage(Relation rel, Buffer buf)
{
    Page        page = BufferGetPage(buf);

    /*
     * ReadBuffer verifies that every newly-read page passes
     * PageHeaderIsValid, which means it either contains a reasonably sane
     * page header or is all-zero.  We have to defend against the all-zero
     * case, however.
     */
    if (PageIsNew(page))
        ereport(ERROR,
                (errcode(ERRCODE_INDEX_CORRUPTED),
                 errmsg("index \"%s\" contains unexpected zero page at block %u",
                        RelationGetRelationName(rel),
                        BufferGetBlockNumber(buf)),
                 errhint("Please REINDEX it.")));

    /*
     * Additionally check that the special area looks sane.
     */
    if (PageGetSpecialSize(page) != MAXALIGN(sizeof(GISTPageOpaqueData)))
        ereport(ERROR,
                (errcode(ERRCODE_INDEX_CORRUPTED),
                 errmsg("index \"%s\" contains corrupted page at block %u",
                        RelationGetRelationName(rel),
                        BufferGetBlockNumber(buf)),
                 errhint("Please REINDEX it.")));
}


/*
 * Allocate a new page (either by recycling, or by extending the index file)
 *
 * The returned buffer is already pinned and exclusive-locked
 *
 * Caller is responsible for initializing the page by calling GISTInitBuffer
 */
Buffer
gistNewBuffer(Relation r)
{
    Buffer      buffer;
    bool        needLock;

    /* First, try to get a page from FSM */
    for (;;)
    {
        BlockNumber blkno = GetFreeIndexPage(r);

        if (blkno == InvalidBlockNumber)
            break;                      /* nothing left in FSM */

        buffer = ReadBuffer(r, blkno);

        /*
         * We have to guard against the possibility that someone else already
         * recycled this page; the buffer may be locked if so.
         */
        if (ConditionalLockBuffer(buffer))
        {
            Page        page = BufferGetPage(buffer);

            /*
             * If the page was never initialized, it's OK to use.
             */
            if (PageIsNew(page))
                return buffer;

            gistcheckpage(r, buffer);

            /*
             * Otherwise, recycle it only if it has been deleted and is old
             * enough that no running process can still be interested in it.
             */
            if (gistPageRecyclable(page))
            {
                /*
                 * If we are generating WAL for Hot Standby then create a WAL
                 * record that will allow us to conflict with queries running
                 * on standby, in case they have snapshots older than the
                 * page's deleteXid.
                 */
                if (XLogStandbyInfoActive() && RelationNeedsWAL(r))
                    gistXLogPageReuse(r, blkno, GistPageGetDeleteXid(page));

                return buffer;
            }

            LockBuffer(buffer, GIST_UNLOCK);
        }

        /* Can't use it, so release buffer and try again */
        ReleaseBuffer(buffer);
    }

    /* Must extend the file */
    needLock = !RELATION_IS_LOCAL(r);

    if (needLock)
        LockRelationForExtension(r, ExclusiveLock);

    buffer = ReadBuffer(r, P_NEW);
    LockBuffer(buffer, GIST_EXCLUSIVE);

    if (needLock)
        UnlockRelationForExtension(r, ExclusiveLock);

    return buffer;
}

/* Can this page be recycled yet? */
bool
gistPageRecyclable(Page page)
{
    if (PageIsNew(page))
        return true;
    if (GistPageIsDeleted(page))
    {
        /*
         * The page was deleted, but when? If it was just deleted, a scan
         * might have seen the downlink to it, and will read the page later.
         * As long as that can happen, we must keep the deleted page around as
         * a tombstone.
         *
         * Compare the deletion XID with RecentGlobalXmin. If deleteXid <
         * RecentGlobalXmin, then no scan that's still in progress could have
         * seen its downlink, and we can recycle it.
         */
        FullTransactionId deletexid_full = GistPageGetDeleteXid(page);
        FullTransactionId recentxmin_full = GetFullRecentGlobalXmin();

        if (FullTransactionIdPrecedes(deletexid_full, recentxmin_full))
            return true;
    }
    return false;
}

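/*
 * Parse reloptions for a GiST index, producing a GiSTOptions struct (or NULL
 * if no options are set).
 */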
bytea *
gistoptions(Datum reloptions, bool validate)
{
    relopt_value *options;
    GiSTOptions *rdopts;
    int         numoptions;
    static const relopt_parse_elt tab[] = {
        {"fillfactor", RELOPT_TYPE_INT, offsetof(GiSTOptions, fillfactor)},
        {"buffering", RELOPT_TYPE_STRING, offsetof(GiSTOptions, bufferingModeOffset)}
    };

    options = parseRelOptions(reloptions, validate, RELOPT_KIND_GIST,
                              &numoptions);

    /* if none set, we're done */
    if (numoptions == 0)
        return NULL;

    rdopts = allocateReloptStruct(sizeof(GiSTOptions), options, numoptions);

    fillRelOptions((void *) rdopts, sizeof(GiSTOptions), options, numoptions,
                   validate, tab, lengthof(tab));

    pfree(options);

    return (bytea *) rdopts;
}

/*
 *  gistproperty() -- Check boolean properties of indexes.
 *
 * This is optional for most AMs, but is required for GiST because the core
 * property code doesn't support AMPROP_DISTANCE_ORDERABLE.  We also handle
 * AMPROP_RETURNABLE here to save opening the rel to call gistcanreturn.
 */
bool
gistproperty(Oid index_oid, int attno,
             IndexAMProperty prop, const char *propname,
             bool *res, bool *isnull)
{
    Oid         opclass,
                opfamily,
                opcintype;
    int16       procno;

    /* Only answer column-level inquiries */
    if (attno == 0)
        return false;

    /*
     * Currently, GiST distance-ordered scans require that there be a distance
     * function in the opclass with the default types (i.e. the one loaded
     * into the relcache entry, see initGISTstate).  So we assume that if such
     * a function exists, then there's a reason for it (rather than grubbing
     * through all the opfamily's operators to find an ordered one).
     *
     * Essentially the same code can test whether we support returning the
     * column data, since that's true if the opclass provides a fetch proc.
     */

    switch (prop)
    {
        case AMPROP_DISTANCE_ORDERABLE:
            procno = GIST_DISTANCE_PROC;
            break;
        case AMPROP_RETURNABLE:
            procno = GIST_FETCH_PROC;
            break;
        default:
            return false;
    }

    /* First we need to know the column's opclass. */
    opclass = get_index_column_opclass(index_oid, attno);
    if (!OidIsValid(opclass))
    {
        *isnull = true;
        return true;
    }

    /* Now look up the opclass family and input datatype. */
    if (!get_opclass_opfamily_and_input_type(opclass, &opfamily, &opcintype))
    {
        *isnull = true;
        return true;
    }

    /* And now we can check whether the function is provided. */

    *res = SearchSysCacheExists4(AMPROCNUM,
                                 ObjectIdGetDatum(opfamily),
                                 ObjectIdGetDatum(opcintype),
                                 ObjectIdGetDatum(opcintype),
                                 Int16GetDatum(procno));

    /*
     * Special case: even without a fetch function, AMPROP_RETURNABLE is true
     * if the opclass has no compress function.
     */
    if (prop == AMPROP_RETURNABLE && !*res)
    {
        *res = !SearchSysCacheExists4(AMPROCNUM,
                                      ObjectIdGetDatum(opfamily),
                                      ObjectIdGetDatum(opcintype),
                                      ObjectIdGetDatum(opcintype),
                                      Int16GetDatum(GIST_COMPRESS_PROC));
    }

    *isnull = false;

    return true;
}

/*
 * Temporary and unlogged GiST indexes are not WAL-logged, but we need LSNs
 * to detect concurrent page splits anyway. This function provides a fake
 * sequence of LSNs for that purpose.
 */
XLogRecPtr
gistGetFakeLSN(Relation rel)
{
    static XLogRecPtr counter = FirstNormalUnloggedLSN;

    if (rel->rd_rel->relpersistence == RELPERSISTENCE_TEMP)
    {
        /*
         * Temporary relations are only accessible in our session, so a simple
         * backend-local counter will do.
         */
        return counter++;
    }
    else
    {
        /*
         * Unlogged relations are accessible from other backends, and survive
         * (clean) restarts. GetFakeLSNForUnloggedRel() handles that for us.
         */
        Assert(rel->rd_rel->relpersistence == RELPERSISTENCE_UNLOGGED);
        return GetFakeLSNForUnloggedRel();
    }
}