]> granicus.if.org Git - postgresql/blob - src/backend/access/spgist/spgdoinsert.c
Teach SPGiST to store nulls and do whole-index scans.
[postgresql] / src / backend / access / spgist / spgdoinsert.c
1 /*-------------------------------------------------------------------------
2  *
3  * spgdoinsert.c
4  *        implementation of insert algorithm
5  *
6  *
7  * Portions Copyright (c) 1996-2012, PostgreSQL Global Development Group
8  * Portions Copyright (c) 1994, Regents of the University of California
9  *
10  * IDENTIFICATION
11  *                      src/backend/access/spgist/spgdoinsert.c
12  *
13  *-------------------------------------------------------------------------
14  */
15
16 #include "postgres.h"
17
18 #include "access/genam.h"
19 #include "access/spgist_private.h"
20 #include "miscadmin.h"
21 #include "storage/bufmgr.h"
22
23
24 /*
25  * SPPageDesc tracks all info about a page we are inserting into.  In some
26  * situations it actually identifies a tuple, or even a specific node within
27  * an inner tuple.  But any of the fields can be invalid.  If the buffer
28  * field is valid, it implies we hold pin and exclusive lock on that buffer.
29  * page pointer should be valid exactly when buffer is.
30  */
31 typedef struct SPPageDesc
32 {
33         BlockNumber blkno;                      /* block number of the page, or InvalidBlockNumber */
34         Buffer          buffer;                 /* page's buffer number, or InvalidBuffer; if valid, we hold pin and exclusive lock on it */
35         Page            page;                   /* pointer to page buffer, or NULL; valid exactly when buffer is valid */
36         OffsetNumber offnum;            /* offset of a tuple on the page, or InvalidOffsetNumber */
37         int                     node;                   /* node number within the inner tuple at offnum, or -1 */
38 } SPPageDesc;
39
40
41 /*
42  * Set the item pointer in the nodeN'th entry in inner tuple tup.  This
43  * is used to update the parent inner tuple's downlink after a move or
44  * split operation.
45  */
46 void
47 spgUpdateNodeLink(SpGistInnerTuple tup, int nodeN,
48                                   BlockNumber blkno, OffsetNumber offset)
49 {
50         int                     i;
51         SpGistNodeTuple node;
52
53         SGITITERATE(tup, i, node)
54         {
55                 if (i == nodeN)
56                 {
57                         ItemPointerSet(&node->t_tid, blkno, offset);
58                         return;
59                 }
60         }
61
62         elog(ERROR, "failed to find requested node %d in SPGiST inner tuple",
63                  nodeN);
64 }
65
66 /*
67  * Form a new inner tuple containing one more node than the given one, with
68  * the specified label datum, inserted at offset "offset" in the node array.
69  * The new tuple's prefix is the same as the old one's.
70  *
71  * Note that the new node initially has an invalid downlink.  We'll find a
72  * page to point it to later.
73  */
74 static SpGistInnerTuple
75 addNode(SpGistState *state, SpGistInnerTuple tuple, Datum label, int offset)
76 {
77         SpGistNodeTuple node,
78                            *nodes;
79         int                     i;
80
81         /* if offset is negative, insert at end */
82         if (offset < 0)
83                 offset = tuple->nNodes;
84         else if (offset > tuple->nNodes)
85                 elog(ERROR, "invalid offset for adding node to SPGiST inner tuple");
86
87         nodes = palloc(sizeof(SpGistNodeTuple) * (tuple->nNodes + 1));
88         SGITITERATE(tuple, i, node)
89         {
90                 if (i < offset)
91                         nodes[i] = node;
92                 else
93                         nodes[i + 1] = node;
94         }
95
96         nodes[offset] = spgFormNodeTuple(state, label, false);
97
98         return spgFormInnerTuple(state,
99                                                          (tuple->prefixSize > 0),
100                                                          SGITDATUM(tuple, state),
101                                                          tuple->nNodes + 1,
102                                                          nodes);
103 }
104
105 /* qsort comparator for sorting OffsetNumbers */
106 static int
107 cmpOffsetNumbers(const void *a, const void *b)
108 {
109         if (*(const OffsetNumber *) a == *(const OffsetNumber *) b)
110                 return 0;
111         return (*(const OffsetNumber *) a > *(const OffsetNumber *) b) ? 1 : -1;
112 }
113
114 /*
115  * Delete multiple tuples from an index page, preserving tuple offset numbers.
116  *
117  * The first tuple in the given list is replaced with a dead tuple of type
118  * "firststate" (REDIRECT/DEAD/PLACEHOLDER); the remaining tuples are replaced
119  * with dead tuples of type "reststate".  If either firststate or reststate
120  * is REDIRECT, blkno/offnum specify where to link to.
121  *
122  * NB: this is used during WAL replay, so beware of trying to make it too
123  * smart.  In particular, it shouldn't use "state" except for calling
124  * spgFormDeadTuple().
125  */
126 void
127 spgPageIndexMultiDelete(SpGistState *state, Page page,
128                                                 OffsetNumber *itemnos, int nitems,
129                                                 int firststate, int reststate,
130                                                 BlockNumber blkno, OffsetNumber offnum)
131 {
132         OffsetNumber    firstItem;
133         OffsetNumber   *sortednos;
134         SpGistDeadTuple tuple = NULL;
135         int                     i;
136
137         if (nitems == 0)
138                 return;                                 /* nothing to do */
139
140         /*
141          * For efficiency we want to use PageIndexMultiDelete, which requires the
142          * targets to be listed in sorted order, so we have to sort the itemnos
143          * array.  (This also greatly simplifies the math for reinserting the
144          * replacement tuples.)  However, we must not scribble on the caller's
145          * array, so we have to make a copy.
146          */
147         sortednos = (OffsetNumber *) palloc(sizeof(OffsetNumber) * nitems);
148         memcpy(sortednos, itemnos, sizeof(OffsetNumber) * nitems);
149         if (nitems > 1)
150                 qsort(sortednos, nitems, sizeof(OffsetNumber), cmpOffsetNumbers);
151
152         PageIndexMultiDelete(page, sortednos, nitems);
153
154         firstItem = itemnos[0];
155
156         for (i = 0; i < nitems; i++)
157         {
158                 OffsetNumber    itemno = sortednos[i];
159                 int                             tupstate;
160
161                 tupstate = (itemno == firstItem) ? firststate : reststate;
162                 if (tuple == NULL || tuple->tupstate != tupstate)
163                         tuple = spgFormDeadTuple(state, tupstate, blkno, offnum);
164
165                 if (PageAddItem(page, (Item) tuple, tuple->size,
166                                                 itemno, false, false) != itemno)
167                         elog(ERROR, "failed to add item of size %u to SPGiST index page",
168                                  tuple->size);
169
170                 if (tupstate == SPGIST_REDIRECT)
171                         SpGistPageGetOpaque(page)->nRedirection++;
172                 else if (tupstate == SPGIST_PLACEHOLDER)
173                         SpGistPageGetOpaque(page)->nPlaceholder++;
174         }
175
176         pfree(sortednos);
177 }
178
179 /*
180  * Update the parent inner tuple's downlink, and mark the parent buffer
181  * dirty (this must be the last change to the parent page in the current
182  * WAL action).
183  */
184 static void
185 saveNodeLink(Relation index, SPPageDesc *parent,
186                          BlockNumber blkno, OffsetNumber offnum)
187 {
188         SpGistInnerTuple innerTuple;
189
190         innerTuple = (SpGistInnerTuple) PageGetItem(parent->page,
191                                                                 PageGetItemId(parent->page, parent->offnum));
192
193         spgUpdateNodeLink(innerTuple, parent->node, blkno, offnum);
194
195         MarkBufferDirty(parent->buffer);
196 }
197
198 /*
199  * Add a leaf tuple to a leaf page where there is known to be room for it
200  */
201 static void
202 addLeafTuple(Relation index, SpGistState *state, SpGistLeafTuple leafTuple,
203                          SPPageDesc *current, SPPageDesc *parent, bool isNulls, bool isNew)
204 {
205         XLogRecData rdata[4];
206         spgxlogAddLeaf xlrec;
207
208         xlrec.node = index->rd_node;
209         xlrec.blknoLeaf = current->blkno;
210         xlrec.newPage = isNew;
211         xlrec.storesNulls = isNulls;
212
213         /* these will be filled below as needed */
214         xlrec.offnumLeaf = InvalidOffsetNumber;
215         xlrec.offnumHeadLeaf = InvalidOffsetNumber;
216         xlrec.blknoParent = InvalidBlockNumber;
217         xlrec.offnumParent = InvalidOffsetNumber;
218         xlrec.nodeI = 0;
219
220         ACCEPT_RDATA_DATA(&xlrec, sizeof(xlrec), 0);
221         /* we assume sizeof(xlrec) is at least int-aligned */
222         ACCEPT_RDATA_DATA(leafTuple, leafTuple->size, 1);
223         ACCEPT_RDATA_BUFFER(current->buffer, 2);
224
225         START_CRIT_SECTION();
226
227         if (current->offnum == InvalidOffsetNumber ||
228                 SpGistBlockIsRoot(current->blkno))
229         {
230                 /* Tuple is not part of a chain */
231                 leafTuple->nextOffset = InvalidOffsetNumber;
232                 current->offnum = SpGistPageAddNewItem(state, current->page,
233                                                                                            (Item) leafTuple, leafTuple->size,
234                                                                                            NULL, false);
235
236                 xlrec.offnumLeaf = current->offnum;
237
238                 /* Must update parent's downlink if any */
239                 if (parent->buffer != InvalidBuffer)
240                 {
241                         xlrec.blknoParent = parent->blkno;
242                         xlrec.offnumParent = parent->offnum;
243                         xlrec.nodeI = parent->node;
244
245                         saveNodeLink(index, parent, current->blkno, current->offnum);
246
247                         ACCEPT_RDATA_BUFFER(parent->buffer, 3);
248                 }
249         }
250         else
251         {
252                 /*
253                  * Tuple must be inserted into existing chain.  We mustn't change
254                  * the chain's head address, but we don't need to chase the entire
255                  * chain to put the tuple at the end; we can insert it second.
256                  *
257                  * Also, it's possible that the "chain" consists only of a DEAD tuple,
258                  * in which case we should replace the DEAD tuple in-place.
259                  */
260                 SpGistLeafTuple head;
261                 OffsetNumber offnum;
262
263                 head = (SpGistLeafTuple) PageGetItem(current->page,
264                                                                                          PageGetItemId(current->page, current->offnum));
265                 if (head->tupstate == SPGIST_LIVE)
266                 {
267                         leafTuple->nextOffset = head->nextOffset;
268                         offnum = SpGistPageAddNewItem(state, current->page,
269                                                                                   (Item) leafTuple, leafTuple->size,
270                                                                                   NULL, false);
271
272                         /*
273                          * re-get head of list because it could have been moved on page,
274                          * and set new second element
275                          */
276                         head = (SpGistLeafTuple) PageGetItem(current->page,
277                                                                                          PageGetItemId(current->page, current->offnum));
278                         head->nextOffset = offnum;
279
280                         xlrec.offnumLeaf = offnum;
281                         xlrec.offnumHeadLeaf = current->offnum;
282                 }
283                 else if (head->tupstate == SPGIST_DEAD)
284                 {
285                         leafTuple->nextOffset = InvalidOffsetNumber;
286                         PageIndexTupleDelete(current->page, current->offnum);
287                         if (PageAddItem(current->page,
288                                                         (Item) leafTuple, leafTuple->size,
289                                                         current->offnum, false, false) != current->offnum)
290                                 elog(ERROR, "failed to add item of size %u to SPGiST index page",
291                                          leafTuple->size);
292
293                         /* WAL replay distinguishes this case by equal offnums */
294                         xlrec.offnumLeaf = current->offnum;
295                         xlrec.offnumHeadLeaf = current->offnum;
296                 }
297                 else
298                         elog(ERROR, "unexpected SPGiST tuple state: %d", head->tupstate);
299         }
300
301         MarkBufferDirty(current->buffer);
302
303         if (RelationNeedsWAL(index))
304         {
305                 XLogRecPtr      recptr;
306
307                 recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_LEAF, rdata);
308
309                 PageSetLSN(current->page, recptr);
310                 PageSetTLI(current->page, ThisTimeLineID);
311
312                 /* update parent only if we actually changed it */
313                 if (xlrec.blknoParent != InvalidBlockNumber)
314                 {
315                         PageSetLSN(parent->page, recptr);
316                         PageSetTLI(parent->page, ThisTimeLineID);
317                 }
318         }
319
320         END_CRIT_SECTION();
321 }
322
323 /*
324  * Count the number and total size of leaf tuples in the chain starting at
325  * current->offnum.  Return number into *nToSplit and total size as function
326  * result.
327  *
328  * Klugy special case when considering the root page (i.e., root is a leaf
329  * page, but we're about to split for the first time): return fake large
330  * values to force spgdoinsert() to take the doPickSplit rather than
331  * moveLeafs code path.  moveLeafs is not prepared to deal with root page.
332  */
333 static int
334 checkSplitConditions(Relation index, SpGistState *state,
335                                          SPPageDesc *current, int *nToSplit)
336 {
337         int                     i,
338                                 n = 0,
339                                 totalSize = 0;
340
341         if (SpGistBlockIsRoot(current->blkno))
342         {
343                 /* return impossible values to force split */
344                 *nToSplit = BLCKSZ;
345                 return BLCKSZ;
346         }
347
348         i = current->offnum;
349         while (i != InvalidOffsetNumber)
350         {
351                 SpGistLeafTuple it;
352
353                 Assert(i >= FirstOffsetNumber &&
354                            i <= PageGetMaxOffsetNumber(current->page));
355                 it = (SpGistLeafTuple) PageGetItem(current->page,
356                                                                                    PageGetItemId(current->page, i));
357                 if (it->tupstate == SPGIST_LIVE)
358                 {
359                         n++;
360                         totalSize += it->size + sizeof(ItemIdData);
361                 }
362                 else if (it->tupstate == SPGIST_DEAD)
363                 {
364                         /* We could see a DEAD tuple as first/only chain item */
365                         Assert(i == current->offnum);
366                         Assert(it->nextOffset == InvalidOffsetNumber);
367                         /* Don't count it in result, because it won't go to other page */
368                 }
369                 else
370                         elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
371
372                 i = it->nextOffset;
373         }
374
375         *nToSplit = n;
376
377         return totalSize;
378 }
379
380 /*
381  * current points to a leaf-tuple chain that we wanted to add newLeafTuple to,
382  * but the chain has to be moved because there's not enough room to add
383  * newLeafTuple to its page.  We use this method when the chain contains
384  * very little data so a split would be inefficient.  We are sure we can
385  * fit the chain plus newLeafTuple on one other page.
386  */
387 static void
388 moveLeafs(Relation index, SpGistState *state,
389                   SPPageDesc *current, SPPageDesc *parent,
390                   SpGistLeafTuple newLeafTuple, bool isNulls)
391 {
392         int                     i,
393                                 nDelete,
394                                 nInsert,
395                                 size;
396         Buffer          nbuf;
397         Page            npage;
398         SpGistLeafTuple it;
399         OffsetNumber r = InvalidOffsetNumber,
400                                 startOffset = InvalidOffsetNumber;
401         bool            replaceDead = false;
402         OffsetNumber *toDelete;
403         OffsetNumber *toInsert;
404         BlockNumber nblkno;
405         XLogRecData rdata[7];
406         spgxlogMoveLeafs xlrec;
407         char       *leafdata,
408                            *leafptr;
409
410         /* This doesn't work on root page */
411         Assert(parent->buffer != InvalidBuffer);
412         Assert(parent->buffer != current->buffer);
413
414         /* Locate the tuples to be moved, and count up the space needed */
415         i = PageGetMaxOffsetNumber(current->page);
416         toDelete = (OffsetNumber *) palloc(sizeof(OffsetNumber) * i);
417         toInsert = (OffsetNumber *) palloc(sizeof(OffsetNumber) * (i + 1));
418
419         size = newLeafTuple->size + sizeof(ItemIdData);
420
421         nDelete = 0;
422         i = current->offnum;
423         while (i != InvalidOffsetNumber)
424         {
425                 SpGistLeafTuple it;
426
427                 Assert(i >= FirstOffsetNumber &&
428                            i <= PageGetMaxOffsetNumber(current->page));
429                 it = (SpGistLeafTuple) PageGetItem(current->page,
430                                                                                    PageGetItemId(current->page, i));
431
432                 if (it->tupstate == SPGIST_LIVE)
433                 {
434                         toDelete[nDelete] = i;
435                         size += it->size + sizeof(ItemIdData);
436                         nDelete++;
437                 }
438                 else if (it->tupstate == SPGIST_DEAD)
439                 {
440                         /* We could see a DEAD tuple as first/only chain item */
441                         Assert(i == current->offnum);
442                         Assert(it->nextOffset == InvalidOffsetNumber);
443                         /* We don't want to move it, so don't count it in size */
444                         toDelete[nDelete] = i;
445                         nDelete++;
446                         replaceDead = true;
447                 }
448                 else
449                         elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
450
451                 i = it->nextOffset;
452         }
453
454         /* Find a leaf page that will hold them */
455         nbuf = SpGistGetBuffer(index, GBUF_LEAF | (isNulls ? GBUF_NULLS : 0),
456                                                    size, &xlrec.newPage);
457         npage = BufferGetPage(nbuf);
458         nblkno = BufferGetBlockNumber(nbuf);
459         Assert(nblkno != current->blkno);
460
461         /* prepare WAL info */
462         xlrec.node = index->rd_node;
463         STORE_STATE(state, xlrec.stateSrc);
464
465         xlrec.blknoSrc = current->blkno;
466         xlrec.blknoDst = nblkno;
467         xlrec.nMoves = nDelete;
468         xlrec.replaceDead = replaceDead;
469         xlrec.storesNulls = isNulls;
470
471         xlrec.blknoParent = parent->blkno;
472         xlrec.offnumParent = parent->offnum;
473         xlrec.nodeI = parent->node;
474
475         leafdata = leafptr = palloc(size);
476
477         START_CRIT_SECTION();
478
479         /* copy all the old tuples to new page, unless they're dead */
480         nInsert = 0;
481         if (!replaceDead)
482         {
483                 for (i = 0; i < nDelete; i++)
484                 {
485                         it = (SpGistLeafTuple) PageGetItem(current->page,
486                                                                                            PageGetItemId(current->page, toDelete[i]));
487                         Assert(it->tupstate == SPGIST_LIVE);
488
489                         /*
490                          * Update chain link (notice the chain order gets reversed, but we
491                          * don't care).  We're modifying the tuple on the source page
492                          * here, but it's okay since we're about to delete it.
493                          */
494                         it->nextOffset = r;
495
496                         r = SpGistPageAddNewItem(state, npage, (Item) it, it->size,
497                                                                          &startOffset, false);
498
499                         toInsert[nInsert] = r;
500                         nInsert++;
501
502                         /* save modified tuple into leafdata as well */
503                         memcpy(leafptr, it, it->size);
504                         leafptr += it->size;
505                 }
506         }
507
508         /* add the new tuple as well */
509         newLeafTuple->nextOffset = r;
510         r = SpGistPageAddNewItem(state, npage,
511                                                          (Item) newLeafTuple, newLeafTuple->size,
512                                                          &startOffset, false);
513         toInsert[nInsert] = r;
514         nInsert++;
515         memcpy(leafptr, newLeafTuple, newLeafTuple->size);
516         leafptr += newLeafTuple->size;
517
518         /*
519          * Now delete the old tuples, leaving a redirection pointer behind for
520          * the first one, unless we're doing an index build; in which case there
521          * can't be any concurrent scan so we need not provide a redirect.
522          */
523         spgPageIndexMultiDelete(state, current->page, toDelete, nDelete,
524                                                         state->isBuild ? SPGIST_PLACEHOLDER : SPGIST_REDIRECT,
525                                                         SPGIST_PLACEHOLDER,
526                                                         nblkno, r);
527
528         /* Update parent's downlink and mark parent page dirty */
529         saveNodeLink(index, parent, nblkno, r);
530
531         /* Mark the leaf pages too */
532         MarkBufferDirty(current->buffer);
533         MarkBufferDirty(nbuf);
534
535         if (RelationNeedsWAL(index))
536         {
537                 XLogRecPtr      recptr;
538
539                 ACCEPT_RDATA_DATA(&xlrec, MAXALIGN(sizeof(xlrec)), 0);
540                 ACCEPT_RDATA_DATA(toDelete, MAXALIGN(sizeof(OffsetNumber) * nDelete), 1);
541                 ACCEPT_RDATA_DATA(toInsert, MAXALIGN(sizeof(OffsetNumber) * nInsert), 2);
542                 ACCEPT_RDATA_DATA(leafdata, leafptr - leafdata, 3);
543                 ACCEPT_RDATA_BUFFER(current->buffer, 4);
544                 ACCEPT_RDATA_BUFFER(nbuf, 5);
545                 ACCEPT_RDATA_BUFFER(parent->buffer, 6);
546
547                 recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_MOVE_LEAFS, rdata);
548
549                 PageSetLSN(current->page, recptr);
550                 PageSetTLI(current->page, ThisTimeLineID);
551                 PageSetLSN(npage, recptr);
552                 PageSetTLI(npage, ThisTimeLineID);
553                 PageSetLSN(parent->page, recptr);
554                 PageSetTLI(parent->page, ThisTimeLineID);
555         }
556
557         END_CRIT_SECTION();
558
559         /* Update local free-space cache and release new buffer */
560         SpGistSetLastUsedPage(index, nbuf);
561         UnlockReleaseBuffer(nbuf);
562 }
563
564 /*
565  * Update previously-created redirection tuple with appropriate destination
566  *
567  * We use this when it's not convenient to know the destination first.
568  * The tuple should have been made with the "impossible" destination of
569  * the metapage.
570  */
571 static void
572 setRedirectionTuple(SPPageDesc *current, OffsetNumber position,
573                                         BlockNumber blkno, OffsetNumber offnum)
574 {
575         SpGistDeadTuple dt;
576
577         dt = (SpGistDeadTuple) PageGetItem(current->page,
578                                                                            PageGetItemId(current->page, position));
579         Assert(dt->tupstate == SPGIST_REDIRECT);
580         Assert(ItemPointerGetBlockNumber(&dt->pointer) == SPGIST_METAPAGE_BLKNO);
581         ItemPointerSet(&dt->pointer, blkno, offnum);
582 }
583
584 /*
585  * Test to see if the user-defined picksplit function failed to do its job,
586  * ie, it put all the leaf tuples into the same node.
587  * If so, randomly divide the tuples into several nodes (all with the same
588  * label) and return TRUE to select allTheSame mode for this inner tuple.
589  *
590  * (This code is also used to forcibly select allTheSame mode for nulls.)
591  *
592  * If we know that the leaf tuples wouldn't all fit on one page, then we
593  * exclude the last tuple (which is the incoming new tuple that forced a split)
594  * from the check to see if more than one node is used.  The reason for this
595  * is that if the existing tuples are put into only one chain, then even if
596  * we move them all to an empty page, there would still not be room for the
597  * new tuple, so we'd get into an infinite loop of picksplit attempts.
598  * Forcing allTheSame mode dodges this problem by ensuring the old tuples will
599  * be split across pages.  (Exercise for the reader: figure out why this
600  * fixes the problem even when there is only one old tuple.)
601  */
602 static bool
603 checkAllTheSame(spgPickSplitIn *in, spgPickSplitOut *out, bool tooBig,
604                                 bool *includeNew)
605 {
606         int                     theNode;
607         int                     limit;
608         int                     i;
609
610         /* For the moment, assume we can include the new leaf tuple */
611         *includeNew = true;
612
613         /* If there's only the new leaf tuple, don't select allTheSame mode */
614         if (in->nTuples <= 1)
615                 return false;
616
617         /* If tuple set doesn't fit on one page, ignore the new tuple in test */
618         limit = tooBig ? in->nTuples - 1 : in->nTuples;
619
620         /* Check to see if more than one node is populated */
621         theNode = out->mapTuplesToNodes[0];
622         for (i = 1; i < limit; i++)
623         {
624                 if (out->mapTuplesToNodes[i] != theNode)
625                         return false;
626         }
627
628         /* Nope, so override the picksplit function's decisions */
629
630         /* If the new tuple is in its own node, it can't be included in split */
631         if (tooBig && out->mapTuplesToNodes[in->nTuples - 1] != theNode)
632                 *includeNew = false;
633
634         out->nNodes = 8;                        /* arbitrary number of child nodes */
635
636         /* Random assignment of tuples to nodes (note we include new tuple) */
637         for (i = 0; i < in->nTuples; i++)
638                 out->mapTuplesToNodes[i] = i % out->nNodes;
639
640         /* The opclass may not use node labels, but if it does, duplicate 'em */
641         if (out->nodeLabels)
642         {
643                 Datum   theLabel = out->nodeLabels[theNode];
644
645                 out->nodeLabels = (Datum *) palloc(sizeof(Datum) * out->nNodes);
646                 for (i = 0; i < out->nNodes; i++)
647                         out->nodeLabels[i] = theLabel;
648         }
649
650         /* We don't touch the prefix or the leaf tuple datum assignments */
651
652         return true;
653 }
654
655 /*
656  * current points to a leaf-tuple chain that we wanted to add newLeafTuple to,
657  * but the chain has to be split because there's not enough room to add
658  * newLeafTuple to its page.
659  *
660  * This function splits the leaf tuple set according to picksplit's rules,
661  * creating one or more new chains that are spread across the current page
662  * and an additional leaf page (we assume that two leaf pages will be
663  * sufficient).  A new inner tuple is created, and the parent downlink
664  * pointer is updated to point to that inner tuple instead of the leaf chain.
665  *
666  * On exit, current contains the address of the new inner tuple.
667  *
668  * Returns true if we successfully inserted newLeafTuple during this function,
669  * false if caller still has to do it (meaning another picksplit operation is
670  * probably needed).  Failure could occur if the picksplit result is fairly
671  * unbalanced, or if newLeafTuple is just plain too big to fit on a page.
672  * Because we force the picksplit result to be at least two chains, each
673  * cycle will get rid of at least one leaf tuple from the chain, so the loop
674  * will eventually terminate if lack of balance is the issue.  If the tuple
675  * is too big, we assume that repeated picksplit operations will eventually
676  * make it small enough by repeated prefix-stripping.  A broken opclass could
677  * make this an infinite loop, though.
678  */
679 static bool
680 doPickSplit(Relation index, SpGistState *state,
681                         SPPageDesc *current, SPPageDesc *parent,
682                         SpGistLeafTuple newLeafTuple,
683                         int level, bool isNulls, bool isNew)
684 {
685         bool            insertedNew = false;
686         spgPickSplitIn in;
687         spgPickSplitOut out;
688         FmgrInfo   *procinfo;
689         bool            includeNew;
690         int                     i,
691                                 max,
692                                 n;
693         SpGistInnerTuple innerTuple;
694         SpGistNodeTuple node,
695                            *nodes;
696         Buffer          newInnerBuffer,
697                                 newLeafBuffer;
698         ItemPointerData *heapPtrs;
699         uint8      *leafPageSelect;
700         int                *leafSizes;
701         OffsetNumber *toDelete;
702         OffsetNumber *toInsert;
703         OffsetNumber redirectTuplePos = InvalidOffsetNumber;
704         OffsetNumber startOffsets[2];
705         SpGistLeafTuple *newLeafs;
706         int                     spaceToDelete;
707         int                     currentFreeSpace;
708         int                     totalLeafSizes;
709         bool            allTheSame;
710         XLogRecData rdata[10];
711         int                     nRdata;
712         spgxlogPickSplit xlrec;
713         char       *leafdata,
714                            *leafptr;
715         SPPageDesc      saveCurrent;
716         int                     nToDelete,
717                                 nToInsert,
718                                 maxToInclude;
719
720         in.level = level;
721
722         /*
723          * Allocate per-leaf-tuple work arrays with max possible size
724          */
725         max = PageGetMaxOffsetNumber(current->page);
726         n = max + 1;
727         in.datums = (Datum *) palloc(sizeof(Datum) * n);
728         heapPtrs = (ItemPointerData *) palloc(sizeof(ItemPointerData) * n);
729         toDelete = (OffsetNumber *) palloc(sizeof(OffsetNumber) * n);
730         toInsert = (OffsetNumber *) palloc(sizeof(OffsetNumber) * n);
731         newLeafs = (SpGistLeafTuple *) palloc(sizeof(SpGistLeafTuple) * n);
732         leafPageSelect = (uint8 *) palloc(sizeof(uint8) * n);
733
734         xlrec.node = index->rd_node;
735         STORE_STATE(state, xlrec.stateSrc);
736
737         /*
738          * Form list of leaf tuples which will be distributed as split result;
739          * also, count up the amount of space that will be freed from current.
740          * (Note that in the non-root case, we won't actually delete the old
741          * tuples, only replace them with redirects or placeholders.)
742          *
743          * Note: the SGLTDATUM calls here are safe even when dealing with a nulls
744          * page.  For a pass-by-value data type we will fetch a word that must
745          * exist even though it may contain garbage (because of the fact that leaf
746          * tuples must have size at least SGDTSIZE).  For a pass-by-reference type
747          * we are just computing a pointer that isn't going to get dereferenced.
748          * So it's not worth guarding the calls with isNulls checks.
749          */
750         nToInsert = 0;
751         nToDelete = 0;
752         spaceToDelete = 0;
753         if (SpGistBlockIsRoot(current->blkno))
754         {
755                 /*
756                  * We are splitting the root (which up to now is also a leaf page).
757                  * Its tuples are not linked, so scan sequentially to get them all.
758                  * We ignore the original value of current->offnum.
759                  */
760                 for (i = FirstOffsetNumber; i <= max; i++)
761                 {
762                         SpGistLeafTuple it;
763
764                         it = (SpGistLeafTuple) PageGetItem(current->page,
765                                                                                         PageGetItemId(current->page, i));
766                         if (it->tupstate == SPGIST_LIVE)
767                         {
768                                 in.datums[nToInsert] = SGLTDATUM(it, state);
769                                 heapPtrs[nToInsert] = it->heapPtr;
770                                 nToInsert++;
771                                 toDelete[nToDelete] = i;
772                                 nToDelete++;
773                                 /* we will delete the tuple altogether, so count full space */
774                                 spaceToDelete += it->size + sizeof(ItemIdData);
775                         }
776                         else                            /* tuples on root should be live */
777                                 elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
778                 }
779         }
780         else
781         {
782                 /* Normal case, just collect the leaf tuples in the chain */
783                 i = current->offnum;
784                 while (i != InvalidOffsetNumber)
785                 {
786                         SpGistLeafTuple it;
787
788                         Assert(i >= FirstOffsetNumber && i <= max);
789                         it = (SpGistLeafTuple) PageGetItem(current->page,
790                                                                                         PageGetItemId(current->page, i));
791                         if (it->tupstate == SPGIST_LIVE)
792                         {
793                                 in.datums[nToInsert] = SGLTDATUM(it, state);
794                                 heapPtrs[nToInsert] = it->heapPtr;
795                                 nToInsert++;
796                                 toDelete[nToDelete] = i;
797                                 nToDelete++;
798                                 /* we will not delete the tuple, only replace with dead */
799                                 Assert(it->size >= SGDTSIZE);
800                                 spaceToDelete += it->size - SGDTSIZE;
801                         }
802                         else if (it->tupstate == SPGIST_DEAD)
803                         {
804                                 /* We could see a DEAD tuple as first/only chain item */
805                                 Assert(i == current->offnum);
806                                 Assert(it->nextOffset == InvalidOffsetNumber);
807                                 toDelete[nToDelete] = i;
808                                 nToDelete++;
809                                 /* replacing it with redirect will save no space */
810                         }
811                         else
812                                 elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
813
814                         i = it->nextOffset;
815                 }
816         }
817         in.nTuples = nToInsert;
818
819         /*
820          * We may not actually insert new tuple because another picksplit may be
821          * necessary due to too large value, but we will try to allocate enough
822          * space to include it; and in any case it has to be included in the input
823          * for the picksplit function.  So don't increment nToInsert yet.
824          */
825         in.datums[in.nTuples] = SGLTDATUM(newLeafTuple, state);
826         heapPtrs[in.nTuples] = newLeafTuple->heapPtr;
827         in.nTuples++;
828
829         memset(&out, 0, sizeof(out));
830
831         if (!isNulls)
832         {
833                 /*
834                  * Perform split using user-defined method.
835                  */
836                 procinfo = index_getprocinfo(index, 1, SPGIST_PICKSPLIT_PROC);
837                 FunctionCall2Coll(procinfo,
838                                                   index->rd_indcollation[0],
839                                                   PointerGetDatum(&in),
840                                                   PointerGetDatum(&out));
841
842                 /*
843                  * Form new leaf tuples and count up the total space needed.
844                  */
845                 totalLeafSizes = 0;
846                 for (i = 0; i < in.nTuples; i++)
847                 {
848                         newLeafs[i] = spgFormLeafTuple(state, heapPtrs + i,
849                                                                                    out.leafTupleDatums[i],
850                                                                                    false);
851                         totalLeafSizes += newLeafs[i]->size + sizeof(ItemIdData);
852                 }
853         }
854         else
855         {
856                 /*
857                  * Perform dummy split that puts all tuples into one node.
858                  * checkAllTheSame will override this and force allTheSame mode.
859                  */
860                 out.hasPrefix = false;
861                 out.nNodes = 1;
862                 out.nodeLabels = NULL;
863                 out.mapTuplesToNodes = palloc0(sizeof(int) * in.nTuples);
864
865                 /*
866                  * Form new leaf tuples and count up the total space needed.
867                  */
868                 totalLeafSizes = 0;
869                 for (i = 0; i < in.nTuples; i++)
870                 {
871                         newLeafs[i] = spgFormLeafTuple(state, heapPtrs + i,
872                                                                                    (Datum) 0,
873                                                                                    true);
874                         totalLeafSizes += newLeafs[i]->size + sizeof(ItemIdData);
875                 }
876         }
877
878         /*
879          * Check to see if the picksplit function failed to separate the values,
880          * ie, it put them all into the same child node.  If so, select allTheSame
881          * mode and create a random split instead.  See comments for
882          * checkAllTheSame as to why we need to know if the new leaf tuples could
883          * fit on one page.
884          */
885         allTheSame = checkAllTheSame(&in, &out,
886                                                                  totalLeafSizes > SPGIST_PAGE_CAPACITY,
887                                                                  &includeNew);
888
889         /*
890          * If checkAllTheSame decided we must exclude the new tuple, don't
891          * consider it any further.
892          */
893         if (includeNew)
894                 maxToInclude = in.nTuples;
895         else
896         {
897                 maxToInclude = in.nTuples - 1;
898                 totalLeafSizes -= newLeafs[in.nTuples - 1]->size + sizeof(ItemIdData);
899         }
900
901         /*
902          * Allocate per-node work arrays.  Since checkAllTheSame could replace
903          * out.nNodes with a value larger than the number of tuples on the input
904          * page, we can't allocate these arrays before here.
905          */
906         nodes = (SpGistNodeTuple *) palloc(sizeof(SpGistNodeTuple) * out.nNodes);
907         leafSizes = (int *) palloc0(sizeof(int) * out.nNodes);
908
909         /*
910          * Form nodes of inner tuple and inner tuple itself
911          */
912         for (i = 0; i < out.nNodes; i++)
913         {
914                 Datum           label = (Datum) 0;
915                 bool            labelisnull = (out.nodeLabels == NULL);
916
917                 if (!labelisnull)
918                         label = out.nodeLabels[i];
919                 nodes[i] = spgFormNodeTuple(state, label, labelisnull);
920         }
921         innerTuple = spgFormInnerTuple(state,
922                                                                    out.hasPrefix, out.prefixDatum,
923                                                                    out.nNodes, nodes);
924         innerTuple->allTheSame = allTheSame;
925
926         /*
927          * Update nodes[] array to point into the newly formed innerTuple, so
928          * that we can adjust their downlinks below.
929          */
930         SGITITERATE(innerTuple, i, node)
931         {
932                 nodes[i] = node;
933         }
934
935         /*
936          * Re-scan new leaf tuples and count up the space needed under each node.
937          */
938         for (i = 0; i < maxToInclude; i++)
939         {
940                 n = out.mapTuplesToNodes[i];
941                 if (n < 0 || n >= out.nNodes)
942                         elog(ERROR, "inconsistent result of SPGiST picksplit function");
943                 leafSizes[n] += newLeafs[i]->size + sizeof(ItemIdData);
944         }
945
946         /*
947          * To perform the split, we must insert a new inner tuple, which can't
948          * go on a leaf page; and unless we are splitting the root page, we
949          * must then update the parent tuple's downlink to point to the inner
950          * tuple.  If there is room, we'll put the new inner tuple on the same
951          * page as the parent tuple, otherwise we need another non-leaf buffer.
952          * But if the parent page is the root, we can't add the new inner tuple
953          * there, because the root page must have only one inner tuple.
954          */
955         xlrec.initInner = false;
956         if (parent->buffer != InvalidBuffer &&
957                 !SpGistBlockIsRoot(parent->blkno) &&
958                 (SpGistPageGetFreeSpace(parent->page, 1) >=
959                  innerTuple->size + sizeof(ItemIdData)))
960         {
961                 /* New inner tuple will fit on parent page */
962                 newInnerBuffer = parent->buffer;
963         }
964         else if (parent->buffer != InvalidBuffer)
965         {
966                 /* Send tuple to page with next triple parity (see README) */
967                 newInnerBuffer = SpGistGetBuffer(index,
968                                                                                  GBUF_INNER_PARITY(parent->blkno + 1) |
969                                                                                  (isNulls ? GBUF_NULLS : 0),
970                                                                                  innerTuple->size + sizeof(ItemIdData),
971                                                                                  &xlrec.initInner);
972         }
973         else
974         {
975                 /* Root page split ... inner tuple will go to root page */
976                 newInnerBuffer = InvalidBuffer;
977         }
978
979         /*
980          * Because a WAL record can't involve more than four buffers, we can
981          * only afford to deal with two leaf pages in each picksplit action,
982          * ie the current page and at most one other.
983          *
984          * The new leaf tuples converted from the existing ones should require
985          * the same or less space, and therefore should all fit onto one page
986          * (although that's not necessarily the current page, since we can't
987          * delete the old tuples but only replace them with placeholders).
988          * However, the incoming new tuple might not also fit, in which case
989          * we might need another picksplit cycle to reduce it some more.
990          *
991          * If there's not room to put everything back onto the current page,
992          * then we decide on a per-node basis which tuples go to the new page.
993          * (We do it like that because leaf tuple chains can't cross pages,
994          * so we must place all leaf tuples belonging to the same parent node
995          * on the same page.)
996          *
997          * If we are splitting the root page (turning it from a leaf page into an
998          * inner page), then no leaf tuples can go back to the current page; they
999          * must all go somewhere else.
1000          */
1001         if (!SpGistBlockIsRoot(current->blkno))
1002                 currentFreeSpace = PageGetExactFreeSpace(current->page) + spaceToDelete;
1003         else
1004                 currentFreeSpace = 0;   /* prevent assigning any tuples to current */
1005
1006         xlrec.initDest = false;
1007
1008         if (totalLeafSizes <= currentFreeSpace)
1009         {
1010                 /* All the leaf tuples will fit on current page */
1011                 newLeafBuffer = InvalidBuffer;
1012                 /* mark new leaf tuple as included in insertions, if allowed */
1013                 if (includeNew)
1014                 {
1015                         nToInsert++;
1016                         insertedNew = true;
1017                 }
1018                 for (i = 0; i < nToInsert; i++)
1019                         leafPageSelect[i] = 0;          /* signifies current page */
1020         }
1021         else if (in.nTuples == 1 && totalLeafSizes > SPGIST_PAGE_CAPACITY)
1022         {
1023                 /*
1024                  * We're trying to split up a long value by repeated suffixing, but
1025                  * it's not going to fit yet.  Don't bother allocating a second leaf
1026                  * buffer that we won't be able to use.
1027                  */
1028                 newLeafBuffer = InvalidBuffer;
1029                 Assert(includeNew);
1030                 Assert(nToInsert == 0);
1031         }
1032         else
1033         {
1034                 /* We will need another leaf page */
1035                 uint8      *nodePageSelect;
1036                 int                     curspace;
1037                 int                     newspace;
1038
1039                 newLeafBuffer = SpGistGetBuffer(index,
1040                                                                                 GBUF_LEAF | (isNulls ? GBUF_NULLS : 0),
1041                                                                                 Min(totalLeafSizes,
1042                                                                                         SPGIST_PAGE_CAPACITY),
1043                                                                                 &xlrec.initDest);
1044                 /*
1045                  * Attempt to assign node groups to the two pages.  We might fail to
1046                  * do so, even if totalLeafSizes is less than the available space,
1047                  * because we can't split a group across pages.
1048                  */
1049                 nodePageSelect = (uint8 *) palloc(sizeof(uint8) * out.nNodes);
1050
1051                 curspace = currentFreeSpace;
1052                 newspace = PageGetExactFreeSpace(BufferGetPage(newLeafBuffer));
1053                 for (i = 0; i < out.nNodes; i++)
1054                 {
1055                         if (leafSizes[i] <= curspace)
1056                         {
1057                                 nodePageSelect[i] = 0; /* signifies current page */
1058                                 curspace -= leafSizes[i];
1059                         }
1060                         else
1061                         {
1062                                 nodePageSelect[i] = 1; /* signifies new leaf page */
1063                                 newspace -= leafSizes[i];
1064                         }
1065                 }
1066                 if (curspace >= 0 && newspace >= 0)
1067                 {
1068                         /* Successful assignment, so we can include the new leaf tuple */
1069                         if (includeNew)
1070                         {
1071                                 nToInsert++;
1072                                 insertedNew = true;
1073                         }
1074                 }
1075                 else if (includeNew)
1076                 {
1077                         /* We must exclude the new leaf tuple from the split */
1078                         int             nodeOfNewTuple = out.mapTuplesToNodes[in.nTuples - 1];
1079
1080                         leafSizes[nodeOfNewTuple] -=
1081                                 newLeafs[in.nTuples - 1]->size + sizeof(ItemIdData);
1082
1083                         /* Repeat the node assignment process --- should succeed now */
1084                         curspace = currentFreeSpace;
1085                         newspace = PageGetExactFreeSpace(BufferGetPage(newLeafBuffer));
1086                         for (i = 0; i < out.nNodes; i++)
1087                         {
1088                                 if (leafSizes[i] <= curspace)
1089                                 {
1090                                         nodePageSelect[i] = 0; /* signifies current page */
1091                                         curspace -= leafSizes[i];
1092                                 }
1093                                 else
1094                                 {
1095                                         nodePageSelect[i] = 1; /* signifies new leaf page */
1096                                         newspace -= leafSizes[i];
1097                                 }
1098                         }
1099                         if (curspace < 0 || newspace < 0)
1100                                 elog(ERROR, "failed to divide leaf tuple groups across pages");
1101                 }
1102                 else
1103                 {
1104                         /* oops, we already excluded new tuple ... should not get here */
1105                         elog(ERROR, "failed to divide leaf tuple groups across pages");
1106                 }
1107                 /* Expand the per-node assignments to be shown per leaf tuple */
1108                 for (i = 0; i < nToInsert; i++)
1109                 {
1110                         n = out.mapTuplesToNodes[i];
1111                         leafPageSelect[i] = nodePageSelect[n];
1112                 }
1113         }
1114
1115         /* Start preparing WAL record */
1116         xlrec.blknoSrc = current->blkno;
1117         xlrec.blknoDest = InvalidBlockNumber;
1118         xlrec.nDelete = 0;
1119         xlrec.initSrc = isNew;
1120         xlrec.storesNulls = isNulls;
1121
1122         leafdata = leafptr = (char *) palloc(totalLeafSizes);
1123
1124         ACCEPT_RDATA_DATA(&xlrec, MAXALIGN(sizeof(xlrec)), 0);
1125         ACCEPT_RDATA_DATA(innerTuple, innerTuple->size, 1);
1126         nRdata = 2;
1127
1128         /* Here we begin making the changes to the target pages */
1129         START_CRIT_SECTION();
1130
1131         /*
1132          * Delete old leaf tuples from current buffer, except when we're splitting
1133          * the root; in that case there's no need because we'll re-init the page
1134          * below.  We do this first to make room for reinserting new leaf tuples.
1135          */
1136         if (!SpGistBlockIsRoot(current->blkno))
1137         {
1138                 /*
1139                  * Init buffer instead of deleting individual tuples, but only if
1140                  * there aren't any other live tuples and only during build; otherwise
1141                  * we need to set a redirection tuple for concurrent scans.
1142                  */
1143                 if (state->isBuild &&
1144                         nToDelete + SpGistPageGetOpaque(current->page)->nPlaceholder ==
1145                         PageGetMaxOffsetNumber(current->page))
1146                 {
1147                         SpGistInitBuffer(current->buffer,
1148                                                          SPGIST_LEAF | (isNulls ? SPGIST_NULLS : 0));
1149                         xlrec.initSrc = true;
1150                 }
1151                 else if (isNew)
1152                 {
1153                         /* don't expose the freshly init'd buffer as a backup block */
1154                         Assert(nToDelete == 0);
1155                 }
1156                 else
1157                 {
1158                         xlrec.nDelete = nToDelete;
1159                         ACCEPT_RDATA_DATA(toDelete,
1160                                                           MAXALIGN(sizeof(OffsetNumber) * nToDelete),
1161                                                           nRdata);
1162                         nRdata++;
1163                         ACCEPT_RDATA_BUFFER(current->buffer, nRdata);
1164                         nRdata++;
1165
1166                         if (!state->isBuild)
1167                         {
1168                                 /*
1169                                  * Need to create redirect tuple (it will point to new inner
1170                                  * tuple) but right now the new tuple's location is not known
1171                                  * yet.  So, set the redirection pointer to "impossible" value
1172                                  * and remember its position to update tuple later.
1173                                  */
1174                                 if (nToDelete > 0)
1175                                         redirectTuplePos = toDelete[0];
1176                                 spgPageIndexMultiDelete(state, current->page,
1177                                                                                 toDelete, nToDelete,
1178                                                                                 SPGIST_REDIRECT,
1179                                                                                 SPGIST_PLACEHOLDER,
1180                                                                                 SPGIST_METAPAGE_BLKNO,
1181                                                                                 FirstOffsetNumber);
1182                         }
1183                         else
1184                         {
1185                                 /*
1186                                  * During index build there are no concurrent searches, so we
1187                                  * don't need to create a redirection tuple.
1188                                  */
1189                                 spgPageIndexMultiDelete(state, current->page,
1190                                                                                 toDelete, nToDelete,
1191                                                                                 SPGIST_PLACEHOLDER,
1192                                                                                 SPGIST_PLACEHOLDER,
1193                                                                                 InvalidBlockNumber,
1194                                                                                 InvalidOffsetNumber);
1195                         }
1196                 }
1197         }
1198
1199         /*
1200          * Put leaf tuples on proper pages, and update downlinks in innerTuple's
1201          * nodes.
1202          */
1203         startOffsets[0] = startOffsets[1] = InvalidOffsetNumber;
1204         for (i = 0; i < nToInsert; i++)
1205         {
1206                 SpGistLeafTuple it = newLeafs[i];
1207                 Buffer  leafBuffer;
1208                 BlockNumber leafBlock;
1209                 OffsetNumber newoffset;
1210
1211                 /* Which page is it going to? */
1212                 leafBuffer = leafPageSelect[i] ? newLeafBuffer : current->buffer;
1213                 leafBlock = BufferGetBlockNumber(leafBuffer);
1214
1215                 /* Link tuple into correct chain for its node */
1216                 n = out.mapTuplesToNodes[i];
1217
1218                 if (ItemPointerIsValid(&nodes[n]->t_tid))
1219                 {
1220                         Assert(ItemPointerGetBlockNumber(&nodes[n]->t_tid) == leafBlock);
1221                         it->nextOffset = ItemPointerGetOffsetNumber(&nodes[n]->t_tid);
1222                 }
1223                 else
1224                         it->nextOffset = InvalidOffsetNumber;
1225
1226                 /* Insert it on page */
1227                 newoffset = SpGistPageAddNewItem(state, BufferGetPage(leafBuffer),
1228                                                                                  (Item) it, it->size,
1229                                                                                  &startOffsets[leafPageSelect[i]],
1230                                                                                  false);
1231                 toInsert[i] = newoffset;
1232
1233                 /* ... and complete the chain linking */
1234                 ItemPointerSet(&nodes[n]->t_tid, leafBlock, newoffset);
1235
1236                 /* Also copy leaf tuple into WAL data */
1237                 memcpy(leafptr, newLeafs[i], newLeafs[i]->size);
1238                 leafptr += newLeafs[i]->size;
1239         }
1240
1241         /*
1242          * We're done modifying the other leaf buffer (if any), so mark it dirty.
1243          * current->buffer will be marked below, after we're entirely done
1244          * modifying it.
1245          */
1246         if (newLeafBuffer != InvalidBuffer)
1247         {
1248                 MarkBufferDirty(newLeafBuffer);
1249                 /* also save block number for WAL */
1250                 xlrec.blknoDest = BufferGetBlockNumber(newLeafBuffer);
1251                 if (!xlrec.initDest)
1252                 {
1253                         ACCEPT_RDATA_BUFFER(newLeafBuffer, nRdata);
1254                         nRdata++;
1255                 }
1256         }
1257
1258         xlrec.nInsert = nToInsert;
1259         ACCEPT_RDATA_DATA(toInsert,
1260                                           MAXALIGN(sizeof(OffsetNumber) * nToInsert),
1261                                           nRdata);
1262         nRdata++;
1263         ACCEPT_RDATA_DATA(leafPageSelect,
1264                                           MAXALIGN(sizeof(uint8) * nToInsert),
1265                                           nRdata);
1266         nRdata++;
1267         ACCEPT_RDATA_DATA(leafdata, leafptr - leafdata, nRdata);
1268         nRdata++;
1269
1270         /* Remember current buffer, since we're about to change "current" */
1271         saveCurrent = *current;
1272
1273         /*
1274          * Store the new innerTuple
1275          */
1276         if (newInnerBuffer == parent->buffer && newInnerBuffer != InvalidBuffer)
1277         {
1278                 /*
1279                  * new inner tuple goes to parent page
1280                  */
1281                 Assert(current->buffer != parent->buffer);
1282
1283                 /* Repoint "current" at the new inner tuple */
1284                 current->blkno = parent->blkno;
1285                 current->buffer = parent->buffer;
1286                 current->page = parent->page;
1287                 xlrec.blknoInner = current->blkno;
1288                 xlrec.offnumInner = current->offnum =
1289                         SpGistPageAddNewItem(state, current->page,
1290                                                                  (Item) innerTuple, innerTuple->size,
1291                                                                  NULL, false);
1292
1293                 /*
1294                  * Update parent node link and mark parent page dirty
1295                  */
1296                 xlrec.blknoParent = parent->blkno;
1297                 xlrec.offnumParent = parent->offnum;
1298                 xlrec.nodeI = parent->node;
1299                 saveNodeLink(index, parent, current->blkno, current->offnum);
1300
1301                 ACCEPT_RDATA_BUFFER(parent->buffer, nRdata);
1302                 nRdata++;
1303
1304                 /*
1305                  * Update redirection link (in old current buffer)
1306                  */
1307                 if (redirectTuplePos != InvalidOffsetNumber)
1308                         setRedirectionTuple(&saveCurrent, redirectTuplePos,
1309                                                                 current->blkno, current->offnum);
1310
1311                 /* Done modifying old current buffer, mark it dirty */
1312                 MarkBufferDirty(saveCurrent.buffer);
1313         }
1314         else if (parent->buffer != InvalidBuffer)
1315         {
1316                 /*
1317                  * new inner tuple will be stored on a new page
1318                  */
1319                 Assert(newInnerBuffer != InvalidBuffer);
1320
1321                 /* Repoint "current" at the new inner tuple */
1322                 current->buffer = newInnerBuffer;
1323                 current->blkno = BufferGetBlockNumber(current->buffer);
1324                 current->page = BufferGetPage(current->buffer);
1325                 xlrec.blknoInner = current->blkno;
1326                 xlrec.offnumInner = current->offnum =
1327                         SpGistPageAddNewItem(state, current->page,
1328                                                                  (Item) innerTuple, innerTuple->size,
1329                                                                  NULL, false);
1330
1331                 /* Done modifying new current buffer, mark it dirty */
1332                 MarkBufferDirty(current->buffer);
1333
1334                 /*
1335                  * Update parent node link and mark parent page dirty
1336                  */
1337                 xlrec.blknoParent = parent->blkno;
1338                 xlrec.offnumParent = parent->offnum;
1339                 xlrec.nodeI = parent->node;
1340                 saveNodeLink(index, parent, current->blkno, current->offnum);
1341
1342                 ACCEPT_RDATA_BUFFER(current->buffer, nRdata);
1343                 nRdata++;
1344                 ACCEPT_RDATA_BUFFER(parent->buffer, nRdata);
1345                 nRdata++;
1346
1347                 /*
1348                  * Update redirection link (in old current buffer)
1349                  */
1350                 if (redirectTuplePos != InvalidOffsetNumber)
1351                         setRedirectionTuple(&saveCurrent, redirectTuplePos,
1352                                                                 current->blkno, current->offnum);
1353
1354                 /* Done modifying old current buffer, mark it dirty */
1355                 MarkBufferDirty(saveCurrent.buffer);
1356         }
1357         else
1358         {
1359                 /*
1360                  * Splitting root page, which was a leaf but now becomes inner page
1361                  * (and so "current" continues to point at it)
1362                  */
1363                 Assert(SpGistBlockIsRoot(current->blkno));
1364                 Assert(redirectTuplePos == InvalidOffsetNumber);
1365
1366                 SpGistInitBuffer(current->buffer, (isNulls ? SPGIST_NULLS : 0));
1367                 xlrec.initInner = true;
1368
1369                 xlrec.blknoInner = current->blkno;
1370                 xlrec.offnumInner = current->offnum =
1371                         PageAddItem(current->page, (Item) innerTuple, innerTuple->size,
1372                                                 InvalidOffsetNumber, false, false);
1373                 if (current->offnum != FirstOffsetNumber)
1374                         elog(ERROR, "failed to add item of size %u to SPGiST index page",
1375                                  innerTuple->size);
1376
1377                 /* No parent link to update, nor redirection to do */
1378                 xlrec.blknoParent = InvalidBlockNumber;
1379                 xlrec.offnumParent = InvalidOffsetNumber;
1380                 xlrec.nodeI = 0;
1381
1382                 /* Done modifying new current buffer, mark it dirty */
1383                 MarkBufferDirty(current->buffer);
1384
1385                 /* saveCurrent doesn't represent a different buffer */
1386                 saveCurrent.buffer = InvalidBuffer;
1387         }
1388
1389         if (RelationNeedsWAL(index))
1390         {
1391                 XLogRecPtr      recptr;
1392
1393                 /* Issue the WAL record */
1394                 recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_PICKSPLIT, rdata);
1395
1396                 /* Update page LSNs on all affected pages */
1397                 if (newLeafBuffer != InvalidBuffer)
1398                 {
1399                         Page            page = BufferGetPage(newLeafBuffer);
1400
1401                         PageSetLSN(page, recptr);
1402                         PageSetTLI(page, ThisTimeLineID);
1403                 }
1404
1405                 if (saveCurrent.buffer != InvalidBuffer)
1406                 {
1407                         Page            page = BufferGetPage(saveCurrent.buffer);
1408
1409                         PageSetLSN(page, recptr);
1410                         PageSetTLI(page, ThisTimeLineID);
1411                 }
1412
1413                 PageSetLSN(current->page, recptr);
1414                 PageSetTLI(current->page, ThisTimeLineID);
1415
1416                 if (parent->buffer != InvalidBuffer)
1417                 {
1418                         PageSetLSN(parent->page, recptr);
1419                         PageSetTLI(parent->page, ThisTimeLineID);
1420                 }
1421         }
1422
1423         END_CRIT_SECTION();
1424
1425         /* Update local free-space cache and unlock buffers */
1426         if (newLeafBuffer != InvalidBuffer)
1427         {
1428                 SpGistSetLastUsedPage(index, newLeafBuffer);
1429                 UnlockReleaseBuffer(newLeafBuffer);
1430         }
1431         if (saveCurrent.buffer != InvalidBuffer)
1432         {
1433                 SpGistSetLastUsedPage(index, saveCurrent.buffer);
1434                 UnlockReleaseBuffer(saveCurrent.buffer);
1435         }
1436
1437         return insertedNew;
1438 }
1439
1440 /*
1441  * spgMatchNode action: descend to N'th child node of current inner tuple
1442  */
1443 static void
1444 spgMatchNodeAction(Relation index, SpGistState *state,
1445                                    SpGistInnerTuple innerTuple,
1446                                    SPPageDesc *current, SPPageDesc *parent, int nodeN)
1447 {
1448         int                     i;
1449         SpGistNodeTuple node;
1450
1451         /* Release previous parent buffer if any */
1452         if (parent->buffer != InvalidBuffer &&
1453                 parent->buffer != current->buffer)
1454         {
1455                 SpGistSetLastUsedPage(index, parent->buffer);
1456                 UnlockReleaseBuffer(parent->buffer);
1457         }
1458
1459         /* Repoint parent to specified node of current inner tuple */
1460         parent->blkno = current->blkno;
1461         parent->buffer = current->buffer;
1462         parent->page = current->page;
1463         parent->offnum = current->offnum;
1464         parent->node = nodeN;
1465
1466         /* Locate that node */
1467         SGITITERATE(innerTuple, i, node)
1468         {
1469                 if (i == nodeN)
1470                         break;
1471         }
1472
1473         if (i != nodeN)
1474                 elog(ERROR, "failed to find requested node %d in SPGiST inner tuple",
1475                          nodeN);
1476
1477         /* Point current to the downlink location, if any */
1478         if (ItemPointerIsValid(&node->t_tid))
1479         {
1480                 current->blkno = ItemPointerGetBlockNumber(&node->t_tid);
1481                 current->offnum = ItemPointerGetOffsetNumber(&node->t_tid);
1482         }
1483         else
1484         {
1485                 /* Downlink is empty, so we'll need to find a new page */
1486                 current->blkno = InvalidBlockNumber;
1487                 current->offnum = InvalidOffsetNumber;
1488         }
1489
1490         current->buffer = InvalidBuffer;
1491         current->page = NULL;
1492 }
1493
1494 /*
1495  * spgAddNode action: add a node to the inner tuple at current
      *
      * A new inner tuple is built by addNode() from innerTuple plus the extra
      * node described by nodeLabel and nodeN.  If the page holding the old
      * tuple has room for the size increase, the old tuple is replaced in place
      * at the same offset.  Otherwise the new tuple is stored on a buffer
      * obtained from SpGistGetBuffer, the parent's downlink is repointed at it
      * via saveNodeLink, and the old tuple is overwritten by a placeholder
      * (during index build) or a redirect dead tuple.  In that second case
      * *current is updated to describe the new location, and the old buffer is
      * released unless it is also the new current or the parent buffer.
      *
      * parent is consulted only in the move case.  An XLOG_SPGIST_ADD_NODE
      * record is issued when RelationNeedsWAL(index) is true.
      *
      * Raises ERROR in the move case if current is the root block, or if the
      * buffer returned for the new tuple is the same block as the old one.
1496  */
1497 static void
1498 spgAddNodeAction(Relation index, SpGistState *state,
1499                                  SpGistInnerTuple innerTuple,
1500                                  SPPageDesc *current, SPPageDesc *parent,
1501                                  int nodeN, Datum nodeLabel)
1502 {
1503         SpGistInnerTuple newInnerTuple;
1504         XLogRecData rdata[5];           /* xlrec, new tuple, and up to 3 buffers */
1505         spgxlogAddNode xlrec;
1506
1507         /* Should not be applied to nulls */
1508         Assert(!SpGistPageStoresNulls(current->page));
1509
1510         /* Construct new inner tuple with additional node */
1511         newInnerTuple = addNode(state, innerTuple, nodeLabel, nodeN);
1512
1513         /* Prepare WAL record */
1514         xlrec.node = index->rd_node;
1515         STORE_STATE(state, xlrec.stateSrc);
1516         xlrec.blkno = current->blkno;
1517         xlrec.offnum = current->offnum;
1518
1519         /* we don't fill these unless we need to change the parent downlink */
1520         xlrec.blknoParent = InvalidBlockNumber;
1521         xlrec.offnumParent = InvalidOffsetNumber;
1522         xlrec.nodeI = 0;
1523
1524         /* we don't fill these unless tuple has to be moved */
1525         xlrec.blknoNew = InvalidBlockNumber;
1526         xlrec.offnumNew = InvalidOffsetNumber;
1527         xlrec.newPage = false;
1528
1529         ACCEPT_RDATA_DATA(&xlrec, sizeof(xlrec), 0);
1530         /* we assume sizeof(xlrec) is at least int-aligned */
1531         ACCEPT_RDATA_DATA(newInnerTuple, newInnerTuple->size, 1);
1532         ACCEPT_RDATA_BUFFER(current->buffer, 2);
1533
             /*
              * Compare free space against the size increase only, since the old
              * tuple is deleted from the page before the new version is added.
              */
1534         if (PageGetExactFreeSpace(current->page) >=
1535                 newInnerTuple->size - innerTuple->size)
1536         {
1537                 /*
1538                  * We can replace the inner tuple by new version in-place
1539                  */
1540                 START_CRIT_SECTION();
1541
                     /*
                      * The new version must land on the same offset as the old one,
                      * so that existing links to this tuple stay valid.
                      *
                      * NOTE(review): the elog(ERROR) below sits between
                      * START_CRIT_SECTION and END_CRIT_SECTION; presumably an error
                      * raised there is escalated rather than handled as an ordinary
                      * ERROR -- confirm that this is intended.
                      */
1542                 PageIndexTupleDelete(current->page, current->offnum);
1543                 if (PageAddItem(current->page,
1544                                                 (Item) newInnerTuple, newInnerTuple->size,
1545                                                 current->offnum, false, false) != current->offnum)
1546                         elog(ERROR, "failed to add item of size %u to SPGiST index page",
1547                                  newInnerTuple->size);
1548
1549                 MarkBufferDirty(current->buffer);
1550
1551                 if (RelationNeedsWAL(index))
1552                 {
1553                         XLogRecPtr      recptr;
1554
1555                         recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_NODE, rdata);
1556
1557                         PageSetLSN(current->page, recptr);
1558                         PageSetTLI(current->page, ThisTimeLineID);
1559                 }
1560
1561                 END_CRIT_SECTION();
1562         }
1563         else
1564         {
1565                 /*
1566                  * move inner tuple to another page, and update parent
1567                  */
1568                 SpGistDeadTuple dt;
1569                 SPPageDesc      saveCurrent;
1570
1571                 /*
1572                  * It should not be possible to get here for the root page, since we
1573                  * allow only one inner tuple on the root page, and spgFormInnerTuple
1574                  * always checks that inner tuples don't exceed the size of a page.
1575                  */
1576                 if (SpGistBlockIsRoot(current->blkno))
1577                         elog(ERROR, "cannot enlarge root tuple any more");
1578                 Assert(parent->buffer != InvalidBuffer);
1579
                     /* remember the old location; current is repointed below */
1580                 saveCurrent = *current;
1581
1582                 xlrec.blknoParent = parent->blkno;
1583                 xlrec.offnumParent = parent->offnum;
1584                 xlrec.nodeI = parent->node;
1585
1586                 /*
1587                  * obtain new buffer with the same parity as current, since it will
1588                  * be a child of same parent tuple
1589                  */
1590                 current->buffer = SpGistGetBuffer(index,
1591                                                                                   GBUF_INNER_PARITY(current->blkno),
1592                                                                                   newInnerTuple->size + sizeof(ItemIdData),
1593                                                                                   &xlrec.newPage);
1594                 current->blkno = BufferGetBlockNumber(current->buffer);
1595                 current->page = BufferGetPage(current->buffer);
1596
1597                 xlrec.blknoNew = current->blkno;
1598
1599                 /*
1600                  * Let's just make real sure new current isn't same as old.  Right
1601                  * now that's impossible, but if SpGistGetBuffer ever got smart enough
1602                  * to delete placeholder tuples before checking space, maybe it
1603                  * wouldn't be impossible.  The case would appear to work except that
1604                  * WAL replay would be subtly wrong, so I think a mere assert isn't
1605                  * enough here.
1606                  */
1607                  if (xlrec.blknoNew == xlrec.blkno)
1608                          elog(ERROR, "SPGiST new buffer shouldn't be same as old buffer");
1609
1610                 /*
1611                  * New current and parent buffer will both be modified; but note that
1612                  * parent buffer could be same as either new or old current.
                      * In that case it is already registered in rdata slot 2 or 3, and
                      * slot 4 is left unused.
1613                  */
1614                 ACCEPT_RDATA_BUFFER(current->buffer, 3);
1615                 if (parent->buffer != current->buffer &&
1616                         parent->buffer != saveCurrent.buffer)
1617                         ACCEPT_RDATA_BUFFER(parent->buffer, 4);
1618
1619                 START_CRIT_SECTION();
1620
1621                 /* insert new ... */
1622                 xlrec.offnumNew = current->offnum =
1623                         SpGistPageAddNewItem(state, current->page,
1624                                                                  (Item) newInnerTuple, newInnerTuple->size,
1625                                                                  NULL, false);
1626
1627                 MarkBufferDirty(current->buffer);
1628
1629                 /* update parent's downlink and mark parent page dirty */
1630                 saveNodeLink(index, parent, current->blkno, current->offnum);
1631
1632                 /*
1633                  * Replace old tuple with a placeholder or redirection tuple.  Unless
1634                  * doing an index build, we have to insert a redirection tuple for
1635                  * possible concurrent scans.  We can't just delete it in any case,
1636                  * because that could change the offsets of other tuples on the page,
1637                  * breaking downlinks from their parents.
1638                  */
1639                 if (state->isBuild)
1640                         dt = spgFormDeadTuple(state, SPGIST_PLACEHOLDER,
1641                                                                   InvalidBlockNumber, InvalidOffsetNumber);
1642                 else
1643                         dt = spgFormDeadTuple(state, SPGIST_REDIRECT,
1644                                                                   current->blkno, current->offnum);
1645
                     /*
                      * Swap the dead tuple in at the old tuple's offset.
                      *
                      * NOTE(review): as in the in-place branch, this elog(ERROR) is
                      * inside the critical section -- confirm that is intended.
                      */
1646                 PageIndexTupleDelete(saveCurrent.page, saveCurrent.offnum);
1647                 if (PageAddItem(saveCurrent.page, (Item) dt, dt->size,
1648                                                 saveCurrent.offnum,
1649                                                 false, false) != saveCurrent.offnum)
1650                         elog(ERROR, "failed to add item of size %u to SPGiST index page",
1651                                  dt->size);
1652
                     /* keep the old page's counter for this kind of dead tuple in step */
1653                 if (state->isBuild)
1654                         SpGistPageGetOpaque(saveCurrent.page)->nPlaceholder++;
1655                 else
1656                         SpGistPageGetOpaque(saveCurrent.page)->nRedirection++;
1657
1658                 MarkBufferDirty(saveCurrent.buffer);
1659
1660                 if (RelationNeedsWAL(index))
1661                 {
1662                         XLogRecPtr      recptr;
1663
1664                         recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_NODE, rdata);
1665
1666                         /* we don't bother to check if any of these are redundant */
1667                         PageSetLSN(current->page, recptr);
1668                         PageSetTLI(current->page, ThisTimeLineID);
1669                         PageSetLSN(parent->page, recptr);
1670                         PageSetTLI(parent->page, ThisTimeLineID);
1671                         PageSetLSN(saveCurrent.page, recptr);
1672                         PageSetTLI(saveCurrent.page, ThisTimeLineID);
1673                 }
1674
1675                 END_CRIT_SECTION();
1676
1677                 /* Release saveCurrent if it's not same as current or parent */
1678                 if (saveCurrent.buffer != current->buffer &&
1679                         saveCurrent.buffer != parent->buffer)
1680                 {
1681                         SpGistSetLastUsedPage(index, saveCurrent.buffer);
1682                         UnlockReleaseBuffer(saveCurrent.buffer);
1683                 }
1684         }
1685 }
1686
1687 /*
1688  * spgSplitNode action: split inner tuple at current into prefix and postfix
      *
      * The tuple at current is replaced, at the same offset, by a prefix tuple
      * holding a single node; that node's downlink is set to a new postfix
      * tuple which carries all of innerTuple's nodes.  The node label and both
      * prefixes are taken from out->result.splitTuple.  The postfix tuple goes
      * on the same page when current is not the root block and there is room;
      * otherwise it goes on a buffer obtained from SpGistGetBuffer, which is
      * released again before returning.  The current descriptor itself is not
      * changed, so on return it describes the prefix tuple.
      *
      * Raises ERROR if the prefix tuple would be larger than innerTuple.  An
      * XLOG_SPGIST_SPLIT_TUPLE record is issued when RelationNeedsWAL(index) is
      * true.
1689  */
1690 static void
1691 spgSplitNodeAction(Relation index, SpGistState *state,
1692                                    SpGistInnerTuple innerTuple,
1693                                    SPPageDesc *current, spgChooseOut *out)
1694 {
1695         SpGistInnerTuple prefixTuple,
1696                                 postfixTuple;
1697         SpGistNodeTuple node,
1698                            *nodes;
1699         BlockNumber postfixBlkno;
1700         OffsetNumber postfixOffset;
1701         int                     i;
1702         XLogRecData rdata[5];           /* xlrec, two tuples, and up to 2 buffers */
1703         spgxlogSplitTuple xlrec;
1704         Buffer          newBuffer = InvalidBuffer;      /* set only if postfix needs its own page */
1705
1706         /* Should not be applied to nulls */
1707         Assert(!SpGistPageStoresNulls(current->page));
1708
1709         /*
1710          * Construct new prefix tuple, containing a single node with the
1711          * specified label.  (We'll update the node's downlink to point to the
1712          * new postfix tuple, below.)
1713          */
1714         node = spgFormNodeTuple(state, out->result.splitTuple.nodeLabel, false);
1715
1716         prefixTuple = spgFormInnerTuple(state,
1717                                                                         out->result.splitTuple.prefixHasPrefix,
1718                                                                         out->result.splitTuple.prefixPrefixDatum,
1719                                                                         1, &node);
1720
1721         /* it must fit in the space that innerTuple now occupies */
1722         if (prefixTuple->size > innerTuple->size)
1723                 elog(ERROR, "SPGiST inner-tuple split must not produce longer prefix");
1724
1725         /*
1726          * Construct new postfix tuple, containing all nodes of innerTuple with
1727          * same node datums, but with the prefix specified in
1728          * out->result.splitTuple (a spgChooseOut, not a picksplit result).
1729          */
1730         nodes = palloc(sizeof(SpGistNodeTuple) * innerTuple->nNodes);
             /* collect innerTuple's existing node tuples, in order */
1731         SGITITERATE(innerTuple, i, node)
1732         {
1733                 nodes[i] = node;
1734         }
1735
1736         postfixTuple = spgFormInnerTuple(state,
1737                                                                          out->result.splitTuple.postfixHasPrefix,
1738                                                                    out->result.splitTuple.postfixPrefixDatum,
1739                                                                          innerTuple->nNodes, nodes);
1740
1741         /* Postfix tuple is allTheSame if original tuple was */
1742         postfixTuple->allTheSame = innerTuple->allTheSame;
1743
1744         /* prep data for WAL record */
1745         xlrec.node = index->rd_node;
1746         xlrec.newPage = false;
1747
1748         ACCEPT_RDATA_DATA(&xlrec, sizeof(xlrec), 0);
1749         /* we assume sizeof(xlrec) is at least int-aligned */
1750         ACCEPT_RDATA_DATA(prefixTuple, prefixTuple->size, 1);
1751         ACCEPT_RDATA_DATA(postfixTuple, postfixTuple->size, 2);
1752         ACCEPT_RDATA_BUFFER(current->buffer, 3);
1753
1754         /*
1755          * If we can't fit both tuples on the current page, get a new page for the
1756          * postfix tuple.  In particular, can't split to the root page.
1757          *
1758          * For the space calculation, note that prefixTuple replaces innerTuple
1759          * but postfixTuple will be a new entry.
1760          */
1761         if (SpGistBlockIsRoot(current->blkno) ||
1762                 SpGistPageGetFreeSpace(current->page, 1) + innerTuple->size <
1763                 prefixTuple->size + postfixTuple->size + sizeof(ItemIdData))
1764         {
1765                 /*
1766                  * Choose page with next triple parity, because postfix tuple is a
1767                  * child of prefix one
1768                  */
1769                 newBuffer = SpGistGetBuffer(index,
1770                                                                         GBUF_INNER_PARITY(current->blkno + 1),
1771                                                                         postfixTuple->size + sizeof(ItemIdData),
1772                                                                         &xlrec.newPage);
1773                 ACCEPT_RDATA_BUFFER(newBuffer, 4);
1774         }
1775
1776         START_CRIT_SECTION();
1777
1778         /*
1779          * Replace old tuple by prefix tuple
              *
              * NOTE(review): the elog(ERROR) below sits between START_CRIT_SECTION
              * and END_CRIT_SECTION; presumably an error raised there is escalated
              * rather than handled as an ordinary ERROR -- confirm that this is
              * intended.
1780          */
1781         PageIndexTupleDelete(current->page, current->offnum);
1782         xlrec.offnumPrefix = PageAddItem(current->page,
1783                                                                          (Item) prefixTuple, prefixTuple->size,
1784                                                                          current->offnum, false, false);
1785         if (xlrec.offnumPrefix != current->offnum)
1786                 elog(ERROR, "failed to add item of size %u to SPGiST index page",
1787                          prefixTuple->size);
1788         xlrec.blknoPrefix = current->blkno;
1789
1790         /*
1791          * put postfix tuple into appropriate page
1792          */
1793         if (newBuffer == InvalidBuffer)
1794         {
                     /* same page as the prefix; it is marked dirty further down */
1795                 xlrec.blknoPostfix = postfixBlkno = current->blkno;
1796                 xlrec.offnumPostfix = postfixOffset =
1797                         SpGistPageAddNewItem(state, current->page,
1798                                                                  (Item) postfixTuple, postfixTuple->size,
1799                                                                  NULL, false);
1800         }
1801         else
1802         {
1803                 xlrec.blknoPostfix = postfixBlkno = BufferGetBlockNumber(newBuffer);
1804                 xlrec.offnumPostfix = postfixOffset =
1805                         SpGistPageAddNewItem(state, BufferGetPage(newBuffer),
1806                                                                  (Item) postfixTuple, postfixTuple->size,
1807                                                                  NULL, false);
1808                 MarkBufferDirty(newBuffer);
1809         }
1810
1811         /*
1812          * And set downlink pointer in the prefix tuple to point to postfix tuple.
1813          * (We can't avoid this step by doing the above two steps in opposite
1814          * order, because there might not be enough space on the page to insert
1815          * the postfix tuple first.)  We have to update the local copy of the
1816          * prefixTuple too, because that's what will be written to WAL.
              *
              * The first call below fixes the local copy; prefixTuple is then
              * repointed at the tuple stored on the page and the link is set there
              * as well.
1817          */
1818         spgUpdateNodeLink(prefixTuple, 0, postfixBlkno, postfixOffset);
1819         prefixTuple = (SpGistInnerTuple) PageGetItem(current->page,
1820                                                           PageGetItemId(current->page, current->offnum));
1821         spgUpdateNodeLink(prefixTuple, 0, postfixBlkno, postfixOffset);
1822
1823         MarkBufferDirty(current->buffer);
1824
1825         if (RelationNeedsWAL(index))
1826         {
1827                 XLogRecPtr      recptr;
1828
1829                 recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_SPLIT_TUPLE, rdata);
1830
1831                 PageSetLSN(current->page, recptr);
1832                 PageSetTLI(current->page, ThisTimeLineID);
1833
1834                 if (newBuffer != InvalidBuffer)
1835                 {
1836                         PageSetLSN(BufferGetPage(newBuffer), recptr);
1837                         PageSetTLI(BufferGetPage(newBuffer), ThisTimeLineID);
1838                 }
1839         }
1840
1841         END_CRIT_SECTION();
1842
1843         /* Update local free-space cache and release buffer */
1844         if (newBuffer != InvalidBuffer)
1845         {
1846                 SpGistSetLastUsedPage(index, newBuffer);
1847                 UnlockReleaseBuffer(newBuffer);
1848         }
1849 }
1850
1851 /*
1852  * Insert one item into the index
1853  */
1854 void
1855 spgdoinsert(Relation index, SpGistState *state,
1856                         ItemPointer heapPtr, Datum datum, bool isnull)
1857 {
1858         int                     level = 0;
1859         Datum           leafDatum;
1860         int                     leafSize;
1861         SPPageDesc      current,
1862                                 parent;
1863
1864         /*
1865          * Since we don't use index_form_tuple in this AM, we have to make sure
1866          * value to be inserted is not toasted; FormIndexDatum doesn't guarantee
1867          * that.
1868          */
1869         if (!isnull && state->attType.attlen == -1)
1870                 datum = PointerGetDatum(PG_DETOAST_DATUM(datum));
1871
1872         leafDatum = datum;
1873
1874         /*
1875          * Compute space needed for a leaf tuple containing the given datum.
1876          *
1877          * If it isn't gonna fit, and the opclass can't reduce the datum size by
1878          * suffixing, bail out now rather than getting into an endless loop.
1879          */
1880         if (!isnull)
1881                 leafSize = SGLTHDRSZ + sizeof(ItemIdData) +
1882                         SpGistGetTypeSize(&state->attType, leafDatum);
1883         else
1884                 leafSize = SGDTSIZE + sizeof(ItemIdData);
1885
1886         if (leafSize > SPGIST_PAGE_CAPACITY && !state->config.longValuesOK)
1887                 ereport(ERROR,
1888                                 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
1889                         errmsg("index row size %lu exceeds maximum %lu for index \"%s\"",
1890                                    (unsigned long) (leafSize - sizeof(ItemIdData)),
1891                                    (unsigned long) (SPGIST_PAGE_CAPACITY - sizeof(ItemIdData)),
1892                                    RelationGetRelationName(index)),
1893                   errhint("Values larger than a buffer page cannot be indexed.")));
1894
1895         /* Initialize "current" to the appropriate root page */
1896         current.blkno = isnull ? SPGIST_NULL_BLKNO : SPGIST_ROOT_BLKNO;
1897         current.buffer = InvalidBuffer;
1898         current.page = NULL;
1899         current.offnum = FirstOffsetNumber;
1900         current.node = -1;
1901
1902         /* "parent" is invalid for the moment */
1903         parent.blkno = InvalidBlockNumber;
1904         parent.buffer = InvalidBuffer;
1905         parent.page = NULL;
1906         parent.offnum = InvalidOffsetNumber;
1907         parent.node = -1;
1908
1909         for (;;)
1910         {
1911                 bool            isNew = false;
1912
1913                 /*
1914                  * Bail out if query cancel is pending.  We must have this somewhere
1915                  * in the loop since a broken opclass could produce an infinite
1916                  * picksplit loop.
1917                  */
1918                 CHECK_FOR_INTERRUPTS();
1919
1920                 if (current.blkno == InvalidBlockNumber)
1921                 {
1922                         /*
1923                          * Create a leaf page.  If leafSize is too large to fit on a page,
1924                          * we won't actually use the page yet, but it simplifies the API
1925                          * for doPickSplit to always have a leaf page at hand; so just
1926                          * quietly limit our request to a page size.
1927                          */
1928                         current.buffer =
1929                                 SpGistGetBuffer(index,
1930                                                                 GBUF_LEAF | (isnull ? GBUF_NULLS : 0),
1931                                                                 Min(leafSize, SPGIST_PAGE_CAPACITY),
1932                                                                 &isNew);
1933                         current.blkno = BufferGetBlockNumber(current.buffer);
1934                 }
1935                 else if (parent.buffer == InvalidBuffer ||
1936                                  current.blkno != parent.blkno)
1937                 {
1938                         current.buffer = ReadBuffer(index, current.blkno);
1939                         LockBuffer(current.buffer, BUFFER_LOCK_EXCLUSIVE);
1940                 }
1941                 else
1942                 {
1943                         /* inner tuple can be stored on the same page as parent one */
1944                         current.buffer = parent.buffer;
1945                 }
1946                 current.page = BufferGetPage(current.buffer);
1947
1948                 /* should not arrive at a page of the wrong type */
1949                 if (isnull ? !SpGistPageStoresNulls(current.page) :
1950                         SpGistPageStoresNulls(current.page))
1951                         elog(ERROR, "SPGiST index page %u has wrong nulls flag",
1952                                  current.blkno);
1953
1954                 if (SpGistPageIsLeaf(current.page))
1955                 {
1956                         SpGistLeafTuple leafTuple;
1957                         int                     nToSplit,
1958                                                 sizeToSplit;
1959
1960                         leafTuple = spgFormLeafTuple(state, heapPtr, leafDatum, isnull);
1961                         if (leafTuple->size + sizeof(ItemIdData) <=
1962                                 SpGistPageGetFreeSpace(current.page, 1))
1963                         {
1964                                 /* it fits on page, so insert it and we're done */
1965                                 addLeafTuple(index, state, leafTuple,
1966                                                          &current, &parent, isnull, isNew);
1967                                 break;
1968                         }
1969                         else if ((sizeToSplit =
1970                                           checkSplitConditions(index, state, &current,
1971                                                                                    &nToSplit)) < SPGIST_PAGE_CAPACITY / 2 &&
1972                                          nToSplit < 64 &&
1973                                          leafTuple->size + sizeof(ItemIdData) + sizeToSplit <= SPGIST_PAGE_CAPACITY)
1974                         {
1975                                 /*
1976                                  * the amount of data is pretty small, so just move the whole
1977                                  * chain to another leaf page rather than splitting it.
1978                                  */
1979                                 Assert(!isNew);
1980                                 moveLeafs(index, state, &current, &parent, leafTuple, isnull);
1981                                 break;                  /* we're done */
1982                         }
1983                         else
1984                         {
1985                                 /* picksplit */
1986                                 if (doPickSplit(index, state, &current, &parent,
1987                                                                 leafTuple, level, isnull, isNew))
1988                                         break;          /* doPickSplit installed new tuples */
1989
1990                                 /* leaf tuple will not be inserted yet */
1991                                 pfree(leafTuple);
1992
1993                                 /*
1994                                  * current now describes new inner tuple, go insert into it
1995                                  */
1996                                 Assert(!SpGistPageIsLeaf(current.page));
1997                                 goto process_inner_tuple;
1998                         }
1999                 }
2000                 else    /* non-leaf page */
2001                 {
2002                         /*
2003                          * Apply the opclass choose function to figure out how to insert
2004                          * the given datum into the current inner tuple.
2005                          */
2006                         SpGistInnerTuple innerTuple;
2007                         spgChooseIn in;
2008                         spgChooseOut out;
2009                         FmgrInfo   *procinfo;
2010
2011                         /*
2012                          * spgAddNode and spgSplitTuple cases will loop back to here to
2013                          * complete the insertion operation.  Just in case the choose
2014                          * function is broken and produces add or split requests
2015                          * repeatedly, check for query cancel.
2016                          */
2017         process_inner_tuple:
2018                         CHECK_FOR_INTERRUPTS();
2019
2020                         innerTuple = (SpGistInnerTuple) PageGetItem(current.page,
2021                                                                 PageGetItemId(current.page, current.offnum));
2022
2023                         in.datum = datum;
2024                         in.leafDatum = leafDatum;
2025                         in.level = level;
2026                         in.allTheSame = innerTuple->allTheSame;
2027                         in.hasPrefix = (innerTuple->prefixSize > 0);
2028                         in.prefixDatum = SGITDATUM(innerTuple, state);
2029                         in.nNodes = innerTuple->nNodes;
2030                         in.nodeLabels = spgExtractNodeLabels(state, innerTuple);
2031
2032                         memset(&out, 0, sizeof(out));
2033
2034                         if (!isnull)
2035                         {
2036                                 /* use user-defined choose method */
2037                                 procinfo = index_getprocinfo(index, 1, SPGIST_CHOOSE_PROC);
2038                                 FunctionCall2Coll(procinfo,
2039                                                                   index->rd_indcollation[0],
2040                                                                   PointerGetDatum(&in),
2041                                                                   PointerGetDatum(&out));
2042                         }
2043                         else
2044                         {
2045                                 /* force "match" action (to insert to random subnode) */
2046                                 out.resultType = spgMatchNode;
2047                         }
2048
2049                         if (innerTuple->allTheSame)
2050                         {
2051                                 /*
2052                                  * It's not allowed to do an AddNode at an allTheSame tuple.
2053                                  * Opclass must say "match", in which case we choose a random
2054                                  * one of the nodes to descend into, or "split".
2055                                  */
2056                                 if (out.resultType == spgAddNode)
2057                                         elog(ERROR, "cannot add a node to an allTheSame inner tuple");
2058                                 else if (out.resultType == spgMatchNode)
2059                                         out.result.matchNode.nodeN = random() % innerTuple->nNodes;
2060                         }
2061
2062                         switch (out.resultType)
2063                         {
2064                                 case spgMatchNode:
2065                                         /* Descend to N'th child node */
2066                                         spgMatchNodeAction(index, state, innerTuple,
2067                                                                            &current, &parent,
2068                                                                            out.result.matchNode.nodeN);
2069                                         /* Adjust level as per opclass request */
2070                                         level += out.result.matchNode.levelAdd;
2071                                         /* Replace leafDatum and recompute leafSize */
2072                                         if (!isnull)
2073                                         {
2074                                                 leafDatum = out.result.matchNode.restDatum;
2075                                                 leafSize = SGLTHDRSZ + sizeof(ItemIdData) +
2076                                                         SpGistGetTypeSize(&state->attType, leafDatum);
2077                                         }
2078
2079                                         /*
2080                                          * Loop around and attempt to insert the new leafDatum
2081                                          * at "current" (which might reference an existing child
2082                                          * tuple, or might be invalid to force us to find a new
2083                                          * page for the tuple).
2084                                          *
2085                                          * Note: if the opclass sets longValuesOK, we rely on the
2086                                          * choose function to eventually shorten the leafDatum
2087                                          * enough to fit on a page.  We could add a test here to
2088                                          * complain if the datum doesn't get visibly shorter each
2089                                          * time, but that could get in the way of opclasses that
2090                                          * "simplify" datums in a way that doesn't necessarily
2091                                          * lead to physical shortening on every cycle.
2092                                          */
2093                                         break;
2094                                 case spgAddNode:
2095                                         /* AddNode is not sensible if nodes don't have labels */
2096                                         if (in.nodeLabels == NULL)
2097                                                 elog(ERROR, "cannot add a node to an inner tuple without node labels");
2098                                         /* Add node to inner tuple, per request */
2099                                         spgAddNodeAction(index, state, innerTuple,
2100                                                                          &current, &parent,
2101                                                                          out.result.addNode.nodeN,
2102                                                                          out.result.addNode.nodeLabel);
2103
2104                                         /*
2105                                          * Retry insertion into the enlarged node.  We assume
2106                                          * that we'll get a MatchNode result this time.
2107                                          */
2108                                         goto process_inner_tuple;
2109                                         break;
2110                                 case spgSplitTuple:
2111                                         /* Split inner tuple, per request */
2112                                         spgSplitNodeAction(index, state, innerTuple,
2113                                                                            &current, &out);
2114
2115                                         /* Retry insertion into the split node */
2116                                         goto process_inner_tuple;
2117                                         break;
2118                                 default:
2119                                         elog(ERROR, "unrecognized SPGiST choose result: %d",
2120                                                  (int) out.resultType);
2121                                         break;
2122                         }
2123                 }
2124         }                                                       /* end loop */
2125
2126         /*
2127          * Release any buffers we're still holding.  Beware of possibility that
2128          * current and parent reference same buffer.
2129          */
2130         if (current.buffer != InvalidBuffer)
2131         {
2132                 SpGistSetLastUsedPage(index, current.buffer);
2133                 UnlockReleaseBuffer(current.buffer);
2134         }
2135         if (parent.buffer != InvalidBuffer &&
2136                 parent.buffer != current.buffer)
2137         {
2138                 SpGistSetLastUsedPage(index, parent.buffer);
2139                 UnlockReleaseBuffer(parent.buffer);
2140         }
2141 }