]> granicus.if.org Git - postgresql/blob - src/backend/access/gist/gist.c
8c2dbc940f45a890a1a0a9e432d278b001200827
[postgresql] / src / backend / access / gist / gist.c
1 /*-------------------------------------------------------------------------
2  *
3  * gist.c
4  *        interface routines for the postgres GiST index access method.
5  *
6  *
7  * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group
8  * Portions Copyright (c) 1994, Regents of the University of California
9  *
10  * IDENTIFICATION
11  *        src/backend/access/gist/gist.c
12  *
13  *-------------------------------------------------------------------------
14  */
15 #include "postgres.h"
16
17 #include "access/genam.h"
18 #include "access/gist_private.h"
19 #include "catalog/index.h"
20 #include "miscadmin.h"
21 #include "storage/bufmgr.h"
22 #include "storage/indexfsm.h"
23 #include "utils/memutils.h"
24
25 /* Working state for gistbuild and its callback */
26 typedef struct
27 {
28         GISTSTATE       giststate;
29         int                     numindexattrs;
30         double          indtuples;
31         MemoryContext tmpCtx;
32 } GISTBuildState;
33
34
35 /* non-export function prototypes */
36 static void gistbuildCallback(Relation index,
37                                   HeapTuple htup,
38                                   Datum *values,
39                                   bool *isnull,
40                                   bool tupleIsAlive,
41                                   void *state);
42 static void gistdoinsert(Relation r,
43                          IndexTuple itup,
44                          Size freespace,
45                          GISTSTATE *GISTstate);
46 static void gistfindleaf(GISTInsertState *state,
47                          GISTSTATE *giststate);
48
49
50 #define ROTATEDIST(d) do { \
51         SplitedPageLayout *tmp=(SplitedPageLayout*)palloc(sizeof(SplitedPageLayout)); \
52         memset(tmp,0,sizeof(SplitedPageLayout)); \
53         tmp->block.blkno = InvalidBlockNumber;  \
54         tmp->buffer = InvalidBuffer;    \
55         tmp->next = (d); \
56         (d)=tmp; \
57 } while(0)
58
59
60 /*
61  * Create and return a temporary memory context for use by GiST. We
62  * _always_ invoke user-provided methods in a temporary memory
63  * context, so that memory leaks in those functions cannot cause
64  * problems. Also, we use some additional temporary contexts in the
65  * GiST code itself, to avoid the need to do some awkward manual
66  * memory management.
67  */
68 MemoryContext
69 createTempGistContext(void)
70 {
71         return AllocSetContextCreate(CurrentMemoryContext,
72                                                                  "GiST temporary context",
73                                                                  ALLOCSET_DEFAULT_MINSIZE,
74                                                                  ALLOCSET_DEFAULT_INITSIZE,
75                                                                  ALLOCSET_DEFAULT_MAXSIZE);
76 }
77
78 /*
79  * Routine to build an index.  Basically calls insert over and over.
80  *
81  * XXX: it would be nice to implement some sort of bulk-loading
82  * algorithm, but it is not clear how to do that.
83  */
84 Datum
85 gistbuild(PG_FUNCTION_ARGS)
86 {
87         Relation        heap = (Relation) PG_GETARG_POINTER(0);
88         Relation        index = (Relation) PG_GETARG_POINTER(1);
89         IndexInfo  *indexInfo = (IndexInfo *) PG_GETARG_POINTER(2);
90         IndexBuildResult *result;
91         double          reltuples;
92         GISTBuildState buildstate;
93         Buffer          buffer;
94         Page            page;
95
96         /*
97          * We expect to be called exactly once for any index relation. If that's
98          * not the case, big trouble's what we have.
99          */
100         if (RelationGetNumberOfBlocks(index) != 0)
101                 elog(ERROR, "index \"%s\" already contains data",
102                          RelationGetRelationName(index));
103
104         /* no locking is needed */
105         initGISTstate(&buildstate.giststate, index);
106
107         /* initialize the root page */
108         buffer = gistNewBuffer(index);
109         Assert(BufferGetBlockNumber(buffer) == GIST_ROOT_BLKNO);
110         page = BufferGetPage(buffer);
111
112         START_CRIT_SECTION();
113
114         GISTInitBuffer(buffer, F_LEAF);
115
116         MarkBufferDirty(buffer);
117
118         if (!index->rd_istemp)
119         {
120                 XLogRecPtr      recptr;
121                 XLogRecData rdata;
122
123                 rdata.data = (char *) &(index->rd_node);
124                 rdata.len = sizeof(RelFileNode);
125                 rdata.buffer = InvalidBuffer;
126                 rdata.next = NULL;
127
128                 recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_CREATE_INDEX, &rdata);
129                 PageSetLSN(page, recptr);
130                 PageSetTLI(page, ThisTimeLineID);
131         }
132         else
133                 PageSetLSN(page, GetXLogRecPtrForTemp());
134
135         UnlockReleaseBuffer(buffer);
136
137         END_CRIT_SECTION();
138
139         /* build the index */
140         buildstate.numindexattrs = indexInfo->ii_NumIndexAttrs;
141         buildstate.indtuples = 0;
142
143         /*
144          * create a temporary memory context that is reset once for each tuple
145          * inserted into the index
146          */
147         buildstate.tmpCtx = createTempGistContext();
148
149         /* do the heap scan */
150         reltuples = IndexBuildHeapScan(heap, index, indexInfo, true,
151                                                                    gistbuildCallback, (void *) &buildstate);
152
153         /* okay, all heap tuples are indexed */
154         MemoryContextDelete(buildstate.tmpCtx);
155
156         freeGISTstate(&buildstate.giststate);
157
158         /*
159          * Return statistics
160          */
161         result = (IndexBuildResult *) palloc(sizeof(IndexBuildResult));
162
163         result->heap_tuples = reltuples;
164         result->index_tuples = buildstate.indtuples;
165
166         PG_RETURN_POINTER(result);
167 }
168
169 /*
170  * Per-tuple callback from IndexBuildHeapScan
171  */
172 static void
173 gistbuildCallback(Relation index,
174                                   HeapTuple htup,
175                                   Datum *values,
176                                   bool *isnull,
177                                   bool tupleIsAlive,
178                                   void *state)
179 {
180         GISTBuildState *buildstate = (GISTBuildState *) state;
181         IndexTuple      itup;
182         MemoryContext oldCtx;
183
184         oldCtx = MemoryContextSwitchTo(buildstate->tmpCtx);
185
186         /* form an index tuple and point it at the heap tuple */
187         itup = gistFormTuple(&buildstate->giststate, index,
188                                                  values, isnull, true /* size is currently bogus */ );
189         itup->t_tid = htup->t_self;
190
191         /*
192          * Since we already have the index relation locked, we call gistdoinsert
193          * directly.  Normal access method calls dispatch through gistinsert,
194          * which locks the relation for write.  This is the right thing to do if
195          * you're inserting single tups, but not when you're initializing the
196          * whole index at once.
197          *
198          * In this path we respect the fillfactor setting, whereas insertions
199          * after initial build do not.
200          */
201         gistdoinsert(index, itup,
202                           RelationGetTargetPageFreeSpace(index, GIST_DEFAULT_FILLFACTOR),
203                                  &buildstate->giststate);
204
205         buildstate->indtuples += 1;
206         MemoryContextSwitchTo(oldCtx);
207         MemoryContextReset(buildstate->tmpCtx);
208 }
209
210 /*
211  *      gistinsert -- wrapper for GiST tuple insertion.
212  *
213  *        This is the public interface routine for tuple insertion in GiSTs.
214  *        It doesn't do any work; just locks the relation and passes the buck.
215  */
216 Datum
217 gistinsert(PG_FUNCTION_ARGS)
218 {
219         Relation        r = (Relation) PG_GETARG_POINTER(0);
220         Datum      *values = (Datum *) PG_GETARG_POINTER(1);
221         bool       *isnull = (bool *) PG_GETARG_POINTER(2);
222         ItemPointer ht_ctid = (ItemPointer) PG_GETARG_POINTER(3);
223
224 #ifdef NOT_USED
225         Relation        heapRel = (Relation) PG_GETARG_POINTER(4);
226         IndexUniqueCheck checkUnique = (IndexUniqueCheck) PG_GETARG_INT32(5);
227 #endif
228         IndexTuple      itup;
229         GISTSTATE       giststate;
230         MemoryContext oldCtx;
231         MemoryContext insertCtx;
232
233         insertCtx = createTempGistContext();
234         oldCtx = MemoryContextSwitchTo(insertCtx);
235
236         initGISTstate(&giststate, r);
237
238         itup = gistFormTuple(&giststate, r,
239                                                  values, isnull, true /* size is currently bogus */ );
240         itup->t_tid = *ht_ctid;
241
242         gistdoinsert(r, itup, 0, &giststate);
243
244         /* cleanup */
245         freeGISTstate(&giststate);
246         MemoryContextSwitchTo(oldCtx);
247         MemoryContextDelete(insertCtx);
248
249         PG_RETURN_BOOL(false);
250 }
251
252
253 /*
254  * Workhouse routine for doing insertion into a GiST index. Note that
255  * this routine assumes it is invoked in a short-lived memory context,
256  * so it does not bother releasing palloc'd allocations.
257  */
258 static void
259 gistdoinsert(Relation r, IndexTuple itup, Size freespace, GISTSTATE *giststate)
260 {
261         GISTInsertState state;
262
263         memset(&state, 0, sizeof(GISTInsertState));
264
265         state.itup = (IndexTuple *) palloc(sizeof(IndexTuple));
266         state.itup[0] = (IndexTuple) palloc(IndexTupleSize(itup));
267         memcpy(state.itup[0], itup, IndexTupleSize(itup));
268         state.ituplen = 1;
269         state.freespace = freespace;
270         state.r = r;
271         state.key = itup->t_tid;
272         state.needInsertComplete = true;
273
274         state.stack = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack));
275         state.stack->blkno = GIST_ROOT_BLKNO;
276
277         gistfindleaf(&state, giststate);
278         gistmakedeal(&state, giststate);
279 }
280
281 static bool
282 gistplacetopage(GISTInsertState *state, GISTSTATE *giststate)
283 {
284         bool            is_splitted = false;
285         bool            is_leaf = (GistPageIsLeaf(state->stack->page)) ? true : false;
286
287         /*
288          * if (!is_leaf) remove old key: This node's key has been modified, either
289          * because a child split occurred or because we needed to adjust our key
290          * for an insert in a child node. Therefore, remove the old version of
291          * this node's key.
292          *
293          * for WAL replay, in the non-split case we handle this by setting up a
294          * one-element todelete array; in the split case, it's handled implicitly
295          * because the tuple vector passed to gistSplit won't include this tuple.
296          *
297          * XXX: If we want to change fillfactors between node and leaf, fillfactor
298          * = (is_leaf ? state->leaf_fillfactor : state->node_fillfactor)
299          */
300         if (gistnospace(state->stack->page, state->itup, state->ituplen,
301                                         is_leaf ? InvalidOffsetNumber : state->stack->childoffnum,
302                                         state->freespace))
303         {
304                 /* no space for insertion */
305                 IndexTuple *itvec;
306                 int                     tlen;
307                 SplitedPageLayout *dist = NULL,
308                                    *ptr;
309                 BlockNumber rrlink = InvalidBlockNumber;
310                 GistNSN         oldnsn;
311
312                 is_splitted = true;
313
314                 /*
315                  * Form index tuples vector to split: remove old tuple if t's needed
316                  * and add new tuples to vector
317                  */
318                 itvec = gistextractpage(state->stack->page, &tlen);
319                 if (!is_leaf)
320                 {
321                         /* on inner page we should remove old tuple */
322                         int                     pos = state->stack->childoffnum - FirstOffsetNumber;
323
324                         tlen--;
325                         if (pos != tlen)
326                                 memmove(itvec + pos, itvec + pos + 1, sizeof(IndexTuple) * (tlen - pos));
327                 }
328                 itvec = gistjoinvector(itvec, &tlen, state->itup, state->ituplen);
329                 dist = gistSplit(state->r, state->stack->page, itvec, tlen, giststate);
330
331                 state->itup = (IndexTuple *) palloc(sizeof(IndexTuple) * tlen);
332                 state->ituplen = 0;
333
334                 if (state->stack->blkno != GIST_ROOT_BLKNO)
335                 {
336                         /*
337                          * if non-root split then we should not allocate new buffer, but
338                          * we must create temporary page to operate
339                          */
340                         dist->buffer = state->stack->buffer;
341                         dist->page = PageGetTempPageCopySpecial(BufferGetPage(dist->buffer));
342
343                         /* clean all flags except F_LEAF */
344                         GistPageGetOpaque(dist->page)->flags = (is_leaf) ? F_LEAF : 0;
345                 }
346
347                 /* make new pages and fills them */
348                 for (ptr = dist; ptr; ptr = ptr->next)
349                 {
350                         int                     i;
351                         char       *data;
352
353                         /* get new page */
354                         if (ptr->buffer == InvalidBuffer)
355                         {
356                                 ptr->buffer = gistNewBuffer(state->r);
357                                 GISTInitBuffer(ptr->buffer, (is_leaf) ? F_LEAF : 0);
358                                 ptr->page = BufferGetPage(ptr->buffer);
359                         }
360                         ptr->block.blkno = BufferGetBlockNumber(ptr->buffer);
361
362                         /*
363                          * fill page, we can do it because all these pages are new (ie not
364                          * linked in tree or masked by temp page
365                          */
366                         data = (char *) (ptr->list);
367                         for (i = 0; i < ptr->block.num; i++)
368                         {
369                                 if (PageAddItem(ptr->page, (Item) data, IndexTupleSize((IndexTuple) data), i + FirstOffsetNumber, false, false) == InvalidOffsetNumber)
370                                         elog(ERROR, "failed to add item to index page in \"%s\"", RelationGetRelationName(state->r));
371                                 data += IndexTupleSize((IndexTuple) data);
372                         }
373
374                         /* set up ItemPointer and remember it for parent */
375                         ItemPointerSetBlockNumber(&(ptr->itup->t_tid), ptr->block.blkno);
376                         state->itup[state->ituplen] = ptr->itup;
377                         state->ituplen++;
378                 }
379
380                 /* saves old rightlink */
381                 if (state->stack->blkno != GIST_ROOT_BLKNO)
382                         rrlink = GistPageGetOpaque(dist->page)->rightlink;
383
384                 START_CRIT_SECTION();
385
386                 /*
387                  * must mark buffers dirty before XLogInsert, even though we'll still
388                  * be changing their opaque fields below. set up right links.
389                  */
390                 for (ptr = dist; ptr; ptr = ptr->next)
391                 {
392                         MarkBufferDirty(ptr->buffer);
393                         GistPageGetOpaque(ptr->page)->rightlink = (ptr->next) ?
394                                 ptr->next->block.blkno : rrlink;
395                 }
396
397                 /* restore splitted non-root page */
398                 if (state->stack->blkno != GIST_ROOT_BLKNO)
399                 {
400                         PageRestoreTempPage(dist->page, BufferGetPage(dist->buffer));
401                         dist->page = BufferGetPage(dist->buffer);
402                 }
403
404                 if (!state->r->rd_istemp)
405                 {
406                         XLogRecPtr      recptr;
407                         XLogRecData *rdata;
408
409                         rdata = formSplitRdata(state->r->rd_node, state->stack->blkno,
410                                                                    is_leaf, &(state->key), dist);
411
412                         recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_SPLIT, rdata);
413
414                         for (ptr = dist; ptr; ptr = ptr->next)
415                         {
416                                 PageSetLSN(ptr->page, recptr);
417                                 PageSetTLI(ptr->page, ThisTimeLineID);
418                         }
419                 }
420                 else
421                 {
422                         for (ptr = dist; ptr; ptr = ptr->next)
423                         {
424                                 PageSetLSN(ptr->page, GetXLogRecPtrForTemp());
425                         }
426                 }
427
428                 /* set up NSN */
429                 oldnsn = GistPageGetOpaque(dist->page)->nsn;
430                 if (state->stack->blkno == GIST_ROOT_BLKNO)
431                         /* if root split we should put initial value */
432                         oldnsn = PageGetLSN(dist->page);
433
434                 for (ptr = dist; ptr; ptr = ptr->next)
435                 {
436                         /* only for last set oldnsn */
437                         GistPageGetOpaque(ptr->page)->nsn = (ptr->next) ?
438                                 PageGetLSN(ptr->page) : oldnsn;
439                 }
440
441                 /*
442                  * release buffers, if it was a root split then release all buffers
443                  * because we create all buffers
444                  */
445                 ptr = (state->stack->blkno == GIST_ROOT_BLKNO) ? dist : dist->next;
446                 for (; ptr; ptr = ptr->next)
447                         UnlockReleaseBuffer(ptr->buffer);
448
449                 if (state->stack->blkno == GIST_ROOT_BLKNO)
450                 {
451                         gistnewroot(state->r, state->stack->buffer, state->itup, state->ituplen, &(state->key));
452                         state->needInsertComplete = false;
453                 }
454
455                 END_CRIT_SECTION();
456         }
457         else
458         {
459                 /* enough space */
460                 START_CRIT_SECTION();
461
462                 if (!is_leaf)
463                         PageIndexTupleDelete(state->stack->page, state->stack->childoffnum);
464                 gistfillbuffer(state->stack->page, state->itup, state->ituplen, InvalidOffsetNumber);
465
466                 MarkBufferDirty(state->stack->buffer);
467
468                 if (!state->r->rd_istemp)
469                 {
470                         OffsetNumber noffs = 0,
471                                                 offs[1];
472                         XLogRecPtr      recptr;
473                         XLogRecData *rdata;
474
475                         if (!is_leaf)
476                         {
477                                 /* only on inner page we should delete previous version */
478                                 offs[0] = state->stack->childoffnum;
479                                 noffs = 1;
480                         }
481
482                         rdata = formUpdateRdata(state->r->rd_node, state->stack->buffer,
483                                                                         offs, noffs,
484                                                                         state->itup, state->ituplen,
485                                                                         &(state->key));
486
487                         recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_UPDATE, rdata);
488                         PageSetLSN(state->stack->page, recptr);
489                         PageSetTLI(state->stack->page, ThisTimeLineID);
490                 }
491                 else
492                         PageSetLSN(state->stack->page, GetXLogRecPtrForTemp());
493
494                 if (state->stack->blkno == GIST_ROOT_BLKNO)
495                         state->needInsertComplete = false;
496
497                 END_CRIT_SECTION();
498
499                 if (state->ituplen > 1)
500                 {                                               /* previous is_splitted==true */
501
502                         /*
503                          * child was splited, so we must form union for insertion in
504                          * parent
505                          */
506                         IndexTuple      newtup = gistunion(state->r, state->itup, state->ituplen, giststate);
507
508                         ItemPointerSetBlockNumber(&(newtup->t_tid), state->stack->blkno);
509                         state->itup[0] = newtup;
510                         state->ituplen = 1;
511                 }
512                 else if (is_leaf)
513                 {
514                         /*
515                          * itup[0] store key to adjust parent, we set it to valid to
516                          * correct check by GistTupleIsInvalid macro in gistgetadjusted()
517                          */
518                         ItemPointerSetBlockNumber(&(state->itup[0]->t_tid), state->stack->blkno);
519                         GistTupleSetValid(state->itup[0]);
520                 }
521         }
522         return is_splitted;
523 }
524
525 /*
526  * returns stack of pages, all pages in stack are pinned, and
527  * leaf is X-locked
528  */
529
530 static void
531 gistfindleaf(GISTInsertState *state, GISTSTATE *giststate)
532 {
533         ItemId          iid;
534         IndexTuple      idxtuple;
535         GISTPageOpaque opaque;
536
537         /*
538          * walk down, We don't lock page for a long time, but so we should be
539          * ready to recheck path in a bad case... We remember, that page->lsn
540          * should never be invalid.
541          */
542         for (;;)
543         {
544                 if (XLogRecPtrIsInvalid(state->stack->lsn))
545                         state->stack->buffer = ReadBuffer(state->r, state->stack->blkno);
546                 LockBuffer(state->stack->buffer, GIST_SHARE);
547                 gistcheckpage(state->r, state->stack->buffer);
548
549                 state->stack->page = (Page) BufferGetPage(state->stack->buffer);
550                 opaque = GistPageGetOpaque(state->stack->page);
551
552                 state->stack->lsn = PageGetLSN(state->stack->page);
553                 Assert(state->r->rd_istemp || !XLogRecPtrIsInvalid(state->stack->lsn));
554
555                 if (state->stack->blkno != GIST_ROOT_BLKNO &&
556                         XLByteLT(state->stack->parent->lsn, opaque->nsn))
557                 {
558                         /*
559                          * caused split non-root page is detected, go up to parent to
560                          * choose best child
561                          */
562                         UnlockReleaseBuffer(state->stack->buffer);
563                         state->stack = state->stack->parent;
564                         continue;
565                 }
566
567                 if (!GistPageIsLeaf(state->stack->page))
568                 {
569                         /*
570                          * This is an internal page, so continue to walk down the tree. We
571                          * find the child node that has the minimum insertion penalty and
572                          * recursively invoke ourselves to modify that node. Once the
573                          * recursive call returns, we may need to adjust the parent node
574                          * for two reasons: the child node split, or the key in this node
575                          * needs to be adjusted for the newly inserted key below us.
576                          */
577                         GISTInsertStack *item = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack));
578
579                         state->stack->childoffnum = gistchoose(state->r, state->stack->page, state->itup[0], giststate);
580
581                         iid = PageGetItemId(state->stack->page, state->stack->childoffnum);
582                         idxtuple = (IndexTuple) PageGetItem(state->stack->page, iid);
583                         item->blkno = ItemPointerGetBlockNumber(&(idxtuple->t_tid));
584                         LockBuffer(state->stack->buffer, GIST_UNLOCK);
585
586                         item->parent = state->stack;
587                         item->child = NULL;
588                         if (state->stack)
589                                 state->stack->child = item;
590                         state->stack = item;
591                 }
592                 else
593                 {
594                         /* be carefull, during unlock/lock page may be changed... */
595                         LockBuffer(state->stack->buffer, GIST_UNLOCK);
596                         LockBuffer(state->stack->buffer, GIST_EXCLUSIVE);
597                         state->stack->page = (Page) BufferGetPage(state->stack->buffer);
598                         opaque = GistPageGetOpaque(state->stack->page);
599
600                         if (state->stack->blkno == GIST_ROOT_BLKNO)
601                         {
602                                 /*
603                                  * the only page can become inner instead of leaf is a root
604                                  * page, so for root we should recheck it
605                                  */
606                                 if (!GistPageIsLeaf(state->stack->page))
607                                 {
608                                         /*
609                                          * very rarely situation: during unlock/lock index with
610                                          * number of pages = 1 was increased
611                                          */
612                                         LockBuffer(state->stack->buffer, GIST_UNLOCK);
613                                         continue;
614                                 }
615
616                                 /*
617                                  * we don't need to check root split, because checking
618                                  * leaf/inner is enough to recognize split for root
619                                  */
620
621                         }
622                         else if (XLByteLT(state->stack->parent->lsn, opaque->nsn))
623                         {
624                                 /*
625                                  * detecting split during unlock/lock, so we should find
626                                  * better child on parent
627                                  */
628
629                                 /* forget buffer */
630                                 UnlockReleaseBuffer(state->stack->buffer);
631
632                                 state->stack = state->stack->parent;
633                                 continue;
634                         }
635
636                         state->stack->lsn = PageGetLSN(state->stack->page);
637
638                         /* ok we found a leaf page and it X-locked */
639                         break;
640                 }
641         }
642
643         /* now state->stack->(page, buffer and blkno) points to leaf page */
644 }
645
646 /*
647  * Traverse the tree to find path from root page to specified "child" block.
648  *
649  * returns from the beginning of closest parent;
650  *
651  * To prevent deadlocks, this should lock only one page simultaneously.
652  */
653 GISTInsertStack *
654 gistFindPath(Relation r, BlockNumber child)
655 {
656         Page            page;
657         Buffer          buffer;
658         OffsetNumber i,
659                                 maxoff;
660         ItemId          iid;
661         IndexTuple      idxtuple;
662         GISTInsertStack *top,
663                            *tail,
664                            *ptr;
665         BlockNumber blkno;
666
667         top = tail = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack));
668         top->blkno = GIST_ROOT_BLKNO;
669
670         while (top && top->blkno != child)
671         {
672                 buffer = ReadBuffer(r, top->blkno);
673                 LockBuffer(buffer, GIST_SHARE);
674                 gistcheckpage(r, buffer);
675                 page = (Page) BufferGetPage(buffer);
676
677                 if (GistPageIsLeaf(page))
678                 {
679                         /* we can safety go away, follows only leaf pages */
680                         UnlockReleaseBuffer(buffer);
681                         return NULL;
682                 }
683
684                 top->lsn = PageGetLSN(page);
685
686                 if (top->parent && XLByteLT(top->parent->lsn, GistPageGetOpaque(page)->nsn) &&
687                         GistPageGetOpaque(page)->rightlink != InvalidBlockNumber /* sanity check */ )
688                 {
689                         /* page splited while we thinking of... */
690                         ptr = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack));
691                         ptr->blkno = GistPageGetOpaque(page)->rightlink;
692                         ptr->childoffnum = InvalidOffsetNumber;
693                         ptr->parent = top;
694                         ptr->next = NULL;
695                         tail->next = ptr;
696                         tail = ptr;
697                 }
698
699                 maxoff = PageGetMaxOffsetNumber(page);
700
701                 for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
702                 {
703                         iid = PageGetItemId(page, i);
704                         idxtuple = (IndexTuple) PageGetItem(page, iid);
705                         blkno = ItemPointerGetBlockNumber(&(idxtuple->t_tid));
706                         if (blkno == child)
707                         {
708                                 OffsetNumber poff = InvalidOffsetNumber;
709
710                                 /* make childs links */
711                                 ptr = top;
712                                 while (ptr->parent)
713                                 {
714                                         /* set child link */
715                                         ptr->parent->child = ptr;
716                                         /* move childoffnum.. */
717                                         if (ptr == top)
718                                         {
719                                                 /* first iteration */
720                                                 poff = ptr->parent->childoffnum;
721                                                 ptr->parent->childoffnum = ptr->childoffnum;
722                                         }
723                                         else
724                                         {
725                                                 OffsetNumber tmp = ptr->parent->childoffnum;
726
727                                                 ptr->parent->childoffnum = poff;
728                                                 poff = tmp;
729                                         }
730                                         ptr = ptr->parent;
731                                 }
732                                 top->childoffnum = i;
733                                 UnlockReleaseBuffer(buffer);
734                                 return top;
735                         }
736                         else
737                         {
738                                 /* Install next inner page to the end of stack */
739                                 ptr = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack));
740                                 ptr->blkno = blkno;
741                                 ptr->childoffnum = i;   /* set offsetnumber of child to child
742                                                                                  * !!! */
743                                 ptr->parent = top;
744                                 ptr->next = NULL;
745                                 tail->next = ptr;
746                                 tail = ptr;
747                         }
748                 }
749
750                 UnlockReleaseBuffer(buffer);
751                 top = top->next;
752         }
753
754         return NULL;
755 }
756
757
758 /*
759  * Returns X-locked parent of stack page
760  */
761
762 static void
763 gistFindCorrectParent(Relation r, GISTInsertStack *child)
764 {
765         GISTInsertStack *parent = child->parent;
766
767         LockBuffer(parent->buffer, GIST_EXCLUSIVE);
768         gistcheckpage(r, parent->buffer);
769         parent->page = (Page) BufferGetPage(parent->buffer);
770
771         /* here we don't need to distinguish between split and page update */
772         if (parent->childoffnum == InvalidOffsetNumber || !XLByteEQ(parent->lsn, PageGetLSN(parent->page)))
773         {
774                 /* parent is changed, look child in right links until found */
775                 OffsetNumber i,
776                                         maxoff;
777                 ItemId          iid;
778                 IndexTuple      idxtuple;
779                 GISTInsertStack *ptr;
780
781                 while (true)
782                 {
783                         maxoff = PageGetMaxOffsetNumber(parent->page);
784                         for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
785                         {
786                                 iid = PageGetItemId(parent->page, i);
787                                 idxtuple = (IndexTuple) PageGetItem(parent->page, iid);
788                                 if (ItemPointerGetBlockNumber(&(idxtuple->t_tid)) == child->blkno)
789                                 {
790                                         /* yes!!, found */
791                                         parent->childoffnum = i;
792                                         return;
793                                 }
794                         }
795
796                         parent->blkno = GistPageGetOpaque(parent->page)->rightlink;
797                         UnlockReleaseBuffer(parent->buffer);
798                         if (parent->blkno == InvalidBlockNumber)
799
800                                 /*
801                                  * end of chain and still didn't found parent, It's very-very
802                                  * rare situation when root splited
803                                  */
804                                 break;
805                         parent->buffer = ReadBuffer(r, parent->blkno);
806                         LockBuffer(parent->buffer, GIST_EXCLUSIVE);
807                         gistcheckpage(r, parent->buffer);
808                         parent->page = (Page) BufferGetPage(parent->buffer);
809                 }
810
811                 /*
812                  * awful!!, we need search tree to find parent ... , but before we
813                  * should release all old parent
814                  */
815
816                 ptr = child->parent->parent;    /* child->parent already released
817                                                                                  * above */
818                 while (ptr)
819                 {
820                         ReleaseBuffer(ptr->buffer);
821                         ptr = ptr->parent;
822                 }
823
824                 /* ok, find new path */
825                 ptr = parent = gistFindPath(r, child->blkno);
826                 Assert(ptr != NULL);
827
828                 /* read all buffers as expected by caller */
829                 /* note we don't lock them or gistcheckpage them here! */
830                 while (ptr)
831                 {
832                         ptr->buffer = ReadBuffer(r, ptr->blkno);
833                         ptr->page = (Page) BufferGetPage(ptr->buffer);
834                         ptr = ptr->parent;
835                 }
836
837                 /* install new chain of parents to stack */
838                 child->parent = parent;
839                 parent->child = child;
840
841                 /* make recursive call to normal processing */
842                 gistFindCorrectParent(r, child);
843         }
844
845         return;
846 }
847
848 void
849 gistmakedeal(GISTInsertState *state, GISTSTATE *giststate)
850 {
851         int                     is_splitted;
852         ItemId          iid;
853         IndexTuple      oldtup,
854                                 newtup;
855
856         /* walk up */
857         while (true)
858         {
859                 /*
860                  * After this call: 1. if child page was splited, then itup contains
861                  * keys for each page 2. if  child page wasn't splited, then itup
862                  * contains additional for adjustment of current key
863                  */
864
865                 if (state->stack->parent)
866                 {
867                         /*
868                          * X-lock parent page before proceed child, gistFindCorrectParent
869                          * should find and lock it
870                          */
871                         gistFindCorrectParent(state->r, state->stack);
872                 }
873                 is_splitted = gistplacetopage(state, giststate);
874
875                 /* parent locked above, so release child buffer */
876                 UnlockReleaseBuffer(state->stack->buffer);
877
878                 /* pop parent page from stack */
879                 state->stack = state->stack->parent;
880
881                 /* stack is void */
882                 if (!state->stack)
883                         break;
884
885                 /*
886                  * child did not split, so we can check is it needed to update parent
887                  * tuple
888                  */
889                 if (!is_splitted)
890                 {
891                         /* parent's tuple */
892                         iid = PageGetItemId(state->stack->page, state->stack->childoffnum);
893                         oldtup = (IndexTuple) PageGetItem(state->stack->page, iid);
894                         newtup = gistgetadjusted(state->r, oldtup, state->itup[0], giststate);
895
896                         if (!newtup)
897                         {                                       /* not need to update key */
898                                 LockBuffer(state->stack->buffer, GIST_UNLOCK);
899                                 break;
900                         }
901
902                         state->itup[0] = newtup;
903                 }
904         }                                                       /* while */
905
906         /* release all parent buffers */
907         while (state->stack)
908         {
909                 ReleaseBuffer(state->stack->buffer);
910                 state->stack = state->stack->parent;
911         }
912
913         /* say to xlog that insert is completed */
914         if (state->needInsertComplete && !state->r->rd_istemp)
915                 gistxlogInsertCompletion(state->r->rd_node, &(state->key), 1);
916 }
917
918 /*
919  * gistSplit -- split a page in the tree and fill struct
920  * used for XLOG and real writes buffers. Function is recursive, ie
921  * it will split page until keys will fit in every page.
922  */
923 SplitedPageLayout *
924 gistSplit(Relation r,
925                   Page page,
926                   IndexTuple *itup,             /* contains compressed entry */
927                   int len,
928                   GISTSTATE *giststate)
929 {
930         IndexTuple *lvectup,
931                            *rvectup;
932         GistSplitVector v;
933         GistEntryVector *entryvec;
934         int                     i;
935         SplitedPageLayout *res = NULL;
936
937         /* generate the item array */
938         entryvec = palloc(GEVHDRSZ + (len + 1) * sizeof(GISTENTRY));
939         entryvec->n = len + 1;
940
941         memset(v.spl_lisnull, TRUE, sizeof(bool) * giststate->tupdesc->natts);
942         memset(v.spl_risnull, TRUE, sizeof(bool) * giststate->tupdesc->natts);
943         gistSplitByKey(r, page, itup, len, giststate,
944                                    &v, entryvec, 0);
945
946         /* form left and right vector */
947         lvectup = (IndexTuple *) palloc(sizeof(IndexTuple) * (len + 1));
948         rvectup = (IndexTuple *) palloc(sizeof(IndexTuple) * (len + 1));
949
950         for (i = 0; i < v.splitVector.spl_nleft; i++)
951                 lvectup[i] = itup[v.splitVector.spl_left[i] - 1];
952
953         for (i = 0; i < v.splitVector.spl_nright; i++)
954                 rvectup[i] = itup[v.splitVector.spl_right[i] - 1];
955
956         /* finalize splitting (may need another split) */
957         if (!gistfitpage(rvectup, v.splitVector.spl_nright))
958         {
959                 res = gistSplit(r, page, rvectup, v.splitVector.spl_nright, giststate);
960         }
961         else
962         {
963                 ROTATEDIST(res);
964                 res->block.num = v.splitVector.spl_nright;
965                 res->list = gistfillitupvec(rvectup, v.splitVector.spl_nright, &(res->lenlist));
966                 res->itup = (v.spl_rightvalid) ? gistFormTuple(giststate, r, v.spl_rattr, v.spl_risnull, false)
967                         : gist_form_invalid_tuple(GIST_ROOT_BLKNO);
968         }
969
970         if (!gistfitpage(lvectup, v.splitVector.spl_nleft))
971         {
972                 SplitedPageLayout *resptr,
973                                    *subres;
974
975                 resptr = subres = gistSplit(r, page, lvectup, v.splitVector.spl_nleft, giststate);
976
977                 /* install on list's tail */
978                 while (resptr->next)
979                         resptr = resptr->next;
980
981                 resptr->next = res;
982                 res = subres;
983         }
984         else
985         {
986                 ROTATEDIST(res);
987                 res->block.num = v.splitVector.spl_nleft;
988                 res->list = gistfillitupvec(lvectup, v.splitVector.spl_nleft, &(res->lenlist));
989                 res->itup = (v.spl_leftvalid) ? gistFormTuple(giststate, r, v.spl_lattr, v.spl_lisnull, false)
990                         : gist_form_invalid_tuple(GIST_ROOT_BLKNO);
991         }
992
993         return res;
994 }
995
996 /*
997  * buffer must be pinned and locked by caller
998  */
999 void
1000 gistnewroot(Relation r, Buffer buffer, IndexTuple *itup, int len, ItemPointer key)
1001 {
1002         Page            page;
1003
1004         Assert(BufferGetBlockNumber(buffer) == GIST_ROOT_BLKNO);
1005         page = BufferGetPage(buffer);
1006
1007         START_CRIT_SECTION();
1008
1009         GISTInitBuffer(buffer, 0);
1010         gistfillbuffer(page, itup, len, FirstOffsetNumber);
1011
1012         MarkBufferDirty(buffer);
1013
1014         if (!r->rd_istemp)
1015         {
1016                 XLogRecPtr      recptr;
1017                 XLogRecData *rdata;
1018
1019                 rdata = formUpdateRdata(r->rd_node, buffer,
1020                                                                 NULL, 0,
1021                                                                 itup, len, key);
1022
1023                 recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_NEW_ROOT, rdata);
1024                 PageSetLSN(page, recptr);
1025                 PageSetTLI(page, ThisTimeLineID);
1026         }
1027         else
1028                 PageSetLSN(page, GetXLogRecPtrForTemp());
1029
1030         END_CRIT_SECTION();
1031 }
1032
1033 void
1034 initGISTstate(GISTSTATE *giststate, Relation index)
1035 {
1036         int                     i;
1037
1038         if (index->rd_att->natts > INDEX_MAX_KEYS)
1039                 elog(ERROR, "numberOfAttributes %d > %d",
1040                          index->rd_att->natts, INDEX_MAX_KEYS);
1041
1042         giststate->tupdesc = index->rd_att;
1043
1044         for (i = 0; i < index->rd_att->natts; i++)
1045         {
1046                 fmgr_info_copy(&(giststate->consistentFn[i]),
1047                                            index_getprocinfo(index, i + 1, GIST_CONSISTENT_PROC),
1048                                            CurrentMemoryContext);
1049                 fmgr_info_copy(&(giststate->unionFn[i]),
1050                                            index_getprocinfo(index, i + 1, GIST_UNION_PROC),
1051                                            CurrentMemoryContext);
1052                 fmgr_info_copy(&(giststate->compressFn[i]),
1053                                            index_getprocinfo(index, i + 1, GIST_COMPRESS_PROC),
1054                                            CurrentMemoryContext);
1055                 fmgr_info_copy(&(giststate->decompressFn[i]),
1056                                            index_getprocinfo(index, i + 1, GIST_DECOMPRESS_PROC),
1057                                            CurrentMemoryContext);
1058                 fmgr_info_copy(&(giststate->penaltyFn[i]),
1059                                            index_getprocinfo(index, i + 1, GIST_PENALTY_PROC),
1060                                            CurrentMemoryContext);
1061                 fmgr_info_copy(&(giststate->picksplitFn[i]),
1062                                            index_getprocinfo(index, i + 1, GIST_PICKSPLIT_PROC),
1063                                            CurrentMemoryContext);
1064                 fmgr_info_copy(&(giststate->equalFn[i]),
1065                                            index_getprocinfo(index, i + 1, GIST_EQUAL_PROC),
1066                                            CurrentMemoryContext);
1067         }
1068 }
1069
1070 void
1071 freeGISTstate(GISTSTATE *giststate)
1072 {
1073         /* no work */
1074 }