]> granicus.if.org Git - postgresql/blob - src/backend/access/gist/gist.c
597056ae440b05f0f68f44a5f98853c8009d4355
[postgresql] / src / backend / access / gist / gist.c
1 /*-------------------------------------------------------------------------
2  *
3  * gist.c
4  *        interface routines for the postgres GiST index access method.
5  *
6  *
7  * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
8  * Portions Copyright (c) 1994, Regents of the University of California
9  *
10  * IDENTIFICATION
11  *        src/backend/access/gist/gist.c
12  *
13  *-------------------------------------------------------------------------
14  */
15 #include "postgres.h"
16
17 #include "access/gist_private.h"
18 #include "access/gistscan.h"
19 #include "catalog/pg_collation.h"
20 #include "miscadmin.h"
21 #include "utils/builtins.h"
22 #include "utils/index_selfuncs.h"
23 #include "utils/memutils.h"
24 #include "utils/rel.h"
25
26
27 /* non-export function prototypes */
28 static void gistfixsplit(GISTInsertState *state, GISTSTATE *giststate);
29 static bool gistinserttuple(GISTInsertState *state, GISTInsertStack *stack,
30                          GISTSTATE *giststate, IndexTuple tuple, OffsetNumber oldoffnum);
31 static bool gistinserttuples(GISTInsertState *state, GISTInsertStack *stack,
32                                  GISTSTATE *giststate,
33                                  IndexTuple *tuples, int ntup, OffsetNumber oldoffnum,
34                                  Buffer leftchild, Buffer rightchild,
35                                  bool unlockbuf, bool unlockleftchild);
36 static void gistfinishsplit(GISTInsertState *state, GISTInsertStack *stack,
37                                 GISTSTATE *giststate, List *splitinfo, bool releasebuf);
38 static void gistvacuumpage(Relation rel, Page page, Buffer buffer);
39
40
/*
 * ROTATEDIST(d): push a freshly palloc'd, zeroed SplitedPageLayout node
 * onto the front of list 'd'.  blkno/buffer are set to their Invalid
 * sentinels so callers can tell the page hasn't been assigned yet.
 * Multi-statement macro wrapped in do/while(0) so it is safe in
 * unbraced if/else bodies.
 */
#define ROTATEDIST(d) do { \
	SplitedPageLayout *tmp=(SplitedPageLayout*)palloc(sizeof(SplitedPageLayout)); \
	memset(tmp,0,sizeof(SplitedPageLayout)); \
	tmp->block.blkno = InvalidBlockNumber;	\
	tmp->buffer = InvalidBuffer;	\
	tmp->next = (d); \
	(d)=tmp; \
} while(0)
49
50
51 /*
52  * GiST handler function: return IndexAmRoutine with access method parameters
53  * and callbacks.
54  */
55 Datum
56 gisthandler(PG_FUNCTION_ARGS)
57 {
58         IndexAmRoutine *amroutine = makeNode(IndexAmRoutine);
59
60         amroutine->amstrategies = 0;
61         amroutine->amsupport = GISTNProcs;
62         amroutine->amcanorder = false;
63         amroutine->amcanorderbyop = true;
64         amroutine->amcanbackward = false;
65         amroutine->amcanunique = false;
66         amroutine->amcanmulticol = true;
67         amroutine->amoptionalkey = true;
68         amroutine->amsearcharray = false;
69         amroutine->amsearchnulls = true;
70         amroutine->amstorage = true;
71         amroutine->amclusterable = true;
72         amroutine->ampredlocks = false;
73         amroutine->amkeytype = InvalidOid;
74
75         amroutine->ambuild = gistbuild;
76         amroutine->ambuildempty = gistbuildempty;
77         amroutine->aminsert = gistinsert;
78         amroutine->ambulkdelete = gistbulkdelete;
79         amroutine->amvacuumcleanup = gistvacuumcleanup;
80         amroutine->amcanreturn = gistcanreturn;
81         amroutine->amcostestimate = gistcostestimate;
82         amroutine->amoptions = gistoptions;
83         amroutine->amproperty = gistproperty;
84         amroutine->amvalidate = gistvalidate;
85         amroutine->ambeginscan = gistbeginscan;
86         amroutine->amrescan = gistrescan;
87         amroutine->amgettuple = gistgettuple;
88         amroutine->amgetbitmap = gistgetbitmap;
89         amroutine->amendscan = gistendscan;
90         amroutine->ammarkpos = NULL;
91         amroutine->amrestrpos = NULL;
92
93         PG_RETURN_POINTER(amroutine);
94 }
95
/*
 * Create and return a temporary memory context for use by GiST. We
 * _always_ invoke user-provided methods in a temporary memory
 * context, so that memory leaks in those functions cannot cause
 * problems. Also, we use some additional temporary contexts in the
 * GiST code itself, to avoid the need to do some awkward manual
 * memory management.
 *
 * The new context is a child of CurrentMemoryContext; the caller is
 * responsible for resetting/deleting it when done.
 */
MemoryContext
createTempGistContext(void)
{
	return AllocSetContextCreate(CurrentMemoryContext,
								 "GiST temporary context",
								 ALLOCSET_DEFAULT_SIZES);
}
111
112 /*
113  *      gistbuildempty() -- build an empty gist index in the initialization fork
114  */
115 void
116 gistbuildempty(Relation index)
117 {
118         Buffer          buffer;
119
120         /* Initialize the root page */
121         buffer = ReadBufferExtended(index, INIT_FORKNUM, P_NEW, RBM_NORMAL, NULL);
122         LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
123
124         /* Initialize and xlog buffer */
125         START_CRIT_SECTION();
126         GISTInitBuffer(buffer, F_LEAF);
127         MarkBufferDirty(buffer);
128         log_newpage_buffer(buffer, true);
129         END_CRIT_SECTION();
130
131         /* Unlock and release the buffer */
132         UnlockReleaseBuffer(buffer);
133 }
134
135 /*
136  *      gistinsert -- wrapper for GiST tuple insertion.
137  *
138  *        This is the public interface routine for tuple insertion in GiSTs.
139  *        It doesn't do any work; just locks the relation and passes the buck.
140  */
141 bool
142 gistinsert(Relation r, Datum *values, bool *isnull,
143                    ItemPointer ht_ctid, Relation heapRel,
144                    IndexUniqueCheck checkUnique)
145 {
146         IndexTuple      itup;
147         GISTSTATE  *giststate;
148         MemoryContext oldCxt;
149
150         giststate = initGISTstate(r);
151
152         /*
153          * We use the giststate's scan context as temp context too.  This means
154          * that any memory leaked by the support functions is not reclaimed until
155          * end of insert.  In most cases, we aren't going to call the support
156          * functions very many times before finishing the insert, so this seems
157          * cheaper than resetting a temp context for each function call.
158          */
159         oldCxt = MemoryContextSwitchTo(giststate->tempCxt);
160
161         itup = gistFormTuple(giststate, r,
162                                                  values, isnull, true /* size is currently bogus */ );
163         itup->t_tid = *ht_ctid;
164
165         gistdoinsert(r, itup, 0, giststate);
166
167         /* cleanup */
168         MemoryContextSwitchTo(oldCxt);
169         freeGISTstate(giststate);
170
171         return false;
172 }
173
174
/*
 * Place tuples from 'itup' to 'buffer'. If 'oldoffnum' is valid, the tuple
 * at that offset is atomically removed along with inserting the new tuples.
 * This is used to replace a tuple with a new one.
 *
 * If 'leftchildbuf' is valid, we're inserting the downlink for the page
 * to the right of 'leftchildbuf', or updating the downlink for 'leftchildbuf'.
 * F_FOLLOW_RIGHT flag on 'leftchildbuf' is cleared and NSN is set.
 *
 * If 'markfollowright' is true and the page is split, the left child is
 * marked with F_FOLLOW_RIGHT flag. That is the normal case. During buffered
 * index build, however, there is no concurrent access and the page splitting
 * is done in a slightly simpler fashion, and false is passed.
 *
 * If there is not enough room on the page, it is split. All the split
 * pages are kept pinned and locked and returned in *splitinfo, the caller
 * is responsible for inserting the downlinks for them. However, if
 * 'buffer' is the root page and it needs to be split, gistplacetopage()
 * performs the split as one atomic operation, and *splitinfo is set to NIL.
 * In that case, we continue to hold the root page locked, and the child
 * pages are released; note that new tuple(s) are *not* on the root page
 * but in one of the new child pages.
 *
 * If 'newblkno' is not NULL, returns the block number of page the first
 * new/updated tuple was inserted to. Usually it's the given page, but could
 * be its right sibling if the page was split.
 *
 * Returns 'true' if the page was split, 'false' otherwise.
 */
