]> granicus.if.org Git - postgresql/blob - src/backend/access/spgist/spgdoinsert.c
Rename updateNodeLink to spgUpdateNodeLink.
[postgresql] / src / backend / access / spgist / spgdoinsert.c
1 /*-------------------------------------------------------------------------
2  *
3  * spgdoinsert.c
4  *        implementation of insert algorithm
5  *
6  *
7  * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
8  * Portions Copyright (c) 1994, Regents of the University of California
9  *
10  * IDENTIFICATION
11  *                      src/backend/access/spgist/spgdoinsert.c
12  *
13  *-------------------------------------------------------------------------
14  */
15
16 #include "postgres.h"
17
18 #include "access/genam.h"
19 #include "access/spgist_private.h"
20 #include "miscadmin.h"
21 #include "storage/bufmgr.h"
22
23
24 /*
25  * SPPageDesc tracks all info about a page we are inserting into.  In some
26  * situations it actually identifies a tuple, or even a specific node within
27  * an inner tuple.  But any of the fields can be invalid.  If the buffer
28  * field is valid, it implies we hold pin and exclusive lock on that buffer.
29  * page pointer should be valid exactly when buffer is.
30  */
31 typedef struct SPPageDesc
32 {
33         BlockNumber blkno;                      /* block number, or InvalidBlockNumber */
34         Buffer          buffer;                 /* page's buffer number, or InvalidBuffer */
35         Page            page;                   /* pointer to page buffer, or NULL */
36         OffsetNumber offnum;            /* offset of tuple, or InvalidOffsetNumber */
37         int                     node;                   /* node number within inner tuple, or -1 */
38 } SPPageDesc;
39
40
41 /*
42  * Set the item pointer in the nodeN'th entry in inner tuple tup.  This
43  * is used to update the parent inner tuple's downlink after a move or
44  * split operation.
45  */
46 void
47 spgUpdateNodeLink(SpGistInnerTuple tup, int nodeN,
48                                   BlockNumber blkno, OffsetNumber offset)
49 {
50         int                     i;
51         SpGistNodeTuple node;
52
53         SGITITERATE(tup, i, node)
54         {
55                 if (i == nodeN)
56                 {
57                         ItemPointerSet(&node->t_tid, blkno, offset);
58                         return;
59                 }
60         }
61
62         elog(ERROR, "failed to find requested node %d in SPGiST inner tuple",
63                  nodeN);
64 }
65
66 /*
67  * Form a new inner tuple containing one more node than the given one, with
68  * the specified label datum, inserted at offset "offset" in the node array.
69  * The new tuple's prefix is the same as the old one's.
70  *
71  * Note that the new node initially has an invalid downlink.  We'll find a
72  * page to point it to later.
73  */
74 static SpGistInnerTuple
75 addNode(SpGistState *state, SpGistInnerTuple tuple, Datum label, int offset)
76 {
77         SpGistNodeTuple node,
78                            *nodes;
79         int                     i;
80
81         /* if offset is negative, insert at end */
82         if (offset < 0)
83                 offset = tuple->nNodes;
84         else if (offset > tuple->nNodes)
85                 elog(ERROR, "invalid offset for adding node to SPGiST inner tuple");
86
87         nodes = palloc(sizeof(SpGistNodeTuple) * (tuple->nNodes + 1));
88         SGITITERATE(tuple, i, node)
89         {
90                 if (i < offset)
91                         nodes[i] = node;
92                 else
93                         nodes[i + 1] = node;
94         }
95
96         nodes[offset] = spgFormNodeTuple(state, label, false);
97
98         return spgFormInnerTuple(state,
99                                                          (tuple->prefixSize > 0),
100                                                          SGITDATUM(tuple, state),
101                                                          tuple->nNodes + 1,
102                                                          nodes);
103 }
104
105 /* qsort comparator for sorting OffsetNumbers */
106 static int
107 cmpOffsetNumbers(const void *a, const void *b)
108 {
109         if (*(const OffsetNumber *) a == *(const OffsetNumber *) b)
110                 return 0;
111         return (*(const OffsetNumber *) a > *(const OffsetNumber *) b) ? 1 : -1;
112 }
113
114 /*
115  * Delete multiple tuples from an index page, preserving tuple offset numbers.
116  *
117  * The first tuple in the given list is replaced with a dead tuple of type
118  * "firststate" (REDIRECT/DEAD/PLACEHOLDER); the remaining tuples are replaced
119  * with dead tuples of type "reststate".  If either firststate or reststate
120  * is REDIRECT, blkno/offnum specify where to link to.
121  *
122  * NB: this is used during WAL replay, so beware of trying to make it too
123  * smart.  In particular, it shouldn't use "state" except for calling
124  * spgFormDeadTuple().
125  */
126 void
127 spgPageIndexMultiDelete(SpGistState *state, Page page,
128                                                 OffsetNumber *itemnos, int nitems,
129                                                 int firststate, int reststate,
130                                                 BlockNumber blkno, OffsetNumber offnum)
131 {
132         OffsetNumber    firstItem;
133         OffsetNumber   *sortednos;
134         SpGistDeadTuple tuple = NULL;
135         int                     i;
136
137         if (nitems == 0)
138                 return;                                 /* nothing to do */
139
140         /*
141          * For efficiency we want to use PageIndexMultiDelete, which requires the
142          * targets to be listed in sorted order, so we have to sort the itemnos
143          * array.  (This also greatly simplifies the math for reinserting the
144          * replacement tuples.)  However, we must not scribble on the caller's
145          * array, so we have to make a copy.
146          */
147         sortednos = (OffsetNumber *) palloc(sizeof(OffsetNumber) * nitems);
148         memcpy(sortednos, itemnos, sizeof(OffsetNumber) * nitems);
149         if (nitems > 1)
150                 qsort(sortednos, nitems, sizeof(OffsetNumber), cmpOffsetNumbers);
151
152         PageIndexMultiDelete(page, sortednos, nitems);
153
154         firstItem = itemnos[0];
155
156         for (i = 0; i < nitems; i++)
157         {
158                 OffsetNumber    itemno = sortednos[i];
159                 int                             tupstate;
160
161                 tupstate = (itemno == firstItem) ? firststate : reststate;
162                 if (tuple == NULL || tuple->tupstate != tupstate)
163                         tuple = spgFormDeadTuple(state, tupstate, blkno, offnum);
164
165                 if (PageAddItem(page, (Item) tuple, tuple->size,
166                                                 itemno, false, false) != itemno)
167                         elog(ERROR, "failed to add item of size %u to SPGiST index page",
168                                  tuple->size);
169
170                 if (tupstate == SPGIST_REDIRECT)
171                         SpGistPageGetOpaque(page)->nRedirection++;
172                 else if (tupstate == SPGIST_PLACEHOLDER)
173                         SpGistPageGetOpaque(page)->nPlaceholder++;
174         }
175
176         pfree(sortednos);
177 }
178
179 /*
180  * Update the parent inner tuple's downlink, and mark the parent buffer
181  * dirty (this must be the last change to the parent page in the current
182  * WAL action).
183  */
184 static void
185 saveNodeLink(Relation index, SPPageDesc *parent,
186                          BlockNumber blkno, OffsetNumber offnum)
187 {
188         SpGistInnerTuple innerTuple;
189
190         innerTuple = (SpGistInnerTuple) PageGetItem(parent->page,
191                                                                 PageGetItemId(parent->page, parent->offnum));
192
193         spgUpdateNodeLink(innerTuple, parent->node, blkno, offnum);
194
195         MarkBufferDirty(parent->buffer);
196 }
197
198 /*
199  * Add a leaf tuple to a leaf page where there is known to be room for it
200  */
201 static void
202 addLeafTuple(Relation index, SpGistState *state, SpGistLeafTuple leafTuple,
203                          SPPageDesc *current, SPPageDesc *parent, bool isNew)
204 {
205         XLogRecData rdata[4];
206         spgxlogAddLeaf xlrec;
207
208         xlrec.node = index->rd_node;
209         xlrec.blknoLeaf = current->blkno;
210         xlrec.newPage = isNew;
211
212         /* these will be filled below as needed */
213         xlrec.offnumLeaf = InvalidOffsetNumber;
214         xlrec.offnumHeadLeaf = InvalidOffsetNumber;
215         xlrec.blknoParent = InvalidBlockNumber;
216         xlrec.offnumParent = InvalidOffsetNumber;
217         xlrec.nodeI = 0;
218
219         ACCEPT_RDATA_DATA(&xlrec, sizeof(xlrec), 0);
220         /* we assume sizeof(xlrec) is at least int-aligned */
221         ACCEPT_RDATA_DATA(leafTuple, leafTuple->size, 1);
222         ACCEPT_RDATA_BUFFER(current->buffer, 2);
223
224         START_CRIT_SECTION();
225
226         if (current->offnum == InvalidOffsetNumber ||
227                 current->blkno == SPGIST_HEAD_BLKNO)
228         {
229                 /* Tuple is not part of a chain */
230                 leafTuple->nextOffset = InvalidOffsetNumber;
231                 current->offnum = SpGistPageAddNewItem(state, current->page,
232                                                                                            (Item) leafTuple, leafTuple->size,
233                                                                                            NULL, false);
234
235                 xlrec.offnumLeaf = current->offnum;
236
237                 /* Must update parent's downlink if any */
238                 if (parent->buffer != InvalidBuffer)
239                 {
240                         xlrec.blknoParent = parent->blkno;
241                         xlrec.offnumParent = parent->offnum;
242                         xlrec.nodeI = parent->node;
243
244                         saveNodeLink(index, parent, current->blkno, current->offnum);
245
246                         ACCEPT_RDATA_BUFFER(parent->buffer, 3);
247                 }
248         }
249         else
250         {
251                 /*
252                  * Tuple must be inserted into existing chain.  We mustn't change
253                  * the chain's head address, but we don't need to chase the entire
254                  * chain to put the tuple at the end; we can insert it second.
255                  *
256                  * Also, it's possible that the "chain" consists only of a DEAD tuple,
257                  * in which case we should replace the DEAD tuple in-place.
258                  */
259                 SpGistLeafTuple head;
260                 OffsetNumber offnum;
261
262                 head = (SpGistLeafTuple) PageGetItem(current->page,
263                                                                                          PageGetItemId(current->page, current->offnum));
264                 if (head->tupstate == SPGIST_LIVE)
265                 {
266                         leafTuple->nextOffset = head->nextOffset;
267                         offnum = SpGistPageAddNewItem(state, current->page,
268                                                                                   (Item) leafTuple, leafTuple->size,
269                                                                                   NULL, false);
270
271                         /*
272                          * re-get head of list because it could have been moved on page,
273                          * and set new second element
274                          */
275                         head = (SpGistLeafTuple) PageGetItem(current->page,
276                                                                                          PageGetItemId(current->page, current->offnum));
277                         head->nextOffset = offnum;
278
279                         xlrec.offnumLeaf = offnum;
280                         xlrec.offnumHeadLeaf = current->offnum;
281                 }
282                 else if (head->tupstate == SPGIST_DEAD)
283                 {
284                         leafTuple->nextOffset = InvalidOffsetNumber;
285                         PageIndexTupleDelete(current->page, current->offnum);
286                         if (PageAddItem(current->page,
287                                                         (Item) leafTuple, leafTuple->size,
288                                                         current->offnum, false, false) != current->offnum)
289                                 elog(ERROR, "failed to add item of size %u to SPGiST index page",
290                                          leafTuple->size);
291
292                         /* WAL replay distinguishes this case by equal offnums */
293                         xlrec.offnumLeaf = current->offnum;
294                         xlrec.offnumHeadLeaf = current->offnum;
295                 }
296                 else
297                         elog(ERROR, "unexpected SPGiST tuple state: %d", head->tupstate);
298         }
299
300         MarkBufferDirty(current->buffer);
301
302         if (RelationNeedsWAL(index))
303         {
304                 XLogRecPtr      recptr;
305
306                 recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_LEAF, rdata);
307
308                 PageSetLSN(current->page, recptr);
309                 PageSetTLI(current->page, ThisTimeLineID);
310
311                 /* update parent only if we actually changed it */
312                 if (xlrec.blknoParent != InvalidBlockNumber)
313                 {
314                         PageSetLSN(parent->page, recptr);
315                         PageSetTLI(parent->page, ThisTimeLineID);
316                 }
317         }
318
319         END_CRIT_SECTION();
320 }
321
322 /*
323  * Count the number and total size of leaf tuples in the chain starting at
324  * current->offnum.  Return number into *nToSplit and total size as function
325  * result.
326  *
327  * Klugy special case when considering the root page (i.e., root is a leaf
328  * page, but we're about to split for the first time): return fake large
329  * values to force spgdoinsert() to take the doPickSplit rather than
330  * moveLeafs code path.  moveLeafs is not prepared to deal with root page.
331  */
332 static int
333 checkSplitConditions(Relation index, SpGistState *state,
334                                          SPPageDesc *current, int *nToSplit)
335 {
336         int                     i,
337                                 n = 0,
338                                 totalSize = 0;
339
340         if (current->blkno == SPGIST_HEAD_BLKNO)
341         {
342                 /* return impossible values to force split */
343                 *nToSplit = BLCKSZ;
344                 return BLCKSZ;
345         }
346
347         i = current->offnum;
348         while (i != InvalidOffsetNumber)
349         {
350                 SpGistLeafTuple it;
351
352                 Assert(i >= FirstOffsetNumber &&
353                            i <= PageGetMaxOffsetNumber(current->page));
354                 it = (SpGistLeafTuple) PageGetItem(current->page,
355                                                                                    PageGetItemId(current->page, i));
356                 if (it->tupstate == SPGIST_LIVE)
357                 {
358                         n++;
359                         totalSize += it->size + sizeof(ItemIdData);
360                 }
361                 else if (it->tupstate == SPGIST_DEAD)
362                 {
363                         /* We could see a DEAD tuple as first/only chain item */
364                         Assert(i == current->offnum);
365                         Assert(it->nextOffset == InvalidOffsetNumber);
366                         /* Don't count it in result, because it won't go to other page */
367                 }
368                 else
369                         elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
370
371                 i = it->nextOffset;
372         }
373
374         *nToSplit = n;
375
376         return totalSize;
377 }
378
379 /*
380  * current points to a leaf-tuple chain that we wanted to add newLeafTuple to,
381  * but the chain has to be moved because there's not enough room to add
382  * newLeafTuple to its page.  We use this method when the chain contains
383  * very little data so a split would be inefficient.  We are sure we can
384  * fit the chain plus newLeafTuple on one other page.
385  */
386 static void
387 moveLeafs(Relation index, SpGistState *state,
388                   SPPageDesc *current, SPPageDesc *parent,
389                   SpGistLeafTuple newLeafTuple)
390 {
391         int                     i,
392                                 nDelete,
393                                 nInsert,
394                                 size;
395         Buffer          nbuf;
396         Page            npage;
397         SpGistLeafTuple it;
398         OffsetNumber r = InvalidOffsetNumber,
399                                 startOffset = InvalidOffsetNumber;
400         bool            replaceDead = false;
401         OffsetNumber *toDelete;
402         OffsetNumber *toInsert;
403         BlockNumber nblkno;
404         XLogRecData rdata[7];
405         spgxlogMoveLeafs xlrec;
406         char       *leafdata,
407                            *leafptr;
408
409         /* This doesn't work on root page */
410         Assert(parent->buffer != InvalidBuffer);
411         Assert(parent->buffer != current->buffer);
412
413         /* Locate the tuples to be moved, and count up the space needed */
414         i = PageGetMaxOffsetNumber(current->page);
415         toDelete = (OffsetNumber *) palloc(sizeof(OffsetNumber) * i);
416         toInsert = (OffsetNumber *) palloc(sizeof(OffsetNumber) * (i + 1));
417
418         size = newLeafTuple->size + sizeof(ItemIdData);
419
420         nDelete = 0;
421         i = current->offnum;
422         while (i != InvalidOffsetNumber)
423         {
424                 SpGistLeafTuple it;
425
426                 Assert(i >= FirstOffsetNumber &&
427                            i <= PageGetMaxOffsetNumber(current->page));
428                 it = (SpGistLeafTuple) PageGetItem(current->page,
429                                                                                    PageGetItemId(current->page, i));
430
431                 if (it->tupstate == SPGIST_LIVE)
432                 {
433                         toDelete[nDelete] = i;
434                         size += it->size + sizeof(ItemIdData);
435                         nDelete++;
436                 }
437                 else if (it->tupstate == SPGIST_DEAD)
438                 {
439                         /* We could see a DEAD tuple as first/only chain item */
440                         Assert(i == current->offnum);
441                         Assert(it->nextOffset == InvalidOffsetNumber);
442                         /* We don't want to move it, so don't count it in size */
443                         toDelete[nDelete] = i;
444                         nDelete++;
445                         replaceDead = true;
446                 }
447                 else
448                         elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
449
450                 i = it->nextOffset;
451         }
452
453         /* Find a leaf page that will hold them */
454         nbuf = SpGistGetBuffer(index, GBUF_LEAF, size, &xlrec.newPage);
455         npage = BufferGetPage(nbuf);
456         nblkno = BufferGetBlockNumber(nbuf);
457         Assert(nblkno != current->blkno);
458
459         /* prepare WAL info */
460         xlrec.node = index->rd_node;
461         STORE_STATE(state, xlrec.stateSrc);
462
463         xlrec.blknoSrc = current->blkno;
464         xlrec.blknoDst = nblkno;
465         xlrec.nMoves = nDelete;
466         xlrec.replaceDead = replaceDead;
467
468         xlrec.blknoParent = parent->blkno;
469         xlrec.offnumParent = parent->offnum;
470         xlrec.nodeI = parent->node;
471
472         leafdata = leafptr = palloc(size);
473
474         START_CRIT_SECTION();
475
476         /* copy all the old tuples to new page, unless they're dead */
477         nInsert = 0;
478         if (!replaceDead)
479         {
480                 for (i = 0; i < nDelete; i++)
481                 {
482                         it = (SpGistLeafTuple) PageGetItem(current->page,
483                                                                                            PageGetItemId(current->page, toDelete[i]));
484                         Assert(it->tupstate == SPGIST_LIVE);
485
486                         /*
487                          * Update chain link (notice the chain order gets reversed, but we
488                          * don't care).  We're modifying the tuple on the source page
489                          * here, but it's okay since we're about to delete it.
490                          */
491                         it->nextOffset = r;
492
493                         r = SpGistPageAddNewItem(state, npage, (Item) it, it->size,
494                                                                          &startOffset, false);
495
496                         toInsert[nInsert] = r;
497                         nInsert++;
498
499                         /* save modified tuple into leafdata as well */
500                         memcpy(leafptr, it, it->size);
501                         leafptr += it->size;
502                 }
503         }
504
505         /* add the new tuple as well */
506         newLeafTuple->nextOffset = r;
507         r = SpGistPageAddNewItem(state, npage,
508                                                          (Item) newLeafTuple, newLeafTuple->size,
509                                                          &startOffset, false);
510         toInsert[nInsert] = r;
511         nInsert++;
512         memcpy(leafptr, newLeafTuple, newLeafTuple->size);
513         leafptr += newLeafTuple->size;
514
515         /*
516          * Now delete the old tuples, leaving a redirection pointer behind for
517          * the first one, unless we're doing an index build; in which case there
518          * can't be any concurrent scan so we need not provide a redirect.
519          */
520         spgPageIndexMultiDelete(state, current->page, toDelete, nDelete,
521                                                         state->isBuild ? SPGIST_PLACEHOLDER : SPGIST_REDIRECT,
522                                                         SPGIST_PLACEHOLDER,
523                                                         nblkno, r);
524
525         /* Update parent's downlink and mark parent page dirty */
526         saveNodeLink(index, parent, nblkno, r);
527
528         /* Mark the leaf pages too */
529         MarkBufferDirty(current->buffer);
530         MarkBufferDirty(nbuf);
531
532         if (RelationNeedsWAL(index))
533         {
534                 XLogRecPtr      recptr;
535
536                 ACCEPT_RDATA_DATA(&xlrec, MAXALIGN(sizeof(xlrec)), 0);
537                 ACCEPT_RDATA_DATA(toDelete, MAXALIGN(sizeof(OffsetNumber) * nDelete), 1);
538                 ACCEPT_RDATA_DATA(toInsert, MAXALIGN(sizeof(OffsetNumber) * nInsert), 2);
539                 ACCEPT_RDATA_DATA(leafdata, leafptr - leafdata, 3);
540                 ACCEPT_RDATA_BUFFER(current->buffer, 4);
541                 ACCEPT_RDATA_BUFFER(nbuf, 5);
542                 ACCEPT_RDATA_BUFFER(parent->buffer, 6);
543
544                 recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_MOVE_LEAFS, rdata);
545
546                 PageSetLSN(current->page, recptr);
547                 PageSetTLI(current->page, ThisTimeLineID);
548                 PageSetLSN(npage, recptr);
549                 PageSetTLI(npage, ThisTimeLineID);
550                 PageSetLSN(parent->page, recptr);
551                 PageSetTLI(parent->page, ThisTimeLineID);
552         }
553
554         END_CRIT_SECTION();
555
556         /* Update local free-space cache and release new buffer */
557         SpGistSetLastUsedPage(index, nbuf);
558         UnlockReleaseBuffer(nbuf);
559 }
560
561 /*
562  * Update previously-created redirection tuple with appropriate destination
563  *
564  * We use this when it's not convenient to know the destination first.
565  * The tuple should have been made with the "impossible" destination of
566  * the metapage.
567  */
568 static void
569 setRedirectionTuple(SPPageDesc *current, OffsetNumber position,
570                                         BlockNumber blkno, OffsetNumber offnum)
571 {
572         SpGistDeadTuple dt;
573
574         dt = (SpGistDeadTuple) PageGetItem(current->page,
575                                                                            PageGetItemId(current->page, position));
576         Assert(dt->tupstate == SPGIST_REDIRECT);
577         Assert(ItemPointerGetBlockNumber(&dt->pointer) == SPGIST_METAPAGE_BLKNO);
578         ItemPointerSet(&dt->pointer, blkno, offnum);
579 }
580
581 /*
582  * Test to see if the user-defined picksplit function failed to do its job,
583  * ie, it put all the leaf tuples into the same node.
584  * If so, randomly divide the tuples into several nodes (all with the same
585  * label) and return TRUE to select allTheSame mode for this inner tuple.
586  *
587  * If we know that the leaf tuples wouldn't all fit on one page, then we
588  * exclude the last tuple (which is the incoming new tuple that forced a split)
589  * from the check to see if more than one node is used.  The reason for this
590  * is that if the existing tuples are put into only one chain, then even if
591  * we move them all to an empty page, there would still not be room for the
592  * new tuple, so we'd get into an infinite loop of picksplit attempts.
593  * Forcing allTheSame mode dodges this problem by ensuring the old tuples will
594  * be split across pages.  (Exercise for the reader: figure out why this
595  * fixes the problem even when there is only one old tuple.)
596  */
597 static bool
598 checkAllTheSame(spgPickSplitIn *in, spgPickSplitOut *out, bool tooBig,
599                                 bool *includeNew)
600 {
601         int                     theNode;
602         int                     limit;
603         int                     i;
604
605         /* For the moment, assume we can include the new leaf tuple */
606         *includeNew = true;
607
608         /* If there's only the new leaf tuple, don't select allTheSame mode */
609         if (in->nTuples <= 1)
610                 return false;
611
612         /* If tuple set doesn't fit on one page, ignore the new tuple in test */
613         limit = tooBig ? in->nTuples - 1 : in->nTuples;
614
615         /* Check to see if more than one node is populated */
616         theNode = out->mapTuplesToNodes[0];
617         for (i = 1; i < limit; i++)
618         {
619                 if (out->mapTuplesToNodes[i] != theNode)
620                         return false;
621         }
622
623         /* Nope, so override the picksplit function's decisions */
624
625         /* If the new tuple is in its own node, it can't be included in split */
626         if (tooBig && out->mapTuplesToNodes[in->nTuples - 1] != theNode)
627                 *includeNew = false;
628
629         out->nNodes = 8;                        /* arbitrary number of child nodes */
630
631         /* Random assignment of tuples to nodes (note we include new tuple) */
632         for (i = 0; i < in->nTuples; i++)
633                 out->mapTuplesToNodes[i] = i % out->nNodes;
634
635         /* The opclass may not use node labels, but if it does, duplicate 'em */
636         if (out->nodeLabels)
637         {
638                 Datum   theLabel = out->nodeLabels[theNode];
639
640                 out->nodeLabels = (Datum *) palloc(sizeof(Datum) * out->nNodes);
641                 for (i = 0; i < out->nNodes; i++)
642                         out->nodeLabels[i] = theLabel;
643         }
644
645         /* We don't touch the prefix or the leaf tuple datum assignments */
646
647         return true;
648 }
649
650 /*
651  * current points to a leaf-tuple chain that we wanted to add newLeafTuple to,
652  * but the chain has to be split because there's not enough room to add
653  * newLeafTuple to its page.
654  *
655  * This function splits the leaf tuple set according to picksplit's rules,
656  * creating one or more new chains that are spread across the current page
657  * and an additional leaf page (we assume that two leaf pages will be
658  * sufficient).  A new inner tuple is created, and the parent downlink
659  * pointer is updated to point to that inner tuple instead of the leaf chain.
660  *
661  * On exit, current contains the address of the new inner tuple.
662  *
663  * Returns true if we successfully inserted newLeafTuple during this function,
664  * false if caller still has to do it (meaning another picksplit operation is
665  * probably needed).  Failure could occur if the picksplit result is fairly
666  * unbalanced, or if newLeafTuple is just plain too big to fit on a page.
667  * Because we force the picksplit result to be at least two chains, each
668  * cycle will get rid of at least one leaf tuple from the chain, so the loop
669  * will eventually terminate if lack of balance is the issue.  If the tuple
670  * is too big, we assume that repeated picksplit operations will eventually
671  * make it small enough by repeated prefix-stripping.  A broken opclass could
672  * make this an infinite loop, though.
673  */
674 static bool
675 doPickSplit(Relation index, SpGistState *state,
676                         SPPageDesc *current, SPPageDesc *parent,
677                         SpGistLeafTuple newLeafTuple, int level, bool isNew)
678 {
679         bool            insertedNew = false;
680         spgPickSplitIn in;
681         spgPickSplitOut out;
682         FmgrInfo   *procinfo;
683         bool            includeNew;
684         int                     i,
685                                 max,
686                                 n;
687         SpGistInnerTuple innerTuple;
688         SpGistNodeTuple node,
689                            *nodes;
690         Buffer          newInnerBuffer,
691                                 newLeafBuffer;
692         ItemPointerData *heapPtrs;
693         uint8      *leafPageSelect;
694         int                *leafSizes;
695         OffsetNumber *toDelete;
696         OffsetNumber *toInsert;
697         OffsetNumber redirectTuplePos = InvalidOffsetNumber;
698         OffsetNumber startOffsets[2];
699         SpGistLeafTuple *newLeafs;
700         int                     spaceToDelete;
701         int                     currentFreeSpace;
702         int                     totalLeafSizes;
703         bool            allTheSame;
704         XLogRecData rdata[10];
705         int                     nRdata;
706         spgxlogPickSplit xlrec;
707         char       *leafdata,
708                            *leafptr;
709         SPPageDesc      saveCurrent;
710         int                     nToDelete,
711                                 nToInsert,
712                                 maxToInclude;
713
714         in.level = level;
715
716         /*
717          * Allocate per-leaf-tuple work arrays with max possible size
718          */
719         max = PageGetMaxOffsetNumber(current->page);
720         n = max + 1;
721         in.datums = (Datum *) palloc(sizeof(Datum) * n);
722         heapPtrs = (ItemPointerData *) palloc(sizeof(ItemPointerData) * n);
723         toDelete = (OffsetNumber *) palloc(sizeof(OffsetNumber) * n);
724         toInsert = (OffsetNumber *) palloc(sizeof(OffsetNumber) * n);
725         newLeafs = (SpGistLeafTuple *) palloc(sizeof(SpGistLeafTuple) * n);
726         leafPageSelect = (uint8 *) palloc(sizeof(uint8) * n);
727
728         xlrec.node = index->rd_node;
729         STORE_STATE(state, xlrec.stateSrc);
730
731         /*
732          * Form list of leaf tuples which will be distributed as split result;
733          * also, count up the amount of space that will be freed from current.
734          * (Note that in the non-root case, we won't actually delete the old
735          * tuples, only replace them with redirects or placeholders.)
736          */
737         nToInsert = 0;
738         nToDelete = 0;
739         spaceToDelete = 0;
740         if (current->blkno == SPGIST_HEAD_BLKNO)
741         {
742                 /*
743                  * We are splitting the root (which up to now is also a leaf page).
744                  * Its tuples are not linked, so scan sequentially to get them all.
745                  * We ignore the original value of current->offnum.
746                  */
747                 for (i = FirstOffsetNumber; i <= max; i++)
748                 {
749                         SpGistLeafTuple it;
750
751                         it = (SpGistLeafTuple) PageGetItem(current->page,
752                                                                                         PageGetItemId(current->page, i));
753                         if (it->tupstate == SPGIST_LIVE)
754                         {
755                                 in.datums[nToInsert] = SGLTDATUM(it, state);
756                                 heapPtrs[nToInsert] = it->heapPtr;
757                                 nToInsert++;
758                                 toDelete[nToDelete] = i;
759                                 nToDelete++;
760                                 /* we will delete the tuple altogether, so count full space */
761                                 spaceToDelete += it->size + sizeof(ItemIdData);
762                         }
763                         else                            /* tuples on root should be live */
764                                 elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
765                 }
766         }
767         else
768         {
769                 /* Normal case, just collect the leaf tuples in the chain */
770                 i = current->offnum;
771                 while (i != InvalidOffsetNumber)
772                 {
773                         SpGistLeafTuple it;
774
775                         Assert(i >= FirstOffsetNumber && i <= max);
776                         it = (SpGistLeafTuple) PageGetItem(current->page,
777                                                                                         PageGetItemId(current->page, i));
778                         if (it->tupstate == SPGIST_LIVE)
779                         {
780                                 in.datums[nToInsert] = SGLTDATUM(it, state);
781                                 heapPtrs[nToInsert] = it->heapPtr;
782                                 nToInsert++;
783                                 toDelete[nToDelete] = i;
784                                 nToDelete++;
785                                 /* we will not delete the tuple, only replace with dead */
786                                 Assert(it->size >= SGDTSIZE);
787                                 spaceToDelete += it->size - SGDTSIZE;
788                         }
789                         else if (it->tupstate == SPGIST_DEAD)
790                         {
791                                 /* We could see a DEAD tuple as first/only chain item */
792                                 Assert(i == current->offnum);
793                                 Assert(it->nextOffset == InvalidOffsetNumber);
794                                 toDelete[nToDelete] = i;
795                                 nToDelete++;
796                                 /* replacing it with redirect will save no space */
797                         }
798                         else
799                                 elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
800
801                         i = it->nextOffset;
802                 }
803         }
804         in.nTuples = nToInsert;
805
806         /*
807          * We may not actually insert new tuple because another picksplit may be
808          * necessary due to too large value, but we will try to to allocate enough
809          * space to include it; and in any case it has to be included in the input
810          * for the picksplit function.  So don't increment nToInsert yet.
811          */
812         in.datums[in.nTuples] = SGLTDATUM(newLeafTuple, state);
813         heapPtrs[in.nTuples] = newLeafTuple->heapPtr;
814         in.nTuples++;
815
816         /*
817          * Perform split using user-defined method.
818          */
819         memset(&out, 0, sizeof(out));
820
821         procinfo = index_getprocinfo(index, 1, SPGIST_PICKSPLIT_PROC);
822         FunctionCall2Coll(procinfo,
823                                           index->rd_indcollation[0],
824                                           PointerGetDatum(&in),
825                                           PointerGetDatum(&out));
826
827         /*
828          * Form new leaf tuples and count up the total space needed.
829          */
830         totalLeafSizes = 0;
831         for (i = 0; i < in.nTuples; i++)
832         {
833                 newLeafs[i] = spgFormLeafTuple(state, heapPtrs + i,
834                                                                            out.leafTupleDatums[i]);
835                 totalLeafSizes += newLeafs[i]->size + sizeof(ItemIdData);
836         }
837
838         /*
839          * Check to see if the picksplit function failed to separate the values,
840          * ie, it put them all into the same child node.  If so, select allTheSame
841          * mode and create a random split instead.  See comments for
842          * checkAllTheSame as to why we need to know if the new leaf tuples could
843          * fit on one page.
844          */
845         allTheSame = checkAllTheSame(&in, &out,
846                                                                  totalLeafSizes > SPGIST_PAGE_CAPACITY,
847                                                                  &includeNew);
848
849         /*
850          * If checkAllTheSame decided we must exclude the new tuple, don't
851          * consider it any further.
852          */
853         if (includeNew)
854                 maxToInclude = in.nTuples;
855         else
856         {
857                 maxToInclude = in.nTuples - 1;
858                 totalLeafSizes -= newLeafs[in.nTuples - 1]->size + sizeof(ItemIdData);
859         }
860
861         /*
862          * Allocate per-node work arrays.  Since checkAllTheSame could replace
863          * out.nNodes with a value larger than the number of tuples on the input
864          * page, we can't allocate these arrays before here.
865          */
866         nodes = (SpGistNodeTuple *) palloc(sizeof(SpGistNodeTuple) * out.nNodes);
867         leafSizes = (int *) palloc0(sizeof(int) * out.nNodes);
868
869         /*
870          * Form nodes of inner tuple and inner tuple itself
871          */
872         for (i = 0; i < out.nNodes; i++)
873         {
874                 Datum           label = (Datum) 0;
875                 bool            isnull = (out.nodeLabels == NULL);
876
877                 if (!isnull)
878                         label = out.nodeLabels[i];
879                 nodes[i] = spgFormNodeTuple(state, label, isnull);
880         }
881         innerTuple = spgFormInnerTuple(state,
882                                                                    out.hasPrefix, out.prefixDatum,
883                                                                    out.nNodes, nodes);
884         innerTuple->allTheSame = allTheSame;
885
886         /*
887          * Update nodes[] array to point into the newly formed innerTuple, so
888          * that we can adjust their downlinks below.
889          */
890         SGITITERATE(innerTuple, i, node)
891         {
892                 nodes[i] = node;
893         }
894
895         /*
896          * Re-scan new leaf tuples and count up the space needed under each node.
897          */
898         for (i = 0; i < maxToInclude; i++)
899         {
900                 n = out.mapTuplesToNodes[i];
901                 if (n < 0 || n >= out.nNodes)
902                         elog(ERROR, "inconsistent result of SPGiST picksplit function");
903                 leafSizes[n] += newLeafs[i]->size + sizeof(ItemIdData);
904         }
905
906         /*
907          * To perform the split, we must insert a new inner tuple, which can't
908          * go on a leaf page; and unless we are splitting the root page, we
909          * must then update the parent tuple's downlink to point to the inner
910          * tuple.  If there is room, we'll put the new inner tuple on the same
911          * page as the parent tuple, otherwise we need another non-leaf buffer.
912          * But if the parent page is the root, we can't add the new inner tuple
913          * there, because the root page must have only one inner tuple.
914          */
915         xlrec.initInner = false;
916         if (parent->buffer != InvalidBuffer &&
917                 parent->blkno != SPGIST_HEAD_BLKNO &&
918                 (SpGistPageGetFreeSpace(parent->page, 1) >=
919                  innerTuple->size + sizeof(ItemIdData)))
920         {
921                 /* New inner tuple will fit on parent page */
922                 newInnerBuffer = parent->buffer;
923         }
924         else if (parent->buffer != InvalidBuffer)
925         {
926                 /* Send tuple to page with next triple parity (see README) */
927                 newInnerBuffer = SpGistGetBuffer(index,
928                                                                                  GBUF_INNER_PARITY(parent->blkno + 1),
929                                                                                  innerTuple->size + sizeof(ItemIdData),
930                                                                                  &xlrec.initInner);
931         }
932         else
933         {
934                 /* Root page split ... inner tuple will go to root page */
935                 newInnerBuffer = InvalidBuffer;
936         }
937
938         /*----------
939          * Because a WAL record can't involve more than four buffers, we can
940          * only afford to deal with two leaf pages in each picksplit action,
941          * ie the current page and at most one other.
942          *
943          * The new leaf tuples converted from the existing ones should require
944          * the same or less space, and therefore should all fit onto one page
945          * (although that's not necessarily the current page, since we can't
946          * delete the old tuples but only replace them with placeholders).
947          * However, the incoming new tuple might not also fit, in which case
948          * we might need another picksplit cycle to reduce it some more.
949          *
950          * If there's not room to put everything back onto the current page,
951          * then we decide on a per-node basis which tuples go to the new page.
952          * (We do it like that because leaf tuple chains can't cross pages,
953          * so we must place all leaf tuples belonging to the same parent node
954          * on the same page.)
955          *
956          * If we are splitting the root page (turning it from a leaf page into an
957          * inner page), then no leaf tuples can go back to the current page; they
958          * must all go somewhere else.
959          *----------
960          */
961         if (current->blkno != SPGIST_HEAD_BLKNO)
962                 currentFreeSpace = PageGetExactFreeSpace(current->page) + spaceToDelete;
963         else
964                 currentFreeSpace = 0;   /* prevent assigning any tuples to current */
965
966         xlrec.initDest = false;
967
968         if (totalLeafSizes <= currentFreeSpace)
969         {
970                 /* All the leaf tuples will fit on current page */
971                 newLeafBuffer = InvalidBuffer;
972                 /* mark new leaf tuple as included in insertions, if allowed */
973                 if (includeNew)
974                 {
975                         nToInsert++;
976                         insertedNew = true;
977                 }
978                 for (i = 0; i < nToInsert; i++)
979                         leafPageSelect[i] = 0;          /* signifies current page */
980         }
981         else if (in.nTuples == 1 && totalLeafSizes > SPGIST_PAGE_CAPACITY)
982         {
983                 /*
984                  * We're trying to split up a long value by repeated suffixing, but
985                  * it's not going to fit yet.  Don't bother allocating a second leaf
986                  * buffer that we won't be able to use.
987                  */
988                 newLeafBuffer = InvalidBuffer;
989                 Assert(includeNew);
990                 Assert(nToInsert == 0);
991         }
992         else
993         {
994                 /* We will need another leaf page */
995                 uint8      *nodePageSelect;
996                 int                     curspace;
997                 int                     newspace;
998
999                 newLeafBuffer = SpGistGetBuffer(index, GBUF_LEAF,
1000                                                                                 Min(totalLeafSizes,
1001                                                                                         SPGIST_PAGE_CAPACITY),
1002                                                                                 &xlrec.initDest);
1003                 /*
1004                  * Attempt to assign node groups to the two pages.  We might fail to
1005                  * do so, even if totalLeafSizes is less than the available space,
1006                  * because we can't split a group across pages.
1007                  */
1008                 nodePageSelect = (uint8 *) palloc(sizeof(uint8) * out.nNodes);
1009
1010                 curspace = currentFreeSpace;
1011                 newspace = PageGetExactFreeSpace(BufferGetPage(newLeafBuffer));
1012                 for (i = 0; i < out.nNodes; i++)
1013                 {
1014                         if (leafSizes[i] <= curspace)
1015                         {
1016                                 nodePageSelect[i] = 0; /* signifies current page */
1017                                 curspace -= leafSizes[i];
1018                         }
1019                         else
1020                         {
1021                                 nodePageSelect[i] = 1; /* signifies new leaf page */
1022                                 newspace -= leafSizes[i];
1023                         }
1024                 }
1025                 if (curspace >= 0 && newspace >= 0)
1026                 {
1027                         /* Successful assignment, so we can include the new leaf tuple */
1028                         if (includeNew)
1029                         {
1030                                 nToInsert++;
1031                                 insertedNew = true;
1032                         }
1033                 }
1034                 else if (includeNew)
1035                 {
1036                         /* We must exclude the new leaf tuple from the split */
1037                         int             nodeOfNewTuple = out.mapTuplesToNodes[in.nTuples - 1];
1038
1039                         leafSizes[nodeOfNewTuple] -=
1040                                 newLeafs[in.nTuples - 1]->size + sizeof(ItemIdData);
1041
1042                         /* Repeat the node assignment process --- should succeed now */
1043                         curspace = currentFreeSpace;
1044                         newspace = PageGetExactFreeSpace(BufferGetPage(newLeafBuffer));
1045                         for (i = 0; i < out.nNodes; i++)
1046                         {
1047                                 if (leafSizes[i] <= curspace)
1048                                 {
1049                                         nodePageSelect[i] = 0; /* signifies current page */
1050                                         curspace -= leafSizes[i];
1051                                 }
1052                                 else
1053                                 {
1054                                         nodePageSelect[i] = 1; /* signifies new leaf page */
1055                                         newspace -= leafSizes[i];
1056                                 }
1057                         }
1058                         if (curspace < 0 || newspace < 0)
1059                                 elog(ERROR, "failed to divide leaf tuple groups across pages");
1060                 }
1061                 else
1062                 {
1063                         /* oops, we already excluded new tuple ... should not get here */
1064                         elog(ERROR, "failed to divide leaf tuple groups across pages");
1065                 }
1066                 /* Expand the per-node assignments to be shown per leaf tuple */
1067                 for (i = 0; i < nToInsert; i++)
1068                 {
1069                         n = out.mapTuplesToNodes[i];
1070                         leafPageSelect[i] = nodePageSelect[n];
1071                 }
1072         }
1073
1074         /* Start preparing WAL record */
1075         xlrec.blknoSrc = current->blkno;
1076         xlrec.blknoDest = InvalidBlockNumber;
1077         xlrec.nDelete = 0;
1078         xlrec.initSrc = isNew;
1079
1080         leafdata = leafptr = (char *) palloc(totalLeafSizes);
1081
1082         ACCEPT_RDATA_DATA(&xlrec, MAXALIGN(sizeof(xlrec)), 0);
1083         ACCEPT_RDATA_DATA(innerTuple, innerTuple->size, 1);
1084         nRdata = 2;
1085
1086         /* Here we begin making the changes to the target pages */
1087         START_CRIT_SECTION();
1088
1089         /*
1090          * Delete old leaf tuples from current buffer, except when we're splitting
1091          * the root; in that case there's no need because we'll re-init the page
1092          * below.  We do this first to make room for reinserting new leaf tuples.
1093          */
1094         if (current->blkno != SPGIST_HEAD_BLKNO)
1095         {
1096                 /*
1097                  * Init buffer instead of deleting individual tuples, but only if
1098                  * there aren't any other live tuples and only during build; otherwise
1099                  * we need to set a redirection tuple for concurrent scans.
1100                  */
1101                 if (state->isBuild &&
1102                         nToDelete + SpGistPageGetOpaque(current->page)->nPlaceholder ==
1103                         PageGetMaxOffsetNumber(current->page))
1104                 {
1105                         SpGistInitBuffer(current->buffer, SPGIST_LEAF);
1106                         xlrec.initSrc = true;
1107                 }
1108                 else if (isNew)
1109                 {
1110                         /* don't expose the freshly init'd buffer as a backup block */
1111                         Assert(nToDelete == 0);
1112                 }
1113                 else
1114                 {
1115                         xlrec.nDelete = nToDelete;
1116                         ACCEPT_RDATA_DATA(toDelete,
1117                                                           MAXALIGN(sizeof(OffsetNumber) * nToDelete),
1118                                                           nRdata);
1119                         nRdata++;
1120                         ACCEPT_RDATA_BUFFER(current->buffer, nRdata);
1121                         nRdata++;
1122
1123                         if (!state->isBuild)
1124                         {
1125                                 /*
1126                                  * Need to create redirect tuple (it will point to new inner
1127                                  * tuple) but right now the new tuple's location is not known
1128                                  * yet.  So, set the redirection pointer to "impossible" value
1129                                  * and remember its position to update tuple later.
1130                                  */
1131                                 if (nToDelete > 0)
1132                                         redirectTuplePos = toDelete[0];
1133                                 spgPageIndexMultiDelete(state, current->page,
1134                                                                                 toDelete, nToDelete,
1135                                                                                 SPGIST_REDIRECT,
1136                                                                                 SPGIST_PLACEHOLDER,
1137                                                                                 SPGIST_METAPAGE_BLKNO,
1138                                                                                 FirstOffsetNumber);
1139                         }
1140                         else
1141                         {
1142                                 /*
1143                                  * During index build there is not concurrent searches, so we
1144                                  * don't need to create redirection tuple.
1145                                  */
1146                                 spgPageIndexMultiDelete(state, current->page,
1147                                                                                 toDelete, nToDelete,
1148                                                                                 SPGIST_PLACEHOLDER,
1149                                                                                 SPGIST_PLACEHOLDER,
1150                                                                                 InvalidBlockNumber,
1151                                                                                 InvalidOffsetNumber);
1152                         }
1153                 }
1154         }
1155
1156         /*
1157          * Put leaf tuples on proper pages, and update downlinks in innerTuple's
1158          * nodes.
1159          */
1160         startOffsets[0] = startOffsets[1] = InvalidOffsetNumber;
1161         for (i = 0; i < nToInsert; i++)
1162         {
1163                 SpGistLeafTuple it = newLeafs[i];
1164                 Buffer  leafBuffer;
1165                 BlockNumber leafBlock;
1166                 OffsetNumber newoffset;
1167
1168                 /* Which page is it going to? */
1169                 leafBuffer = leafPageSelect[i] ? newLeafBuffer : current->buffer;
1170                 leafBlock = BufferGetBlockNumber(leafBuffer);
1171
1172                 /* Link tuple into correct chain for its node */
1173                 n = out.mapTuplesToNodes[i];
1174
1175                 if (ItemPointerIsValid(&nodes[n]->t_tid))
1176                 {
1177                         Assert(ItemPointerGetBlockNumber(&nodes[n]->t_tid) == leafBlock);
1178                         it->nextOffset = ItemPointerGetOffsetNumber(&nodes[n]->t_tid);
1179                 }
1180                 else
1181                         it->nextOffset = InvalidOffsetNumber;
1182
1183                 /* Insert it on page */
1184                 newoffset = SpGistPageAddNewItem(state, BufferGetPage(leafBuffer),
1185                                                                                  (Item) it, it->size,
1186                                                                                  &startOffsets[leafPageSelect[i]],
1187                                                                                  false);
1188                 toInsert[i] = newoffset;
1189
1190                 /* ... and complete the chain linking */
1191                 ItemPointerSet(&nodes[n]->t_tid, leafBlock, newoffset);
1192
1193                 /* Also copy leaf tuple into WAL data */
1194                 memcpy(leafptr, newLeafs[i], newLeafs[i]->size);
1195                 leafptr += newLeafs[i]->size;
1196         }
1197
1198         /*
1199          * We're done modifying the other leaf buffer (if any), so mark it dirty.
1200          * current->buffer will be marked below, after we're entirely done
1201          * modifying it.
1202          */
1203         if (newLeafBuffer != InvalidBuffer)
1204         {
1205                 MarkBufferDirty(newLeafBuffer);
1206                 /* also save block number for WAL */
1207                 xlrec.blknoDest = BufferGetBlockNumber(newLeafBuffer);
1208                 if (!xlrec.initDest)
1209                 {
1210                         ACCEPT_RDATA_BUFFER(newLeafBuffer, nRdata);
1211                         nRdata++;
1212                 }
1213         }
1214
1215         xlrec.nInsert = nToInsert;
1216         ACCEPT_RDATA_DATA(toInsert,
1217                                           MAXALIGN(sizeof(OffsetNumber) * nToInsert),
1218                                           nRdata);
1219         nRdata++;
1220         ACCEPT_RDATA_DATA(leafPageSelect,
1221                                           MAXALIGN(sizeof(uint8) * nToInsert),
1222                                           nRdata);
1223         nRdata++;
1224         ACCEPT_RDATA_DATA(leafdata, leafptr - leafdata, nRdata);
1225         nRdata++;
1226
1227         /* Remember current buffer, since we're about to change "current" */
1228         saveCurrent = *current;
1229
1230         /*
1231          * Store the new innerTuple
1232          */
1233         if (newInnerBuffer == parent->buffer && newInnerBuffer != InvalidBuffer)
1234         {
1235                 /*
1236                  * new inner tuple goes to parent page
1237                  */
1238                 Assert(current->buffer != parent->buffer);
1239
1240                 /* Repoint "current" at the new inner tuple */
1241                 current->blkno = parent->blkno;
1242                 current->buffer = parent->buffer;
1243                 current->page = parent->page;
1244                 xlrec.blknoInner = current->blkno;
1245                 xlrec.offnumInner = current->offnum =
1246                         SpGistPageAddNewItem(state, current->page,
1247                                                                  (Item) innerTuple, innerTuple->size,
1248                                                                  NULL, false);
1249
1250                 /*
1251                  * Update parent node link and mark parent page dirty
1252                  */
1253                 xlrec.blknoParent = parent->blkno;
1254                 xlrec.offnumParent = parent->offnum;
1255                 xlrec.nodeI = parent->node;
1256                 saveNodeLink(index, parent, current->blkno, current->offnum);
1257
1258                 ACCEPT_RDATA_BUFFER(parent->buffer, nRdata);
1259                 nRdata++;
1260
1261                 /*
1262                  * Update redirection link (in old current buffer)
1263                  */
1264                 if (redirectTuplePos != InvalidOffsetNumber)
1265                         setRedirectionTuple(&saveCurrent, redirectTuplePos,
1266                                                                 current->blkno, current->offnum);
1267
1268                 /* Done modifying old current buffer, mark it dirty */
1269                 MarkBufferDirty(saveCurrent.buffer);
1270         }
1271         else if (parent->buffer != InvalidBuffer)
1272         {
1273                 /*
1274                  * new inner tuple will be stored on a new page
1275                  */
1276                 Assert(newInnerBuffer != InvalidBuffer);
1277
1278                 /* Repoint "current" at the new inner tuple */
1279                 current->buffer = newInnerBuffer;
1280                 current->blkno = BufferGetBlockNumber(current->buffer);
1281                 current->page = BufferGetPage(current->buffer);
1282                 xlrec.blknoInner = current->blkno;
1283                 xlrec.offnumInner = current->offnum =
1284                         SpGistPageAddNewItem(state, current->page,
1285                                                                  (Item) innerTuple, innerTuple->size,
1286                                                                  NULL, false);
1287
1288                 /* Done modifying new current buffer, mark it dirty */
1289                 MarkBufferDirty(current->buffer);
1290
1291                 /*
1292                  * Update parent node link and mark parent page dirty
1293                  */
1294                 xlrec.blknoParent = parent->blkno;
1295                 xlrec.offnumParent = parent->offnum;
1296                 xlrec.nodeI = parent->node;
1297                 saveNodeLink(index, parent, current->blkno, current->offnum);
1298
1299                 ACCEPT_RDATA_BUFFER(current->buffer, nRdata);
1300                 nRdata++;
1301                 ACCEPT_RDATA_BUFFER(parent->buffer, nRdata);
1302                 nRdata++;
1303
1304                 /*
1305                  * Update redirection link (in old current buffer)
1306                  */
1307                 if (redirectTuplePos != InvalidOffsetNumber)
1308                         setRedirectionTuple(&saveCurrent, redirectTuplePos,
1309                                                                 current->blkno, current->offnum);
1310
1311                 /* Done modifying old current buffer, mark it dirty */
1312                 MarkBufferDirty(saveCurrent.buffer);
1313         }
1314         else
1315         {
1316                 /*
1317                  * Splitting root page, which was a leaf but now becomes inner page
1318                  * (and so "current" continues to point at it)
1319                  */
1320                 Assert(current->blkno == SPGIST_HEAD_BLKNO);
1321                 Assert(redirectTuplePos == InvalidOffsetNumber);
1322
1323                 SpGistInitBuffer(current->buffer, 0);
1324                 xlrec.initInner = true;
1325
1326                 xlrec.blknoInner = current->blkno;
1327                 xlrec.offnumInner = current->offnum =
1328                         PageAddItem(current->page, (Item) innerTuple, innerTuple->size,
1329                                                 InvalidOffsetNumber, false, false);
1330                 if (current->offnum != FirstOffsetNumber)
1331                         elog(ERROR, "failed to add item of size %u to SPGiST index page",
1332                                  innerTuple->size);
1333
1334                 /* No parent link to update, nor redirection to do */
1335                 xlrec.blknoParent = InvalidBlockNumber;
1336                 xlrec.offnumParent = InvalidOffsetNumber;
1337                 xlrec.nodeI = 0;
1338
1339                 /* Done modifying new current buffer, mark it dirty */
1340                 MarkBufferDirty(current->buffer);
1341
1342                 /* saveCurrent doesn't represent a different buffer */
1343                 saveCurrent.buffer = InvalidBuffer;
1344         }
1345
1346         if (RelationNeedsWAL(index))
1347         {
1348                 XLogRecPtr      recptr;
1349
1350                 /* Issue the WAL record */
1351                 recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_PICKSPLIT, rdata);
1352
1353                 /* Update page LSNs on all affected pages */
1354                 if (newLeafBuffer != InvalidBuffer)
1355                 {
1356                         Page            page = BufferGetPage(newLeafBuffer);
1357
1358                         PageSetLSN(page, recptr);
1359                         PageSetTLI(page, ThisTimeLineID);
1360                 }
1361
1362                 if (saveCurrent.buffer != InvalidBuffer)
1363                 {
1364                         Page            page = BufferGetPage(saveCurrent.buffer);
1365
1366                         PageSetLSN(page, recptr);
1367                         PageSetTLI(page, ThisTimeLineID);
1368                 }
1369
1370                 PageSetLSN(current->page, recptr);
1371                 PageSetTLI(current->page, ThisTimeLineID);
1372
1373                 if (parent->buffer != InvalidBuffer)
1374                 {
1375                         PageSetLSN(parent->page, recptr);
1376                         PageSetTLI(parent->page, ThisTimeLineID);
1377                 }
1378         }
1379
1380         END_CRIT_SECTION();
1381
1382         /* Update local free-space cache and unlock buffers */
1383         if (newLeafBuffer != InvalidBuffer)
1384         {
1385                 SpGistSetLastUsedPage(index, newLeafBuffer);
1386                 UnlockReleaseBuffer(newLeafBuffer);
1387         }
1388         if (saveCurrent.buffer != InvalidBuffer)
1389         {
1390                 SpGistSetLastUsedPage(index, saveCurrent.buffer);
1391                 UnlockReleaseBuffer(saveCurrent.buffer);
1392         }
1393
1394         return insertedNew;
1395 }
1396
1397 /*
1398  * spgMatchNode action: descend to N'th child node of current inner tuple
1399  */
1400 static void
1401 spgMatchNodeAction(Relation index, SpGistState *state,
1402                                    SpGistInnerTuple innerTuple,
1403                                    SPPageDesc *current, SPPageDesc *parent, int nodeN)
1404 {
1405         int                     i;
1406         SpGistNodeTuple node;
1407
1408         /* Release previous parent buffer if any */
1409         if (parent->buffer != InvalidBuffer &&
1410                 parent->buffer != current->buffer)
1411         {
1412                 SpGistSetLastUsedPage(index, parent->buffer);
1413                 UnlockReleaseBuffer(parent->buffer);
1414         }
1415
1416         /* Repoint parent to specified node of current inner tuple */
1417         parent->blkno = current->blkno;
1418         parent->buffer = current->buffer;
1419         parent->page = current->page;
1420         parent->offnum = current->offnum;
1421         parent->node = nodeN;
1422
1423         /* Locate that node */
1424         SGITITERATE(innerTuple, i, node)
1425         {
1426                 if (i == nodeN)
1427                         break;
1428         }
1429
1430         if (i != nodeN)
1431                 elog(ERROR, "failed to find requested node %d in SPGiST inner tuple",
1432                          nodeN);
1433
1434         /* Point current to the downlink location, if any */
1435         if (ItemPointerIsValid(&node->t_tid))
1436         {
1437                 current->blkno = ItemPointerGetBlockNumber(&node->t_tid);
1438                 current->offnum = ItemPointerGetOffsetNumber(&node->t_tid);
1439         }
1440         else
1441         {
1442                 /* Downlink is empty, so we'll need to find a new page */
1443                 current->blkno = InvalidBlockNumber;
1444                 current->offnum = InvalidOffsetNumber;
1445         }
1446
1447         current->buffer = InvalidBuffer;
1448         current->page = NULL;
1449 }
1450
1451 /*
1452  * spgAddNode action: add a node to the inner tuple at current
1453  */
1454 static void
1455 spgAddNodeAction(Relation index, SpGistState *state,
1456                                  SpGistInnerTuple innerTuple,
1457                                  SPPageDesc *current, SPPageDesc *parent,
1458                                  int nodeN, Datum nodeLabel)
1459 {
1460         SpGistInnerTuple newInnerTuple;
1461         XLogRecData rdata[5];
1462         spgxlogAddNode xlrec;
1463
1464         /* Construct new inner tuple with additional node */
1465         newInnerTuple = addNode(state, innerTuple, nodeLabel, nodeN);
1466
1467         /* Prepare WAL record */
1468         xlrec.node = index->rd_node;
1469         STORE_STATE(state, xlrec.stateSrc);
1470         xlrec.blkno = current->blkno;
1471         xlrec.offnum = current->offnum;
1472
1473         /* we don't fill these unless we need to change the parent downlink */
1474         xlrec.blknoParent = InvalidBlockNumber;
1475         xlrec.offnumParent = InvalidOffsetNumber;
1476         xlrec.nodeI = 0;
1477
1478         /* we don't fill these unless tuple has to be moved */
1479         xlrec.blknoNew = InvalidBlockNumber;
1480         xlrec.offnumNew = InvalidOffsetNumber;
1481         xlrec.newPage = false;
1482
1483         ACCEPT_RDATA_DATA(&xlrec, sizeof(xlrec), 0);
1484         /* we assume sizeof(xlrec) is at least int-aligned */
1485         ACCEPT_RDATA_DATA(newInnerTuple, newInnerTuple->size, 1);
1486         ACCEPT_RDATA_BUFFER(current->buffer, 2);
1487
1488         if (PageGetExactFreeSpace(current->page) >=
1489                 newInnerTuple->size - innerTuple->size)
1490         {
1491                 /*
1492                  * We can replace the inner tuple by new version in-place
1493                  */
1494                 START_CRIT_SECTION();
1495
1496                 PageIndexTupleDelete(current->page, current->offnum);
1497                 if (PageAddItem(current->page,
1498                                                 (Item) newInnerTuple, newInnerTuple->size,
1499                                                 current->offnum, false, false) != current->offnum)
1500                         elog(ERROR, "failed to add item of size %u to SPGiST index page",
1501                                  newInnerTuple->size);
1502
1503                 MarkBufferDirty(current->buffer);
1504
1505                 if (RelationNeedsWAL(index))
1506                 {
1507                         XLogRecPtr      recptr;
1508
1509                         recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_NODE, rdata);
1510
1511                         PageSetLSN(current->page, recptr);
1512                         PageSetTLI(current->page, ThisTimeLineID);
1513                 }
1514
1515                 END_CRIT_SECTION();
1516         }
1517         else
1518         {
1519                 /*
1520                  * move inner tuple to another page, and update parent
1521                  */
1522                 SpGistDeadTuple dt;
1523                 SPPageDesc      saveCurrent;
1524
1525                 /*
1526                  * It should not be possible to get here for the root page, since we
1527                  * allow only one inner tuple on the root page, and spgFormInnerTuple
1528                  * always checks that inner tuples don't exceed the size of a page.
1529                  */
1530                 if (current->blkno == SPGIST_HEAD_BLKNO)
1531                         elog(ERROR, "cannot enlarge root tuple any more");
1532                 Assert(parent->buffer != InvalidBuffer);
1533
1534                 saveCurrent = *current;
1535
1536                 xlrec.blknoParent = parent->blkno;
1537                 xlrec.offnumParent = parent->offnum;
1538                 xlrec.nodeI = parent->node;
1539
1540                 /*
1541                  * obtain new buffer with the same parity as current, since it will
1542                  * be a child of same parent tuple
1543                  */
1544                 current->buffer = SpGistGetBuffer(index,
1545                                                                                   GBUF_INNER_PARITY(current->blkno),
1546                                                                                   newInnerTuple->size + sizeof(ItemIdData),
1547                                                                                   &xlrec.newPage);
1548                 current->blkno = BufferGetBlockNumber(current->buffer);
1549                 current->page = BufferGetPage(current->buffer);
1550
1551                 xlrec.blknoNew = current->blkno;
1552
1553                 /*
1554                  * Let's just make real sure new current isn't same as old.  Right
1555                  * now that's impossible, but if SpGistGetBuffer ever got smart enough
1556                  * to delete placeholder tuples before checking space, maybe it
1557                  * wouldn't be impossible.  The case would appear to work except that
1558                  * WAL replay would be subtly wrong, so I think a mere assert isn't
1559                  * enough here.
1560                  */
1561                  if (xlrec.blknoNew == xlrec.blkno)
1562                          elog(ERROR, "SPGiST new buffer shouldn't be same as old buffer");
1563
1564                 /*
1565                  * New current and parent buffer will both be modified; but note that
1566                  * parent buffer could be same as either new or old current.
1567                  */
1568                 ACCEPT_RDATA_BUFFER(current->buffer, 3);
1569                 if (parent->buffer != current->buffer &&
1570                         parent->buffer != saveCurrent.buffer)
1571                         ACCEPT_RDATA_BUFFER(parent->buffer, 4);
1572
1573                 START_CRIT_SECTION();
1574
1575                 /* insert new ... */
1576                 xlrec.offnumNew = current->offnum =
1577                         SpGistPageAddNewItem(state, current->page,
1578                                                                  (Item) newInnerTuple, newInnerTuple->size,
1579                                                                  NULL, false);
1580
1581                 MarkBufferDirty(current->buffer);
1582
1583                 /* update parent's downlink and mark parent page dirty */
1584                 saveNodeLink(index, parent, current->blkno, current->offnum);
1585
1586                 /*
1587                  * Replace old tuple with a placeholder or redirection tuple.  Unless
1588                  * doing an index build, we have to insert a redirection tuple for
1589                  * possible concurrent scans.  We can't just delete it in any case,
1590                  * because that could change the offsets of other tuples on the page,
1591                  * breaking downlinks from their parents.
1592                  */
1593                 if (state->isBuild)
1594                         dt = spgFormDeadTuple(state, SPGIST_PLACEHOLDER,
1595                                                                   InvalidBlockNumber, InvalidOffsetNumber);
1596                 else
1597                         dt = spgFormDeadTuple(state, SPGIST_REDIRECT,
1598                                                                   current->blkno, current->offnum);
1599
1600                 PageIndexTupleDelete(saveCurrent.page, saveCurrent.offnum);
1601                 if (PageAddItem(saveCurrent.page, (Item) dt, dt->size,
1602                                                 saveCurrent.offnum,
1603                                                 false, false) != saveCurrent.offnum)
1604                         elog(ERROR, "failed to add item of size %u to SPGiST index page",
1605                                  dt->size);
1606
1607                 if (state->isBuild)
1608                         SpGistPageGetOpaque(saveCurrent.page)->nPlaceholder++;
1609                 else
1610                         SpGistPageGetOpaque(saveCurrent.page)->nRedirection++;
1611
1612                 MarkBufferDirty(saveCurrent.buffer);
1613
1614                 if (RelationNeedsWAL(index))
1615                 {
1616                         XLogRecPtr      recptr;
1617
1618                         recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_NODE, rdata);
1619
1620                         /* we don't bother to check if any of these are redundant */
1621                         PageSetLSN(current->page, recptr);
1622                         PageSetTLI(current->page, ThisTimeLineID);
1623                         PageSetLSN(parent->page, recptr);
1624                         PageSetTLI(parent->page, ThisTimeLineID);
1625                         PageSetLSN(saveCurrent.page, recptr);
1626                         PageSetTLI(saveCurrent.page, ThisTimeLineID);
1627                 }
1628
1629                 END_CRIT_SECTION();
1630
1631                 /* Release saveCurrent if it's not same as current or parent */
1632                 if (saveCurrent.buffer != current->buffer &&
1633                         saveCurrent.buffer != parent->buffer)
1634                 {
1635                         SpGistSetLastUsedPage(index, saveCurrent.buffer);
1636                         UnlockReleaseBuffer(saveCurrent.buffer);
1637                 }
1638         }
1639 }
1640
1641 /*
1642  * spgSplitNode action: split inner tuple at current into prefix and postfix
1643  */
1644 static void
1645 spgSplitNodeAction(Relation index, SpGistState *state,
1646                                    SpGistInnerTuple innerTuple,
1647                                    SPPageDesc *current, spgChooseOut *out)
1648 {
1649         SpGistInnerTuple prefixTuple,
1650                                 postfixTuple;
1651         SpGistNodeTuple node,
1652                            *nodes;
1653         BlockNumber postfixBlkno;
1654         OffsetNumber postfixOffset;
1655         int                     i;
1656         XLogRecData rdata[5];
1657         spgxlogSplitTuple xlrec;
1658         Buffer          newBuffer = InvalidBuffer;
1659
1660         /*
1661          * Construct new prefix tuple, containing a single node with the
1662          * specified label.  (We'll update the node's downlink to point to the
1663          * new postfix tuple, below.)
1664          */
1665         node = spgFormNodeTuple(state, out->result.splitTuple.nodeLabel, false);
1666
1667         prefixTuple = spgFormInnerTuple(state,
1668                                                                         out->result.splitTuple.prefixHasPrefix,
1669                                                                         out->result.splitTuple.prefixPrefixDatum,
1670                                                                         1, &node);
1671
1672         /* it must fit in the space that innerTuple now occupies */
1673         if (prefixTuple->size > innerTuple->size)
1674                 elog(ERROR, "SPGiST inner-tuple split must not produce longer prefix");
1675
1676         /*
1677          * Construct new postfix tuple, containing all nodes of innerTuple with
1678          * same node datums, but with the prefix specified by the picksplit
1679          * function.
1680          */
1681         nodes = palloc(sizeof(SpGistNodeTuple) * innerTuple->nNodes);
1682         SGITITERATE(innerTuple, i, node)
1683         {
1684                 nodes[i] = node;
1685         }
1686
1687         postfixTuple = spgFormInnerTuple(state,
1688                                                                          out->result.splitTuple.postfixHasPrefix,
1689                                                                    out->result.splitTuple.postfixPrefixDatum,
1690                                                                          innerTuple->nNodes, nodes);
1691
1692         /* Postfix tuple is allTheSame if original tuple was */
1693         postfixTuple->allTheSame = innerTuple->allTheSame;
1694
1695         /* prep data for WAL record */
1696         xlrec.node = index->rd_node;
1697         xlrec.newPage = false;
1698
1699         ACCEPT_RDATA_DATA(&xlrec, sizeof(xlrec), 0);
1700         /* we assume sizeof(xlrec) is at least int-aligned */
1701         ACCEPT_RDATA_DATA(prefixTuple, prefixTuple->size, 1);
1702         ACCEPT_RDATA_DATA(postfixTuple, postfixTuple->size, 2);
1703         ACCEPT_RDATA_BUFFER(current->buffer, 3);
1704
1705         /*
1706          * If we can't fit both tuples on the current page, get a new page for the
1707          * postfix tuple.  In particular, can't split to the root page.
1708          *
1709          * For the space calculation, note that prefixTuple replaces innerTuple
1710          * but postfixTuple will be a new entry.
1711          */
1712         if (current->blkno == SPGIST_HEAD_BLKNO ||
1713                 SpGistPageGetFreeSpace(current->page, 1) + innerTuple->size <
1714                 prefixTuple->size + postfixTuple->size + sizeof(ItemIdData))
1715         {
1716                 /*
1717                  * Choose page with next triple parity, because postfix tuple is a
1718                  * child of prefix one
1719                  */
1720                 newBuffer = SpGistGetBuffer(index,
1721                                                                         GBUF_INNER_PARITY(current->blkno + 1),
1722                                                                         postfixTuple->size + sizeof(ItemIdData),
1723                                                                         &xlrec.newPage);
1724                 ACCEPT_RDATA_BUFFER(newBuffer, 4);
1725         }
1726
1727         START_CRIT_SECTION();
1728
1729         /*
1730          * Replace old tuple by prefix tuple
1731          */
1732         PageIndexTupleDelete(current->page, current->offnum);
1733         xlrec.offnumPrefix = PageAddItem(current->page,
1734                                                                          (Item) prefixTuple, prefixTuple->size,
1735                                                                          current->offnum, false, false);
1736         if (xlrec.offnumPrefix != current->offnum)
1737                 elog(ERROR, "failed to add item of size %u to SPGiST index page",
1738                          prefixTuple->size);
1739         xlrec.blknoPrefix = current->blkno;
1740
1741         /*
1742          * put postfix tuple into appropriate page
1743          */
1744         if (newBuffer == InvalidBuffer)
1745         {
1746                 xlrec.blknoPostfix = postfixBlkno = current->blkno;
1747                 xlrec.offnumPostfix = postfixOffset =
1748                         SpGistPageAddNewItem(state, current->page,
1749                                                                  (Item) postfixTuple, postfixTuple->size,
1750                                                                  NULL, false);
1751         }
1752         else
1753         {
1754                 xlrec.blknoPostfix = postfixBlkno = BufferGetBlockNumber(newBuffer);
1755                 xlrec.offnumPostfix = postfixOffset =
1756                         SpGistPageAddNewItem(state, BufferGetPage(newBuffer),
1757                                                                  (Item) postfixTuple, postfixTuple->size,
1758                                                                  NULL, false);
1759                 MarkBufferDirty(newBuffer);
1760         }
1761
1762         /*
1763          * And set downlink pointer in the prefix tuple to point to postfix tuple.
1764          * (We can't avoid this step by doing the above two steps in opposite
1765          * order, because there might not be enough space on the page to insert
1766          * the postfix tuple first.)  We have to update the local copy of the
1767          * prefixTuple too, because that's what will be written to WAL.
1768          */
1769         spgUpdateNodeLink(prefixTuple, 0, postfixBlkno, postfixOffset);
1770         prefixTuple = (SpGistInnerTuple) PageGetItem(current->page,
1771                                                           PageGetItemId(current->page, current->offnum));
1772         spgUpdateNodeLink(prefixTuple, 0, postfixBlkno, postfixOffset);
1773
1774         MarkBufferDirty(current->buffer);
1775
1776         if (RelationNeedsWAL(index))
1777         {
1778                 XLogRecPtr      recptr;
1779
1780                 recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_SPLIT_TUPLE, rdata);
1781
1782                 PageSetLSN(current->page, recptr);
1783                 PageSetTLI(current->page, ThisTimeLineID);
1784
1785                 if (newBuffer != InvalidBuffer)
1786                 {
1787                         PageSetLSN(BufferGetPage(newBuffer), recptr);
1788                         PageSetTLI(BufferGetPage(newBuffer), ThisTimeLineID);
1789                 }
1790         }
1791
1792         END_CRIT_SECTION();
1793
1794         /* Update local free-space cache and release buffer */
1795         if (newBuffer != InvalidBuffer)
1796         {
1797                 SpGistSetLastUsedPage(index, newBuffer);
1798                 UnlockReleaseBuffer(newBuffer);
1799         }
1800 }
1801
1802 /*
1803  * Insert one item into the index
1804  */
1805 void
1806 spgdoinsert(Relation index, SpGistState *state,
1807                         ItemPointer heapPtr, Datum datum)
1808 {
1809         int                     level = 0;
1810         Datum           leafDatum;
1811         int                     leafSize;
1812         SPPageDesc      current,
1813                                 parent;
1814
1815         /*
1816          * Since we don't use index_form_tuple in this AM, we have to make sure
1817          * value to be inserted is not toasted; FormIndexDatum doesn't guarantee
1818          * that.
1819          */
1820         if (state->attType.attlen == -1)
1821                 datum = PointerGetDatum(PG_DETOAST_DATUM(datum));
1822
1823         leafDatum = datum;
1824
1825         /*
1826          * Compute space needed for a leaf tuple containing the given datum.
1827          *
1828          * If it isn't gonna fit, and the opclass can't reduce the datum size by
1829          * suffixing, bail out now rather than getting into an endless loop.
1830          */
1831         leafSize = SGLTHDRSZ + sizeof(ItemIdData) +
1832                 SpGistGetTypeSize(&state->attType, leafDatum);
1833
1834         if (leafSize > SPGIST_PAGE_CAPACITY && !state->config.longValuesOK)
1835                 ereport(ERROR,
1836                                 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
1837                         errmsg("index row size %lu exceeds maximum %lu for index \"%s\"",
1838                                    (unsigned long) (leafSize - sizeof(ItemIdData)),
1839                                    (unsigned long) (SPGIST_PAGE_CAPACITY - sizeof(ItemIdData)),
1840                                    RelationGetRelationName(index)),
1841                   errhint("Values larger than a buffer page cannot be indexed.")));
1842
1843         /* Initialize "current" to the root page */
1844         current.blkno = SPGIST_HEAD_BLKNO;
1845         current.buffer = InvalidBuffer;
1846         current.page = NULL;
1847         current.offnum = FirstOffsetNumber;
1848         current.node = -1;
1849
1850         /* "parent" is invalid for the moment */
1851         parent.blkno = InvalidBlockNumber;
1852         parent.buffer = InvalidBuffer;
1853         parent.page = NULL;
1854         parent.offnum = InvalidOffsetNumber;
1855         parent.node = -1;
1856
1857         for (;;)
1858         {
1859                 bool            isNew = false;
1860
1861                 /*
1862                  * Bail out if query cancel is pending.  We must have this somewhere
1863                  * in the loop since a broken opclass could produce an infinite
1864                  * picksplit loop.
1865                  */
1866                 CHECK_FOR_INTERRUPTS();
1867
1868                 if (current.blkno == InvalidBlockNumber)
1869                 {
1870                         /*
1871                          * Create a leaf page.  If leafSize is too large to fit on a page,
1872                          * we won't actually use the page yet, but it simplifies the API
1873                          * for doPickSplit to always have a leaf page at hand; so just
1874                          * quietly limit our request to a page size.
1875                          */
1876                         current.buffer = SpGistGetBuffer(index, GBUF_LEAF,
1877                                                                                          Min(leafSize,
1878                                                                                                  SPGIST_PAGE_CAPACITY),
1879                                                                                          &isNew);
1880                         current.blkno = BufferGetBlockNumber(current.buffer);
1881                 }
1882                 else if (parent.buffer == InvalidBuffer ||
1883                                  current.blkno != parent.blkno)
1884                 {
1885                         current.buffer = ReadBuffer(index, current.blkno);
1886                         LockBuffer(current.buffer, BUFFER_LOCK_EXCLUSIVE);
1887                 }
1888                 else
1889                 {
1890                         /* inner tuple can be stored on the same page as parent one */
1891                         current.buffer = parent.buffer;
1892                 }
1893                 current.page = BufferGetPage(current.buffer);
1894
1895                 if (SpGistPageIsLeaf(current.page))
1896                 {
1897                         SpGistLeafTuple leafTuple;
1898                         int                     nToSplit,
1899                                                 sizeToSplit;
1900
1901                         leafTuple = spgFormLeafTuple(state, heapPtr, leafDatum);
1902                         if (leafTuple->size + sizeof(ItemIdData) <=
1903                                 SpGistPageGetFreeSpace(current.page, 1))
1904                         {
1905                                 /* it fits on page, so insert it and we're done */
1906                                 addLeafTuple(index, state, leafTuple,
1907                                                          &current, &parent, isNew);
1908                                 break;
1909                         }
1910                         else if ((sizeToSplit =
1911                                           checkSplitConditions(index, state, &current,
1912                                                                                    &nToSplit)) < SPGIST_PAGE_CAPACITY / 2 &&
1913                                          nToSplit < 64 &&
1914                                          leafTuple->size + sizeof(ItemIdData) + sizeToSplit <= SPGIST_PAGE_CAPACITY)
1915                         {
1916                                 /*
1917                                  * the amount of data is pretty small, so just move the whole
1918                                  * chain to another leaf page rather than splitting it.
1919                                  */
1920                                 Assert(!isNew);
1921                                 moveLeafs(index, state, &current, &parent, leafTuple);
1922                                 break;                  /* we're done */
1923                         }
1924                         else
1925                         {
1926                                 /* picksplit */
1927                                 if (doPickSplit(index, state, &current, &parent,
1928                                                                 leafTuple, level, isNew))
1929                                         break;          /* doPickSplit installed new tuples */
1930
1931                                 /* leaf tuple will not be inserted yet */
1932                                 pfree(leafTuple);
1933
1934                                 /*
1935                                  * current now describes new inner tuple, go insert into it
1936                                  */
1937                                 Assert(!SpGistPageIsLeaf(current.page));
1938                                 goto process_inner_tuple;
1939                         }
1940                 }
1941                 else    /* non-leaf page */
1942                 {
1943                         /*
1944                          * Apply the opclass choose function to figure out how to insert
1945                          * the given datum into the current inner tuple.
1946                          */
1947                         SpGistInnerTuple innerTuple;
1948                         spgChooseIn in;
1949                         spgChooseOut out;
1950                         FmgrInfo   *procinfo;
1951
1952                         /*
1953                          * spgAddNode and spgSplitTuple cases will loop back to here to
1954                          * complete the insertion operation.  Just in case the choose
1955                          * function is broken and produces add or split requests
1956                          * repeatedly, check for query cancel.
1957                          */
1958         process_inner_tuple:
1959                         CHECK_FOR_INTERRUPTS();
1960
1961                         innerTuple = (SpGistInnerTuple) PageGetItem(current.page,
1962                                                                 PageGetItemId(current.page, current.offnum));
1963
1964                         in.datum = datum;
1965                         in.leafDatum = leafDatum;
1966                         in.level = level;
1967                         in.allTheSame = innerTuple->allTheSame;
1968                         in.hasPrefix = (innerTuple->prefixSize > 0);
1969                         in.prefixDatum = SGITDATUM(innerTuple, state);
1970                         in.nNodes = innerTuple->nNodes;
1971                         in.nodeLabels = spgExtractNodeLabels(state, innerTuple);
1972
1973                         memset(&out, 0, sizeof(out));
1974
1975                         procinfo = index_getprocinfo(index, 1, SPGIST_CHOOSE_PROC);
1976                         FunctionCall2Coll(procinfo,
1977                                                           index->rd_indcollation[0],
1978                                                           PointerGetDatum(&in),
1979                                                           PointerGetDatum(&out));
1980
1981                         if (innerTuple->allTheSame)
1982                         {
1983                                 /*
1984                                  * It's not allowed to do an AddNode at an allTheSame tuple.
1985                                  * Opclass must say "match", in which case we choose a random
1986                                  * one of the nodes to descend into, or "split".
1987                                  */
1988                                 if (out.resultType == spgAddNode)
1989                                         elog(ERROR, "cannot add a node to an allTheSame inner tuple");
1990                                 else if (out.resultType == spgMatchNode)
1991                                         out.result.matchNode.nodeN = random() % innerTuple->nNodes;
1992                         }
1993
1994                         switch (out.resultType)
1995                         {
1996                                 case spgMatchNode:
1997                                         /* Descend to N'th child node */
1998                                         spgMatchNodeAction(index, state, innerTuple,
1999                                                                            &current, &parent,
2000                                                                            out.result.matchNode.nodeN);
2001                                         /* Adjust level as per opclass request */
2002                                         level += out.result.matchNode.levelAdd;
2003                                         /* Replace leafDatum and recompute leafSize */
2004                                         leafDatum = out.result.matchNode.restDatum;
2005                                         leafSize = SGLTHDRSZ + sizeof(ItemIdData) +
2006                                                 SpGistGetTypeSize(&state->attType, leafDatum);
2007
2008                                         /*
2009                                          * Loop around and attempt to insert the new leafDatum
2010                                          * at "current" (which might reference an existing child
2011                                          * tuple, or might be invalid to force us to find a new
2012                                          * page for the tuple).
2013                                          *
2014                                          * Note: if the opclass sets longValuesOK, we rely on the
2015                                          * choose function to eventually shorten the leafDatum
2016                                          * enough to fit on a page.  We could add a test here to
2017                                          * complain if the datum doesn't get visibly shorter each
2018                                          * time, but that could get in the way of opclasses that
2019                                          * "simplify" datums in a way that doesn't necessarily
2020                                          * lead to physical shortening on every cycle.
2021                                          */
2022                                         break;
2023                                 case spgAddNode:
2024                                         /* AddNode is not sensible if nodes don't have labels */
2025                                         if (in.nodeLabels == NULL)
2026                                                 elog(ERROR, "cannot add a node to an inner tuple without node labels");
2027                                         /* Add node to inner tuple, per request */
2028                                         spgAddNodeAction(index, state, innerTuple,
2029                                                                          &current, &parent,
2030                                                                          out.result.addNode.nodeN,
2031                                                                          out.result.addNode.nodeLabel);
2032
2033                                         /*
2034                                          * Retry insertion into the enlarged node.  We assume
2035                                          * that we'll get a MatchNode result this time.
2036                                          */
2037                                         goto process_inner_tuple;
2038                                         break;
2039                                 case spgSplitTuple:
2040                                         /* Split inner tuple, per request */
2041                                         spgSplitNodeAction(index, state, innerTuple,
2042                                                                            &current, &out);
2043
2044                                         /* Retry insertion into the split node */
2045                                         goto process_inner_tuple;
2046                                         break;
2047                                 default:
2048                                         elog(ERROR, "unrecognized SPGiST choose result: %d",
2049                                                  (int) out.resultType);
2050                                         break;
2051                         }
2052                 }
2053         }                                                       /* end loop */
2054
2055         /*
2056          * Release any buffers we're still holding.  Beware of possibility that
2057          * current and parent reference same buffer.
2058          */
2059         if (current.buffer != InvalidBuffer)
2060         {
2061                 SpGistSetLastUsedPage(index, current.buffer);
2062                 UnlockReleaseBuffer(current.buffer);
2063         }
2064         if (parent.buffer != InvalidBuffer &&
2065                 parent.buffer != current.buffer)
2066         {
2067                 SpGistSetLastUsedPage(index, parent.buffer);
2068                 UnlockReleaseBuffer(parent.buffer);
2069         }
2070 }