bool
gistplacetopage(Relation rel, Size freespace, GISTSTATE *giststate,
				Buffer buffer,
				IndexTuple *itup, int ntup, OffsetNumber oldoffnum,
				BlockNumber *newblkno,
				Buffer leftchildbuf,
				List **splitinfo,
				bool markfollowright)
{
	BlockNumber blkno = BufferGetBlockNumber(buffer);
	Page		page = BufferGetPage(buffer);
	bool		is_leaf = (GistPageIsLeaf(page)) ? true : false;
	XLogRecPtr	recptr;
	int			i;
	bool		is_split;

	/*
	 * Refuse to modify a page that's incompletely split. This should not
	 * happen because we finish any incomplete splits while we walk down the
	 * tree. However, it's remotely possible that another concurrent inserter
	 * splits a parent page, and errors out before completing the split. We
	 * will just throw an error in that case, and leave any split we had in
	 * progress unfinished too. The next insert that comes along will clean up
	 * the mess.
	 */
	if (GistFollowRight(page))
		elog(ERROR, "concurrent GiST page split was incomplete");

	*splitinfo = NIL;

	/*
	 * if isupdate, remove old key: This node's key has been modified, either
	 * because a child split occurred or because we needed to adjust our key
	 * for an insert in a child node. Therefore, remove the old version of
	 * this node's key.
	 *
	 * for WAL replay, in the non-split case we handle this by setting up a
	 * one-element todelete array; in the split case, it's handled implicitly
	 * because the tuple vector passed to gistSplit won't include this tuple.
	 */
	is_split = gistnospace(page, itup, ntup, oldoffnum, freespace);

	/*
	 * If leaf page is full, try at first to delete dead tuples. And then
	 * check again.
	 */
	if (is_split && GistPageIsLeaf(page) && GistPageHasGarbage(page))
	{
		gistvacuumpage(rel, page, buffer);
		is_split = gistnospace(page, itup, ntup, oldoffnum, freespace);
	}

	if (is_split)
	{
		/* no space for insertion */
		IndexTuple *itvec;
		int			tlen;
		SplitedPageLayout *dist = NULL,
				   *ptr;
		BlockNumber oldrlink = InvalidBlockNumber;
		GistNSN		oldnsn = 0;
		/* stack-allocated; linked into 'dist' only in the root-split case */
		SplitedPageLayout rootpg;
		bool		is_rootsplit;
		int			npage;

		is_rootsplit = (blkno == GIST_ROOT_BLKNO);

		/*
		 * Form index tuples vector to split. If we're replacing an old tuple,
		 * remove the old version from the vector.
		 */
		itvec = gistextractpage(page, &tlen);
		if (OffsetNumberIsValid(oldoffnum))
		{
			/* on inner page we should remove old tuple */
			int			pos = oldoffnum - FirstOffsetNumber;

			tlen--;
			if (pos != tlen)
				memmove(itvec + pos, itvec + pos + 1, sizeof(IndexTuple) * (tlen - pos));
		}
		itvec = gistjoinvector(itvec, &tlen, itup, ntup);
		dist = gistSplit(rel, page, itvec, tlen, giststate);

		/*
		 * Check that split didn't produce too many pages.
		 */
		npage = 0;
		for (ptr = dist; ptr; ptr = ptr->next)
			npage++;
		/* in a root split, we'll add one more page to the list below */
		if (is_rootsplit)
			npage++;
		if (npage > GIST_MAX_SPLIT_PAGES)
			elog(ERROR, "GiST page split into too many halves (%d, maximum %d)",
				 npage, GIST_MAX_SPLIT_PAGES);

		/*
		 * Set up pages to work with. Allocate new buffers for all but the
		 * leftmost page. The original page becomes the new leftmost page, and
		 * is just replaced with the new contents.
		 *
		 * For a root-split, allocate new buffers for all child pages, the
		 * original page is overwritten with new root page containing
		 * downlinks to the new child pages.
		 */
		ptr = dist;
		if (!is_rootsplit)
		{
			/* save old rightlink and NSN */
			oldrlink = GistPageGetOpaque(page)->rightlink;
			oldnsn = GistPageGetNSN(page);

			/*
			 * The leftmost half is built in a temp copy; it replaces the
			 * original page only inside the critical section below.
			 */
			dist->buffer = buffer;
			dist->block.blkno = BufferGetBlockNumber(buffer);
			dist->page = PageGetTempPageCopySpecial(BufferGetPage(buffer));

			/* clean all flags except F_LEAF */
			GistPageGetOpaque(dist->page)->flags = (is_leaf) ? F_LEAF : 0;

			ptr = ptr->next;
		}
		for (; ptr; ptr = ptr->next)
		{
			/* Allocate new page */
			ptr->buffer = gistNewBuffer(rel);
			GISTInitBuffer(ptr->buffer, (is_leaf) ? F_LEAF : 0);
			ptr->page = BufferGetPage(ptr->buffer);
			ptr->block.blkno = BufferGetBlockNumber(ptr->buffer);
		}

		/*
		 * Now that we know which blocks the new pages go to, set up downlink
		 * tuples to point to them.
		 */
		for (ptr = dist; ptr; ptr = ptr->next)
		{
			ItemPointerSetBlockNumber(&(ptr->itup->t_tid), ptr->block.blkno);
			GistTupleSetValid(ptr->itup);
		}

		/*
		 * If this is a root split, we construct the new root page with the
		 * downlinks here directly, instead of requiring the caller to insert
		 * them. Add the new root page to the list along with the child pages.
		 */
		if (is_rootsplit)
		{
			IndexTuple *downlinks;
			int			ndownlinks = 0;
			int			i;

			rootpg.buffer = buffer;
			rootpg.page = PageGetTempPageCopySpecial(BufferGetPage(rootpg.buffer));
			GistPageGetOpaque(rootpg.page)->flags = 0;

			/* Prepare a vector of all the downlinks */
			for (ptr = dist; ptr; ptr = ptr->next)
				ndownlinks++;
			downlinks = palloc(sizeof(IndexTuple) * ndownlinks);
			for (i = 0, ptr = dist; ptr; ptr = ptr->next)
				downlinks[i++] = ptr->itup;

			rootpg.block.blkno = GIST_ROOT_BLKNO;
			rootpg.block.num = ndownlinks;
			rootpg.list = gistfillitupvec(downlinks, ndownlinks,
										  &(rootpg.lenlist));
			rootpg.itup = NULL;

			/* new root page goes first in the chain */
			rootpg.next = dist;
			dist = &rootpg;
		}
		else
		{
			/* Prepare split-info to be returned to caller */
			for (ptr = dist; ptr; ptr = ptr->next)
			{
				GISTPageSplitInfo *si = palloc(sizeof(GISTPageSplitInfo));

				si->buf = ptr->buffer;
				si->downlink = ptr->itup;
				*splitinfo = lappend(*splitinfo, si);
			}
		}

		/*
		 * Fill all pages. All the pages are new, ie. freshly allocated empty
		 * pages, or a temporary copy of the old page.
		 */
		for (ptr = dist; ptr; ptr = ptr->next)
		{
			/* ptr->list is a packed array of the page's tuples */
			char	   *data = (char *) (ptr->list);

			for (i = 0; i < ptr->block.num; i++)
			{
				IndexTuple	thistup = (IndexTuple) data;

				if (PageAddItem(ptr->page, (Item) data, IndexTupleSize(thistup), i + FirstOffsetNumber, false, false) == InvalidOffsetNumber)
					elog(ERROR, "failed to add item to index page in \"%s\"", RelationGetRelationName(rel));

				/*
				 * If this is the first inserted/updated tuple, let the caller
				 * know which page it landed on.
				 */
				if (newblkno && ItemPointerEquals(&thistup->t_tid, &(*itup)->t_tid))
					*newblkno = ptr->block.blkno;

				data += IndexTupleSize(thistup);
			}

			/* Set up rightlinks */
			if (ptr->next && ptr->block.blkno != GIST_ROOT_BLKNO)
				GistPageGetOpaque(ptr->page)->rightlink =
					ptr->next->block.blkno;
			else
				GistPageGetOpaque(ptr->page)->rightlink = oldrlink;

			/*
			 * Mark the all but the right-most page with the follow-right
			 * flag. It will be cleared as soon as the downlink is inserted
			 * into the parent, but this ensures that if we error out before
			 * that, the index is still consistent. (in buffering build mode,
			 * any error will abort the index build anyway, so this is not
			 * needed.)
			 */
			if (ptr->next && !is_rootsplit && markfollowright)
				GistMarkFollowRight(ptr->page);
			else
				GistClearFollowRight(ptr->page);

			/*
			 * Copy the NSN of the original page to all pages. The
			 * F_FOLLOW_RIGHT flags ensure that scans will follow the
			 * rightlinks until the downlinks are inserted.
			 */
			GistPageSetNSN(ptr->page, oldnsn);
		}

		/*
		 * gistXLogSplit() needs to WAL log a lot of pages, prepare WAL
		 * insertion for that. NB: The number of pages and data segments
		 * specified here must match the calculations in gistXLogSplit()!
		 */
		if (RelationNeedsWAL(rel))
			XLogEnsureRecordSpace(npage, 1 + npage * 2);

		START_CRIT_SECTION();

		/*
		 * Must mark buffers dirty before XLogInsert, even though we'll still
		 * be changing their opaque fields below.
		 */
		for (ptr = dist; ptr; ptr = ptr->next)
			MarkBufferDirty(ptr->buffer);
		if (BufferIsValid(leftchildbuf))
			MarkBufferDirty(leftchildbuf);

		/*
		 * The first page in the chain was a temporary working copy meant to
		 * replace the old page. Copy it over the old page.
		 */
		PageRestoreTempPage(dist->page, BufferGetPage(dist->buffer));
		dist->page = BufferGetPage(dist->buffer);

		/* Write the WAL record */
		if (RelationNeedsWAL(rel))
			recptr = gistXLogSplit(is_leaf,
								   dist, oldrlink, oldnsn, leftchildbuf,
								   markfollowright);
		else
			recptr = gistGetFakeLSN(rel);

		/* Stamp every split page with the LSN of the split record */
		for (ptr = dist; ptr; ptr = ptr->next)
		{
			PageSetLSN(ptr->page, recptr);
		}

		/*
		 * Return the new child buffers to the caller.
		 *
		 * If this was a root split, we've already inserted the downlink
		 * pointers, in the form of a new root page. Therefore we can release
		 * all the new buffers, and keep just the root page locked.
		 */
		if (is_rootsplit)
		{
			for (ptr = dist->next; ptr; ptr = ptr->next)
				UnlockReleaseBuffer(ptr->buffer);
		}
	}
	else
	{
		/*
		 * Enough space.  We always get here if ntup==0.
		 */
		START_CRIT_SECTION();

		/*
		 * Delete old tuple if any, then insert new tuple(s) if any.  If
		 * possible, use the fast path of PageIndexTupleOverwrite.
		 */
		if (OffsetNumberIsValid(oldoffnum))
		{
			if (ntup == 1)
			{
				/* One-for-one replacement, so use PageIndexTupleOverwrite */
				if (!PageIndexTupleOverwrite(page, oldoffnum, (Item) *itup,
											 IndexTupleSize(*itup)))
					elog(ERROR, "failed to add item to index page in \"%s\"",
						 RelationGetRelationName(rel));
			}
			else
			{
				/* Delete old, then append new tuple(s) to page */
				PageIndexTupleDelete(page, oldoffnum);
				gistfillbuffer(page, itup, ntup, InvalidOffsetNumber);
			}
		}
		else
		{
			/* Just append new tuples at the end of the page */
			gistfillbuffer(page, itup, ntup, InvalidOffsetNumber);
		}

		MarkBufferDirty(buffer);

		if (BufferIsValid(leftchildbuf))
			MarkBufferDirty(leftchildbuf);

		if (RelationNeedsWAL(rel))
		{
			OffsetNumber ndeloffs = 0,
						deloffs[1];

			if (OffsetNumberIsValid(oldoffnum))
			{
				deloffs[0] = oldoffnum;
				ndeloffs = 1;
			}

			recptr = gistXLogUpdate(buffer,
									deloffs, ndeloffs, itup, ntup,
									leftchildbuf);

			PageSetLSN(page, recptr);
		}
		else
		{
			recptr = gistGetFakeLSN(rel);
			PageSetLSN(page, recptr);
		}

		/* no split: all tuples landed on the original page */
		if (newblkno)
			*newblkno = blkno;
	}

	/*
	 * If we inserted the downlink for a child page, set NSN and clear
	 * F_FOLLOW_RIGHT flag on the left child, so that concurrent scans know to
	 * follow the rightlink if and only if they looked at the parent page
	 * before we inserted the downlink.
	 *
	 * Note that we do this *after* writing the WAL record. That means that
	 * the possible full page image in the WAL record does not include these
	 * changes, and they must be replayed even if the page is restored from
	 * the full page image. There's a chicken-and-egg problem: if we updated
	 * the child pages first, we wouldn't know the recptr of the WAL record
	 * we're about to write.
	 */
	if (BufferIsValid(leftchildbuf))
	{
		Page		leftpg = BufferGetPage(leftchildbuf);

		GistPageSetNSN(leftpg, recptr);
		GistClearFollowRight(leftpg);

		PageSetLSN(leftpg, recptr);
	}

	/* Ends the critical section opened in whichever branch was taken above */
	END_CRIT_SECTION();

	return is_split;
}
587
588 /*
589  * Workhouse routine for doing insertion into a GiST index. Note that
590  * this routine assumes it is invoked in a short-lived memory context,
591  * so it does not bother releasing palloc'd allocations.
592  */
593 void
594 gistdoinsert(Relation r, IndexTuple itup, Size freespace, GISTSTATE *giststate)
595 {
596         ItemId          iid;
597         IndexTuple      idxtuple;
598         GISTInsertStack firststack;
599         GISTInsertStack *stack;
600         GISTInsertState state;
601         bool            xlocked = false;
602
603         memset(&state, 0, sizeof(GISTInsertState));
604         state.freespace = freespace;
605         state.r = r;
606
607         /* Start from the root */
608         firststack.blkno = GIST_ROOT_BLKNO;
609         firststack.lsn = 0;
610         firststack.parent = NULL;
611         firststack.downlinkoffnum = InvalidOffsetNumber;
612         state.stack = stack = &firststack;
613
614         /*
615          * Walk down along the path of smallest penalty, updating the parent
616          * pointers with the key we're inserting as we go. If we crash in the
617          * middle, the tree is consistent, although the possible parent updates
618          * were a waste.
619          */
620         for (;;)
621         {
622                 if (XLogRecPtrIsInvalid(stack->lsn))
623                         stack->buffer = ReadBuffer(state.r, stack->blkno);
624
625                 /*
626                  * Be optimistic and grab shared lock first. Swap it for an exclusive
627                  * lock later if we need to update the page.
628                  */
629                 if (!xlocked)
630                 {
631                         LockBuffer(stack->buffer, GIST_SHARE);
632                         gistcheckpage(state.r, stack->buffer);
633                 }
634
635                 stack->page = (Page) BufferGetPage(stack->buffer);
636                 stack->lsn = PageGetLSN(stack->page);
637                 Assert(!RelationNeedsWAL(state.r) || !XLogRecPtrIsInvalid(stack->lsn));
638
639                 /*
640                  * If this page was split but the downlink was never inserted to the
641                  * parent because the inserting backend crashed before doing that, fix
642                  * that now.
643                  */
644                 if (GistFollowRight(stack->page))
645                 {
646                         if (!xlocked)
647                         {
648                                 LockBuffer(stack->buffer, GIST_UNLOCK);
649                                 LockBuffer(stack->buffer, GIST_EXCLUSIVE);
650                                 xlocked = true;
651                                 /* someone might've completed the split when we unlocked */
652                                 if (!GistFollowRight(stack->page))
653                                         continue;
654                         }
655                         gistfixsplit(&state, giststate);
656
657                         UnlockReleaseBuffer(stack->buffer);
658                         xlocked = false;
659                         state.stack = stack = stack->parent;
660                         continue;
661                 }
662
663                 if (stack->blkno != GIST_ROOT_BLKNO &&
664                         stack->parent->lsn < GistPageGetNSN(stack->page))
665                 {
666                         /*
667                          * Concurrent split detected. There's no guarantee that the
668                          * downlink for this page is consistent with the tuple we're
669                          * inserting anymore, so go back to parent and rechoose the best
670                          * child.
671                          */
672                         UnlockReleaseBuffer(stack->buffer);
673                         xlocked = false;
674                         state.stack = stack = stack->parent;
675                         continue;
676                 }
677
678                 if (!GistPageIsLeaf(stack->page))
679                 {
680                         /*
681                          * This is an internal page so continue to walk down the tree.
682                          * Find the child node that has the minimum insertion penalty.
683                          */
684                         BlockNumber childblkno;
685                         IndexTuple      newtup;
686                         GISTInsertStack *item;
687                         OffsetNumber downlinkoffnum;
688
689                         downlinkoffnum = gistchoose(state.r, stack->page, itup, giststate);
690                         iid = PageGetItemId(stack->page, downlinkoffnum);
691                         idxtuple = (IndexTuple) PageGetItem(stack->page, iid);
692                         childblkno = ItemPointerGetBlockNumber(&(idxtuple->t_tid));
693
694                         /*
695                          * Check that it's not a leftover invalid tuple from pre-9.1
696                          */
697                         if (GistTupleIsInvalid(idxtuple))
698                                 ereport(ERROR,
699                                                 (errmsg("index \"%s\" contains an inner tuple marked as invalid",
700                                                                 RelationGetRelationName(r)),
701                                                  errdetail("This is caused by an incomplete page split at crash recovery before upgrading to PostgreSQL 9.1."),
702                                                  errhint("Please REINDEX it.")));
703
704                         /*
705                          * Check that the key representing the target child node is
706                          * consistent with the key we're inserting. Update it if it's not.
707                          */
708                         newtup = gistgetadjusted(state.r, idxtuple, itup, giststate);
709                         if (newtup)
710                         {
711                                 /*
712                                  * Swap shared lock for an exclusive one. Beware, the page may
713                                  * change while we unlock/lock the page...
714                                  */
715                                 if (!xlocked)
716                                 {
717                                         LockBuffer(stack->buffer, GIST_UNLOCK);
718                                         LockBuffer(stack->buffer, GIST_EXCLUSIVE);
719                                         xlocked = true;
720                                         stack->page = (Page) BufferGetPage(stack->buffer);
721
722                                         if (PageGetLSN(stack->page) != stack->lsn)
723                                         {
724                                                 /* the page was changed while we unlocked it, retry */
725                                                 continue;
726                                         }
727                                 }
728
729                                 /*
730                                  * Update the tuple.
731                                  *
732                                  * We still hold the lock after gistinserttuple(), but it
733                                  * might have to split the page to make the updated tuple fit.
734                                  * In that case the updated tuple might migrate to the other
735                                  * half of the split, so we have to go back to the parent and
736                                  * descend back to the half that's a better fit for the new
737                                  * tuple.
738                                  */
739                                 if (gistinserttuple(&state, stack, giststate, newtup,
740                                                                         downlinkoffnum))
741                                 {
742                                         /*
743                                          * If this was a root split, the root page continues to be
744                                          * the parent and the updated tuple went to one of the
745                                          * child pages, so we just need to retry from the root
746                                          * page.
747                                          */
748                                         if (stack->blkno != GIST_ROOT_BLKNO)
749                                         {
750                                                 UnlockReleaseBuffer(stack->buffer);
751                                                 xlocked = false;
752                                                 state.stack = stack = stack->parent;
753                                         }
754                                         continue;
755                                 }
756                         }
757                         LockBuffer(stack->buffer, GIST_UNLOCK);
758                         xlocked = false;
759
760                         /* descend to the chosen child */
761                         item = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack));
762                         item->blkno = childblkno;
763                         item->parent = stack;
764                         item->downlinkoffnum = downlinkoffnum;
765                         state.stack = stack = item;
766                 }
767                 else
768                 {
769                         /*
770                          * Leaf page. Insert the new key. We've already updated all the
771                          * parents on the way down, but we might have to split the page if
772                          * it doesn't fit. gistinserthere() will take care of that.
773                          */
774
775                         /*
776                          * Swap shared lock for an exclusive one. Be careful, the page may
777                          * change while we unlock/lock the page...
778                          */
779                         if (!xlocked)
780                         {
781                                 LockBuffer(stack->buffer, GIST_UNLOCK);
782                                 LockBuffer(stack->buffer, GIST_EXCLUSIVE);
783                                 xlocked = true;
784                                 stack->page = (Page) BufferGetPage(stack->buffer);
785                                 stack->lsn = PageGetLSN(stack->page);
786
787                                 if (stack->blkno == GIST_ROOT_BLKNO)
788                                 {
789                                         /*
790                                          * the only page that can become inner instead of leaf is
791                                          * the root page, so for root we should recheck it
792                                          */
793                                         if (!GistPageIsLeaf(stack->page))
794                                         {
795                                                 /*
796                                                  * very rare situation: during unlock/lock index with
797                                                  * number of pages = 1 was increased
798                                                  */
799                                                 LockBuffer(stack->buffer, GIST_UNLOCK);
800                                                 xlocked = false;
801                                                 continue;
802                                         }
803
804                                         /*
805                                          * we don't need to check root split, because checking
806                                          * leaf/inner is enough to recognize split for root
807                                          */
808                                 }
809                                 else if (GistFollowRight(stack->page) ||
810                                                  stack->parent->lsn < GistPageGetNSN(stack->page))
811                                 {
812                                         /*
813                                          * The page was split while we momentarily unlocked the
814                                          * page. Go back to parent.
815                                          */
816                                         UnlockReleaseBuffer(stack->buffer);
817                                         xlocked = false;
818                                         state.stack = stack = stack->parent;
819                                         continue;
820                                 }
821                         }
822
823                         /* now state.stack->(page, buffer and blkno) points to leaf page */
824
825                         gistinserttuple(&state, stack, giststate, itup,
826                                                         InvalidOffsetNumber);
827                         LockBuffer(stack->buffer, GIST_UNLOCK);
828
829                         /* Release any pins we might still hold before exiting */
830                         for (; stack; stack = stack->parent)
831                                 ReleaseBuffer(stack->buffer);
832                         break;
833                 }
834         }
835 }
836
837 /*
838  * Traverse the tree to find path from root page to specified "child" block.
839  *
840  * returns a new insertion stack, starting from the parent of "child", up
841  * to the root. *downlinkoffnum is set to the offset of the downlink in the
842  * direct parent of child.
843  *
844  * To prevent deadlocks, this should lock only one page at a time.
845  */
846 static GISTInsertStack *
847 gistFindPath(Relation r, BlockNumber child, OffsetNumber *downlinkoffnum)
848 {
849         Page            page;
850         Buffer          buffer;
851         OffsetNumber i,
852                                 maxoff;
853         ItemId          iid;
854         IndexTuple      idxtuple;
855         List       *fifo;
856         GISTInsertStack *top,
857                            *ptr;
858         BlockNumber blkno;
859
860         top = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack));
861         top->blkno = GIST_ROOT_BLKNO;
862         top->downlinkoffnum = InvalidOffsetNumber;
863
864         fifo = list_make1(top);
865         while (fifo != NIL)
866         {
867                 /* Get next page to visit */
868                 top = linitial(fifo);
869                 fifo = list_delete_first(fifo);
870
871                 buffer = ReadBuffer(r, top->blkno);
872                 LockBuffer(buffer, GIST_SHARE);
873                 gistcheckpage(r, buffer);
874                 page = (Page) BufferGetPage(buffer);
875
876                 if (GistPageIsLeaf(page))
877                 {
878                         /*
879                          * Because we scan the index top-down, all the rest of the pages
880                          * in the queue must be leaf pages as well.
881                          */
882                         UnlockReleaseBuffer(buffer);
883                         break;
884                 }
885
886                 top->lsn = PageGetLSN(page);
887
888                 /*
889                  * If F_FOLLOW_RIGHT is set, the page to the right doesn't have a
890                  * downlink. This should not normally happen..
891                  */
892                 if (GistFollowRight(page))
893                         elog(ERROR, "concurrent GiST page split was incomplete");
894
895                 if (top->parent && top->parent->lsn < GistPageGetNSN(page) &&
896                         GistPageGetOpaque(page)->rightlink != InvalidBlockNumber /* sanity check */ )
897                 {
898                         /*
899                          * Page was split while we looked elsewhere. We didn't see the
900                          * downlink to the right page when we scanned the parent, so add
901                          * it to the queue now.
902                          *
903                          * Put the right page ahead of the queue, so that we visit it
904                          * next. That's important, because if this is the lowest internal
905                          * level, just above leaves, we might already have queued up some
906                          * leaf pages, and we assume that there can't be any non-leaf
907                          * pages behind leaf pages.
908                          */
909                         ptr = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack));
910                         ptr->blkno = GistPageGetOpaque(page)->rightlink;
911                         ptr->downlinkoffnum = InvalidOffsetNumber;
912                         ptr->parent = top->parent;
913
914                         fifo = lcons(ptr, fifo);
915                 }
916
917                 maxoff = PageGetMaxOffsetNumber(page);
918
919                 for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
920                 {
921                         iid = PageGetItemId(page, i);
922                         idxtuple = (IndexTuple) PageGetItem(page, iid);
923                         blkno = ItemPointerGetBlockNumber(&(idxtuple->t_tid));
924                         if (blkno == child)
925                         {
926                                 /* Found it! */
927                                 UnlockReleaseBuffer(buffer);
928                                 *downlinkoffnum = i;
929                                 return top;
930                         }
931                         else
932                         {
933                                 /* Append this child to the list of pages to visit later */
934                                 ptr = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack));
935                                 ptr->blkno = blkno;
936                                 ptr->downlinkoffnum = i;
937                                 ptr->parent = top;
938
939                                 fifo = lappend(fifo, ptr);
940                         }
941                 }
942
943                 UnlockReleaseBuffer(buffer);
944         }
945
946         elog(ERROR, "failed to re-find parent of a page in index \"%s\", block %u",
947                  RelationGetRelationName(r), child);
948         return NULL;                            /* keep compiler quiet */
949 }
950
951 /*
952  * Updates the stack so that child->parent is the correct parent of the
953  * child. child->parent must be exclusively locked on entry, and will
954  * remain so at exit, but it might not be the same page anymore.
955  */
956 static void
957 gistFindCorrectParent(Relation r, GISTInsertStack *child)
958 {
959         GISTInsertStack *parent = child->parent;
960
961         gistcheckpage(r, parent->buffer);
962         parent->page = (Page) BufferGetPage(parent->buffer);
963
964         /* here we don't need to distinguish between split and page update */
965         if (child->downlinkoffnum == InvalidOffsetNumber ||
966                 parent->lsn != PageGetLSN(parent->page))
967         {
968                 /* parent is changed, look child in right links until found */
969                 OffsetNumber i,
970                                         maxoff;
971                 ItemId          iid;
972                 IndexTuple      idxtuple;
973                 GISTInsertStack *ptr;
974
975                 while (true)
976                 {
977                         maxoff = PageGetMaxOffsetNumber(parent->page);
978                         for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
979                         {
980                                 iid = PageGetItemId(parent->page, i);
981                                 idxtuple = (IndexTuple) PageGetItem(parent->page, iid);
982                                 if (ItemPointerGetBlockNumber(&(idxtuple->t_tid)) == child->blkno)
983                                 {
984                                         /* yes!!, found */
985                                         child->downlinkoffnum = i;
986                                         return;
987                                 }
988                         }
989
990                         parent->blkno = GistPageGetOpaque(parent->page)->rightlink;
991                         UnlockReleaseBuffer(parent->buffer);
992                         if (parent->blkno == InvalidBlockNumber)
993                         {
994                                 /*
995                                  * End of chain and still didn't find parent. It's a very-very
996                                  * rare situation when root splited.
997                                  */
998                                 break;
999                         }
1000                         parent->buffer = ReadBuffer(r, parent->blkno);
1001                         LockBuffer(parent->buffer, GIST_EXCLUSIVE);
1002                         gistcheckpage(r, parent->buffer);
1003                         parent->page = (Page) BufferGetPage(parent->buffer);
1004                 }
1005
1006                 /*
1007                  * awful!!, we need search tree to find parent ... , but before we
1008                  * should release all old parent
1009                  */
1010
1011                 ptr = child->parent->parent;    /* child->parent already released
1012                                                                                  * above */
1013                 while (ptr)
1014                 {
1015                         ReleaseBuffer(ptr->buffer);
1016                         ptr = ptr->parent;
1017                 }
1018
1019                 /* ok, find new path */
1020                 ptr = parent = gistFindPath(r, child->blkno, &child->downlinkoffnum);
1021
1022                 /* read all buffers as expected by caller */
1023                 /* note we don't lock them or gistcheckpage them here! */
1024                 while (ptr)
1025                 {
1026                         ptr->buffer = ReadBuffer(r, ptr->blkno);
1027                         ptr->page = (Page) BufferGetPage(ptr->buffer);
1028                         ptr = ptr->parent;
1029                 }
1030
1031                 /* install new chain of parents to stack */
1032                 child->parent = parent;
1033
1034                 /* make recursive call to normal processing */
1035                 LockBuffer(child->parent->buffer, GIST_EXCLUSIVE);
1036                 gistFindCorrectParent(r, child);
1037         }
1038
1039         return;
1040 }
1041
1042 /*
1043  * Form a downlink pointer for the page in 'buf'.
1044  */
1045 static IndexTuple
1046 gistformdownlink(Relation rel, Buffer buf, GISTSTATE *giststate,
1047                                  GISTInsertStack *stack)
1048 {
1049         Page            page = BufferGetPage(buf);
1050         OffsetNumber maxoff;
1051         OffsetNumber offset;
1052         IndexTuple      downlink = NULL;
1053
1054         maxoff = PageGetMaxOffsetNumber(page);
1055         for (offset = FirstOffsetNumber; offset <= maxoff; offset = OffsetNumberNext(offset))
1056         {
1057                 IndexTuple      ituple = (IndexTuple)
1058                 PageGetItem(page, PageGetItemId(page, offset));
1059
1060                 if (downlink == NULL)
1061                         downlink = CopyIndexTuple(ituple);
1062                 else
1063                 {
1064                         IndexTuple      newdownlink;
1065
1066                         newdownlink = gistgetadjusted(rel, downlink, ituple,
1067                                                                                   giststate);
1068                         if (newdownlink)
1069                                 downlink = newdownlink;
1070                 }
1071         }
1072
1073         /*
1074          * If the page is completely empty, we can't form a meaningful downlink
1075          * for it. But we have to insert a downlink for the page. Any key will do,
1076          * as long as its consistent with the downlink of parent page, so that we
1077          * can legally insert it to the parent. A minimal one that matches as few
1078          * scans as possible would be best, to keep scans from doing useless work,
1079          * but we don't know how to construct that. So we just use the downlink of
1080          * the original page that was split - that's as far from optimal as it can
1081          * get but will do..
1082          */
1083         if (!downlink)
1084         {
1085                 ItemId          iid;
1086
1087                 LockBuffer(stack->parent->buffer, GIST_EXCLUSIVE);
1088                 gistFindCorrectParent(rel, stack);
1089                 iid = PageGetItemId(stack->parent->page, stack->downlinkoffnum);
1090                 downlink = (IndexTuple) PageGetItem(stack->parent->page, iid);
1091                 downlink = CopyIndexTuple(downlink);
1092                 LockBuffer(stack->parent->buffer, GIST_UNLOCK);
1093         }
1094
1095         ItemPointerSetBlockNumber(&(downlink->t_tid), BufferGetBlockNumber(buf));
1096         GistTupleSetValid(downlink);
1097
1098         return downlink;
1099 }
1100
1101
1102 /*
1103  * Complete the incomplete split of state->stack->page.
1104  */
1105 static void
1106 gistfixsplit(GISTInsertState *state, GISTSTATE *giststate)
1107 {
1108         GISTInsertStack *stack = state->stack;
1109         Buffer          buf;
1110         Page            page;
1111         List       *splitinfo = NIL;
1112
1113         elog(LOG, "fixing incomplete split in index \"%s\", block %u",
1114                  RelationGetRelationName(state->r), stack->blkno);
1115
1116         Assert(GistFollowRight(stack->page));
1117         Assert(OffsetNumberIsValid(stack->downlinkoffnum));
1118
1119         buf = stack->buffer;
1120
1121         /*
1122          * Read the chain of split pages, following the rightlinks. Construct a
1123          * downlink tuple for each page.
1124          */
1125         for (;;)
1126         {
1127                 GISTPageSplitInfo *si = palloc(sizeof(GISTPageSplitInfo));
1128                 IndexTuple      downlink;
1129
1130                 page = BufferGetPage(buf);
1131
1132                 /* Form the new downlink tuples to insert to parent */
1133                 downlink = gistformdownlink(state->r, buf, giststate, stack);
1134
1135                 si->buf = buf;
1136                 si->downlink = downlink;
1137
1138                 splitinfo = lappend(splitinfo, si);
1139
1140                 if (GistFollowRight(page))
1141                 {
1142                         /* lock next page */
1143                         buf = ReadBuffer(state->r, GistPageGetOpaque(page)->rightlink);
1144                         LockBuffer(buf, GIST_EXCLUSIVE);
1145                 }
1146                 else
1147                         break;
1148         }
1149
1150         /* Insert the downlinks */
1151         gistfinishsplit(state, stack, giststate, splitinfo, false);
1152 }
1153
1154 /*
1155  * Insert or replace a tuple in stack->buffer. If 'oldoffnum' is valid, the
1156  * tuple at 'oldoffnum' is replaced, otherwise the tuple is inserted as new.
1157  * 'stack' represents the path from the root to the page being updated.
1158  *
1159  * The caller must hold an exclusive lock on stack->buffer.  The lock is still
1160  * held on return, but the page might not contain the inserted tuple if the
1161  * page was split. The function returns true if the page was split, false
1162  * otherwise.
1163  */
1164 static bool
1165 gistinserttuple(GISTInsertState *state, GISTInsertStack *stack,
1166                           GISTSTATE *giststate, IndexTuple tuple, OffsetNumber oldoffnum)
1167 {
1168         return gistinserttuples(state, stack, giststate, &tuple, 1, oldoffnum,
1169                                                         InvalidBuffer, InvalidBuffer, false, false);
1170 }
1171
1172 /* ----------------
1173  * An extended workhorse version of gistinserttuple(). This version allows
1174  * inserting multiple tuples, or replacing a single tuple with multiple tuples.
1175  * This is used to recursively update the downlinks in the parent when a page
1176  * is split.
1177  *
1178  * If leftchild and rightchild are valid, we're inserting/replacing the
1179  * downlink for rightchild, and leftchild is its left sibling. We clear the
1180  * F_FOLLOW_RIGHT flag and update NSN on leftchild, atomically with the
1181  * insertion of the downlink.
1182  *
1183  * To avoid holding locks for longer than necessary, when recursing up the
1184  * tree to update the parents, the locking is a bit peculiar here. On entry,
1185  * the caller must hold an exclusive lock on stack->buffer, as well as
1186  * leftchild and rightchild if given. On return:
1187  *
1188  *      - Lock on stack->buffer is released, if 'unlockbuf' is true. The page is
1189  *        always kept pinned, however.
1190  *      - Lock on 'leftchild' is released, if 'unlockleftchild' is true. The page
1191  *        is kept pinned.
1192  *      - Lock and pin on 'rightchild' are always released.
1193  *
1194  * Returns 'true' if the page had to be split. Note that if the page was
1195  * split, the inserted/updated tuples might've been inserted to a right
1196  * sibling of stack->buffer instead of stack->buffer itself.
1197  */
1198 static bool
1199 gistinserttuples(GISTInsertState *state, GISTInsertStack *stack,
1200                                  GISTSTATE *giststate,
1201                                  IndexTuple *tuples, int ntup, OffsetNumber oldoffnum,
1202                                  Buffer leftchild, Buffer rightchild,
1203                                  bool unlockbuf, bool unlockleftchild)
1204 {
1205         List       *splitinfo;
1206         bool            is_split;
1207
1208         /* Insert the tuple(s) to the page, splitting the page if necessary */
1209         is_split = gistplacetopage(state->r, state->freespace, giststate,
1210                                                            stack->buffer,
1211                                                            tuples, ntup,
1212                                                            oldoffnum, NULL,
1213                                                            leftchild,
1214                                                            &splitinfo,
1215                                                            true);
1216
1217         /*
1218          * Before recursing up in case the page was split, release locks on the
1219          * child pages. We don't need to keep them locked when updating the
1220          * parent.
1221          */
1222         if (BufferIsValid(rightchild))
1223                 UnlockReleaseBuffer(rightchild);
1224         if (BufferIsValid(leftchild) && unlockleftchild)
1225                 LockBuffer(leftchild, GIST_UNLOCK);
1226
1227         /*
1228          * If we had to split, insert/update the downlinks in the parent. If the
1229          * caller requested us to release the lock on stack->buffer, tell
1230          * gistfinishsplit() to do that as soon as it's safe to do so. If we
1231          * didn't have to split, release it ourselves.
1232          */
1233         if (splitinfo)
1234                 gistfinishsplit(state, stack, giststate, splitinfo, unlockbuf);
1235         else if (unlockbuf)
1236                 LockBuffer(stack->buffer, GIST_UNLOCK);
1237
1238         return is_split;
1239 }
1240
1241 /*
1242  * Finish an incomplete split by inserting/updating the downlinks in parent
1243  * page. 'splitinfo' contains all the child pages involved in the split,
1244  * from left-to-right.
1245  *
1246  * On entry, the caller must hold a lock on stack->buffer and all the child
1247  * pages in 'splitinfo'. If 'unlockbuf' is true, the lock on stack->buffer is
1248  * released on return. The child pages are always unlocked and unpinned.
1249  */
1250 static void
1251 gistfinishsplit(GISTInsertState *state, GISTInsertStack *stack,
1252                                 GISTSTATE *giststate, List *splitinfo, bool unlockbuf)
1253 {
1254         ListCell   *lc;
1255         List       *reversed;
1256         GISTPageSplitInfo *right;
1257         GISTPageSplitInfo *left;
1258         IndexTuple      tuples[2];
1259
1260         /* A split always contains at least two halves */
1261         Assert(list_length(splitinfo) >= 2);
1262
1263         /*
1264          * We need to insert downlinks for each new page, and update the downlink
1265          * for the original (leftmost) page in the split. Begin at the rightmost
1266          * page, inserting one downlink at a time until there's only two pages
1267          * left. Finally insert the downlink for the last new page and update the
1268          * downlink for the original page as one operation.
1269          */
1270
1271         /* for convenience, create a copy of the list in reverse order */
1272         reversed = NIL;
1273         foreach(lc, splitinfo)
1274         {
1275                 reversed = lcons(lfirst(lc), reversed);
1276         }
1277
1278         LockBuffer(stack->parent->buffer, GIST_EXCLUSIVE);
1279         gistFindCorrectParent(state->r, stack);
1280
1281         /*
1282          * insert downlinks for the siblings from right to left, until there are
1283          * only two siblings left.
1284          */
1285         while (list_length(reversed) > 2)
1286         {
1287                 right = (GISTPageSplitInfo *) linitial(reversed);
1288                 left = (GISTPageSplitInfo *) lsecond(reversed);
1289
1290                 if (gistinserttuples(state, stack->parent, giststate,
1291                                                          &right->downlink, 1,
1292                                                          InvalidOffsetNumber,
1293                                                          left->buf, right->buf, false, false))
1294                 {
1295                         /*
1296                          * If the parent page was split, need to relocate the original
1297                          * parent pointer.
1298                          */
1299                         gistFindCorrectParent(state->r, stack);
1300                 }
1301                 /* gistinserttuples() released the lock on right->buf. */
1302                 reversed = list_delete_first(reversed);
1303         }
1304
1305         right = (GISTPageSplitInfo *) linitial(reversed);
1306         left = (GISTPageSplitInfo *) lsecond(reversed);
1307
1308         /*
1309          * Finally insert downlink for the remaining right page and update the
1310          * downlink for the original page to not contain the tuples that were
1311          * moved to the new pages.
1312          */
1313         tuples[0] = left->downlink;
1314         tuples[1] = right->downlink;
1315         gistinserttuples(state, stack->parent, giststate,
1316                                          tuples, 2,
1317                                          stack->downlinkoffnum,
1318                                          left->buf, right->buf,
1319                                          true,          /* Unlock parent */
1320                                          unlockbuf      /* Unlock stack->buffer if caller wants that */
1321                 );
1322         Assert(left->buf == stack->buffer);
1323 }
1324
1325 /*
1326  * gistSplit -- split a page in the tree and fill struct
1327  * used for XLOG and real writes buffers. Function is recursive, ie
1328  * it will split page until keys will fit in every page.
1329  */
1330 SplitedPageLayout *
1331 gistSplit(Relation r,
1332                   Page page,
1333                   IndexTuple *itup,             /* contains compressed entry */
1334                   int len,
1335                   GISTSTATE *giststate)
1336 {
1337         IndexTuple *lvectup,
1338                            *rvectup;
1339         GistSplitVector v;
1340         int                     i;
1341         SplitedPageLayout *res = NULL;
1342
1343         /* this should never recurse very deeply, but better safe than sorry */
1344         check_stack_depth();
1345
1346         /* there's no point in splitting an empty page */
1347         Assert(len > 0);
1348
1349         /*
1350          * If a single tuple doesn't fit on a page, no amount of splitting will
1351          * help.
1352          */
1353         if (len == 1)
1354                 ereport(ERROR,
1355                                 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
1356                         errmsg("index row size %zu exceeds maximum %zu for index \"%s\"",
1357                                    IndexTupleSize(itup[0]), GiSTPageSize,
1358                                    RelationGetRelationName(r))));
1359
1360         memset(v.spl_lisnull, TRUE, sizeof(bool) * giststate->tupdesc->natts);
1361         memset(v.spl_risnull, TRUE, sizeof(bool) * giststate->tupdesc->natts);
1362         gistSplitByKey(r, page, itup, len, giststate, &v, 0);
1363
1364         /* form left and right vector */
1365         lvectup = (IndexTuple *) palloc(sizeof(IndexTuple) * (len + 1));
1366         rvectup = (IndexTuple *) palloc(sizeof(IndexTuple) * (len + 1));
1367
1368         for (i = 0; i < v.splitVector.spl_nleft; i++)
1369                 lvectup[i] = itup[v.splitVector.spl_left[i] - 1];
1370
1371         for (i = 0; i < v.splitVector.spl_nright; i++)
1372                 rvectup[i] = itup[v.splitVector.spl_right[i] - 1];
1373
1374         /* finalize splitting (may need another split) */
1375         if (!gistfitpage(rvectup, v.splitVector.spl_nright))
1376         {
1377                 res = gistSplit(r, page, rvectup, v.splitVector.spl_nright, giststate);
1378         }
1379         else
1380         {
1381                 ROTATEDIST(res);
1382                 res->block.num = v.splitVector.spl_nright;
1383                 res->list = gistfillitupvec(rvectup, v.splitVector.spl_nright, &(res->lenlist));
1384                 res->itup = gistFormTuple(giststate, r, v.spl_rattr, v.spl_risnull, false);
1385         }
1386
1387         if (!gistfitpage(lvectup, v.splitVector.spl_nleft))
1388         {
1389                 SplitedPageLayout *resptr,
1390                                    *subres;
1391
1392                 resptr = subres = gistSplit(r, page, lvectup, v.splitVector.spl_nleft, giststate);
1393
1394                 /* install on list's tail */
1395                 while (resptr->next)
1396                         resptr = resptr->next;
1397
1398                 resptr->next = res;
1399                 res = subres;
1400         }
1401         else
1402         {
1403                 ROTATEDIST(res);
1404                 res->block.num = v.splitVector.spl_nleft;
1405                 res->list = gistfillitupvec(lvectup, v.splitVector.spl_nleft, &(res->lenlist));
1406                 res->itup = gistFormTuple(giststate, r, v.spl_lattr, v.spl_lisnull, false);
1407         }
1408
1409         return res;
1410 }
1411
1412 /*
1413  * Create a GISTSTATE and fill it with information about the index
1414  */
1415 GISTSTATE *
1416 initGISTstate(Relation index)
1417 {
1418         GISTSTATE  *giststate;
1419         MemoryContext scanCxt;
1420         MemoryContext oldCxt;
1421         int                     i;
1422
1423         /* safety check to protect fixed-size arrays in GISTSTATE */
1424         if (index->rd_att->natts > INDEX_MAX_KEYS)
1425                 elog(ERROR, "numberOfAttributes %d > %d",
1426                          index->rd_att->natts, INDEX_MAX_KEYS);
1427
1428         /* Create the memory context that will hold the GISTSTATE */
1429         scanCxt = AllocSetContextCreate(CurrentMemoryContext,
1430                                                                         "GiST scan context",
1431                                                                         ALLOCSET_DEFAULT_SIZES);
1432         oldCxt = MemoryContextSwitchTo(scanCxt);
1433
1434         /* Create and fill in the GISTSTATE */
1435         giststate = (GISTSTATE *) palloc(sizeof(GISTSTATE));
1436
1437         giststate->scanCxt = scanCxt;
1438         giststate->tempCxt = scanCxt;           /* caller must change this if needed */
1439         giststate->tupdesc = index->rd_att;
1440
1441         for (i = 0; i < index->rd_att->natts; i++)
1442         {
1443                 fmgr_info_copy(&(giststate->consistentFn[i]),
1444                                            index_getprocinfo(index, i + 1, GIST_CONSISTENT_PROC),
1445                                            scanCxt);
1446                 fmgr_info_copy(&(giststate->unionFn[i]),
1447                                            index_getprocinfo(index, i + 1, GIST_UNION_PROC),
1448                                            scanCxt);
1449                 fmgr_info_copy(&(giststate->compressFn[i]),
1450                                            index_getprocinfo(index, i + 1, GIST_COMPRESS_PROC),
1451                                            scanCxt);
1452                 fmgr_info_copy(&(giststate->decompressFn[i]),
1453                                            index_getprocinfo(index, i + 1, GIST_DECOMPRESS_PROC),
1454                                            scanCxt);
1455                 fmgr_info_copy(&(giststate->penaltyFn[i]),
1456                                            index_getprocinfo(index, i + 1, GIST_PENALTY_PROC),
1457                                            scanCxt);
1458                 fmgr_info_copy(&(giststate->picksplitFn[i]),
1459                                            index_getprocinfo(index, i + 1, GIST_PICKSPLIT_PROC),
1460                                            scanCxt);
1461                 fmgr_info_copy(&(giststate->equalFn[i]),
1462                                            index_getprocinfo(index, i + 1, GIST_EQUAL_PROC),
1463                                            scanCxt);
1464                 /* opclasses are not required to provide a Distance method */
1465                 if (OidIsValid(index_getprocid(index, i + 1, GIST_DISTANCE_PROC)))
1466                         fmgr_info_copy(&(giststate->distanceFn[i]),
1467                                                  index_getprocinfo(index, i + 1, GIST_DISTANCE_PROC),
1468                                                    scanCxt);
1469                 else
1470                         giststate->distanceFn[i].fn_oid = InvalidOid;
1471
1472                 /* opclasses are not required to provide a Fetch method */
1473                 if (OidIsValid(index_getprocid(index, i + 1, GIST_FETCH_PROC)))
1474                         fmgr_info_copy(&(giststate->fetchFn[i]),
1475                                                    index_getprocinfo(index, i + 1, GIST_FETCH_PROC),
1476                                                    scanCxt);
1477                 else
1478                         giststate->fetchFn[i].fn_oid = InvalidOid;
1479
1480                 /*
1481                  * If the index column has a specified collation, we should honor that
1482                  * while doing comparisons.  However, we may have a collatable storage
1483                  * type for a noncollatable indexed data type.  If there's no index
1484                  * collation then specify default collation in case the support
1485                  * functions need collation.  This is harmless if the support
1486                  * functions don't care about collation, so we just do it
1487                  * unconditionally.  (We could alternatively call get_typcollation,
1488                  * but that seems like expensive overkill --- there aren't going to be
1489                  * any cases where a GiST storage type has a nondefault collation.)
1490                  */
1491                 if (OidIsValid(index->rd_indcollation[i]))
1492                         giststate->supportCollation[i] = index->rd_indcollation[i];
1493                 else
1494                         giststate->supportCollation[i] = DEFAULT_COLLATION_OID;
1495         }
1496
1497         MemoryContextSwitchTo(oldCxt);
1498
1499         return giststate;
1500 }
1501
1502 void
1503 freeGISTstate(GISTSTATE *giststate)
1504 {
1505         /* It's sufficient to delete the scanCxt */
1506         MemoryContextDelete(giststate->scanCxt);
1507 }
1508
1509 /*
1510  * gistvacuumpage() -- try to remove LP_DEAD items from the given page.
1511  * Function assumes that buffer is exclusively locked.
1512  */
1513 static void
1514 gistvacuumpage(Relation rel, Page page, Buffer buffer)
1515 {
1516         OffsetNumber deletable[MaxIndexTuplesPerPage];
1517         int                     ndeletable = 0;
1518         OffsetNumber offnum,
1519                                 maxoff;
1520
1521         Assert(GistPageIsLeaf(page));
1522
1523         /*
1524          * Scan over all items to see which ones need to be deleted according to
1525          * LP_DEAD flags.
1526          */
1527         maxoff = PageGetMaxOffsetNumber(page);
1528         for (offnum = FirstOffsetNumber;
1529                  offnum <= maxoff;
1530                  offnum = OffsetNumberNext(offnum))
1531         {
1532                 ItemId          itemId = PageGetItemId(page, offnum);
1533
1534                 if (ItemIdIsDead(itemId))
1535                         deletable[ndeletable++] = offnum;
1536         }
1537
1538         if (ndeletable > 0)
1539         {
1540                 START_CRIT_SECTION();
1541
1542                 PageIndexMultiDelete(page, deletable, ndeletable);
1543
1544                 /*
1545                  * Mark the page as not containing any LP_DEAD items.  This is not
1546                  * certainly true (there might be some that have recently been marked,
1547                  * but weren't included in our target-item list), but it will almost
1548                  * always be true and it doesn't seem worth an additional page scan to
1549                  * check it. Remember that F_HAS_GARBAGE is only a hint anyway.
1550                  */
1551                 GistClearPageHasGarbage(page);
1552
1553                 MarkBufferDirty(buffer);
1554
1555                 /* XLOG stuff */
1556                 if (RelationNeedsWAL(rel))
1557                 {
1558                         XLogRecPtr      recptr;
1559
1560                         recptr = gistXLogUpdate(buffer,
1561                                                                         deletable, ndeletable,
1562                                                                         NULL, 0, InvalidBuffer);
1563
1564                         PageSetLSN(page, recptr);
1565                 }
1566                 else
1567                         PageSetLSN(page, gistGetFakeLSN(rel));
1568
1569                 END_CRIT_SECTION();
1570         }
1571
1572         /*
1573          * Note: if we didn't find any LP_DEAD items, then the page's
1574          * F_HAS_GARBAGE hint bit is falsely set.  We do not bother expending a
1575          * separate write to clear it, however.  We will clear it when we split
1576          * the page.
1577          */
1578 }