]> granicus.if.org Git - postgresql/blob - src/backend/access/spgist/spgdoinsert.c
Optimize SP-GiST insertions.
[postgresql] / src / backend / access / spgist / spgdoinsert.c
1 /*-------------------------------------------------------------------------
2  *
3  * spgdoinsert.c
4  *        implementation of insert algorithm
5  *
6  *
7  * Portions Copyright (c) 1996-2012, PostgreSQL Global Development Group
8  * Portions Copyright (c) 1994, Regents of the University of California
9  *
10  * IDENTIFICATION
11  *                      src/backend/access/spgist/spgdoinsert.c
12  *
13  *-------------------------------------------------------------------------
14  */
15
16 #include "postgres.h"
17
18 #include "access/genam.h"
19 #include "access/spgist_private.h"
20 #include "miscadmin.h"
21 #include "storage/bufmgr.h"
22
23
24 /*
25  * SPPageDesc tracks all info about a page we are inserting into.  In some
26  * situations it actually identifies a tuple, or even a specific node within
27  * an inner tuple.      But any of the fields can be invalid.  If the buffer
28  * field is valid, it implies we hold pin and exclusive lock on that buffer.
29  * page pointer should be valid exactly when buffer is.
30  */
31 typedef struct SPPageDesc
32 {
33         BlockNumber blkno;                      /* block number, or InvalidBlockNumber */
34         Buffer          buffer;                 /* page's buffer number, or InvalidBuffer */
35         Page            page;                   /* pointer to page buffer, or NULL */
36         OffsetNumber offnum;            /* offset of tuple, or InvalidOffsetNumber */
37         int                     node;                   /* node number within inner tuple, or -1 */
38 } SPPageDesc;
39
40
41 /*
42  * Set the item pointer in the nodeN'th entry in inner tuple tup.  This
43  * is used to update the parent inner tuple's downlink after a move or
44  * split operation.
45  */
46 void
47 spgUpdateNodeLink(SpGistInnerTuple tup, int nodeN,
48                                   BlockNumber blkno, OffsetNumber offset)
49 {
50         int                     i;
51         SpGistNodeTuple node;
52
53         SGITITERATE(tup, i, node)
54         {
55                 if (i == nodeN)
56                 {
57                         ItemPointerSet(&node->t_tid, blkno, offset);
58                         return;
59                 }
60         }
61
62         elog(ERROR, "failed to find requested node %d in SPGiST inner tuple",
63                  nodeN);
64 }
65
66 /*
67  * Form a new inner tuple containing one more node than the given one, with
68  * the specified label datum, inserted at offset "offset" in the node array.
69  * The new tuple's prefix is the same as the old one's.
70  *
71  * Note that the new node initially has an invalid downlink.  We'll find a
72  * page to point it to later.
73  */
74 static SpGistInnerTuple
75 addNode(SpGistState *state, SpGistInnerTuple tuple, Datum label, int offset)
76 {
77         SpGistNodeTuple node,
78                            *nodes;
79         int                     i;
80
81         /* if offset is negative, insert at end */
82         if (offset < 0)
83                 offset = tuple->nNodes;
84         else if (offset > tuple->nNodes)
85                 elog(ERROR, "invalid offset for adding node to SPGiST inner tuple");
86
87         nodes = palloc(sizeof(SpGistNodeTuple) * (tuple->nNodes + 1));
88         SGITITERATE(tuple, i, node)
89         {
90                 if (i < offset)
91                         nodes[i] = node;
92                 else
93                         nodes[i + 1] = node;
94         }
95
96         nodes[offset] = spgFormNodeTuple(state, label, false);
97
98         return spgFormInnerTuple(state,
99                                                          (tuple->prefixSize > 0),
100                                                          SGITDATUM(tuple, state),
101                                                          tuple->nNodes + 1,
102                                                          nodes);
103 }
104
105 /* qsort comparator for sorting OffsetNumbers */
106 static int
107 cmpOffsetNumbers(const void *a, const void *b)
108 {
109         if (*(const OffsetNumber *) a == *(const OffsetNumber *) b)
110                 return 0;
111         return (*(const OffsetNumber *) a > *(const OffsetNumber *) b) ? 1 : -1;
112 }
113
114 /*
115  * Delete multiple tuples from an index page, preserving tuple offset numbers.
116  *
117  * The first tuple in the given list is replaced with a dead tuple of type
118  * "firststate" (REDIRECT/DEAD/PLACEHOLDER); the remaining tuples are replaced
119  * with dead tuples of type "reststate".  If either firststate or reststate
120  * is REDIRECT, blkno/offnum specify where to link to.
121  *
122  * NB: this is used during WAL replay, so beware of trying to make it too
123  * smart.  In particular, it shouldn't use "state" except for calling
124  * spgFormDeadTuple().
125  */
126 void
127 spgPageIndexMultiDelete(SpGistState *state, Page page,
128                                                 OffsetNumber *itemnos, int nitems,
129                                                 int firststate, int reststate,
130                                                 BlockNumber blkno, OffsetNumber offnum)
131 {
132         OffsetNumber firstItem;
133         OffsetNumber *sortednos;
134         SpGistDeadTuple tuple = NULL;
135         int                     i;
136
137         if (nitems == 0)
138                 return;                                 /* nothing to do */
139
140         /*
141          * For efficiency we want to use PageIndexMultiDelete, which requires the
142          * targets to be listed in sorted order, so we have to sort the itemnos
143          * array.  (This also greatly simplifies the math for reinserting the
144          * replacement tuples.)  However, we must not scribble on the caller's
145          * array, so we have to make a copy.
146          */
147         sortednos = (OffsetNumber *) palloc(sizeof(OffsetNumber) * nitems);
148         memcpy(sortednos, itemnos, sizeof(OffsetNumber) * nitems);
149         if (nitems > 1)
150                 qsort(sortednos, nitems, sizeof(OffsetNumber), cmpOffsetNumbers);
151
152         PageIndexMultiDelete(page, sortednos, nitems);
153
154         firstItem = itemnos[0];
155
156         for (i = 0; i < nitems; i++)
157         {
158                 OffsetNumber itemno = sortednos[i];
159                 int                     tupstate;
160
161                 tupstate = (itemno == firstItem) ? firststate : reststate;
162                 if (tuple == NULL || tuple->tupstate != tupstate)
163                         tuple = spgFormDeadTuple(state, tupstate, blkno, offnum);
164
165                 if (PageAddItem(page, (Item) tuple, tuple->size,
166                                                 itemno, false, false) != itemno)
167                         elog(ERROR, "failed to add item of size %u to SPGiST index page",
168                                  tuple->size);
169
170                 if (tupstate == SPGIST_REDIRECT)
171                         SpGistPageGetOpaque(page)->nRedirection++;
172                 else if (tupstate == SPGIST_PLACEHOLDER)
173                         SpGistPageGetOpaque(page)->nPlaceholder++;
174         }
175
176         pfree(sortednos);
177 }
178
179 /*
180  * Update the parent inner tuple's downlink, and mark the parent buffer
181  * dirty (this must be the last change to the parent page in the current
182  * WAL action).
183  */
184 static void
185 saveNodeLink(Relation index, SPPageDesc *parent,
186                          BlockNumber blkno, OffsetNumber offnum)
187 {
188         SpGistInnerTuple innerTuple;
189
190         innerTuple = (SpGistInnerTuple) PageGetItem(parent->page,
191                                                                 PageGetItemId(parent->page, parent->offnum));
192
193         spgUpdateNodeLink(innerTuple, parent->node, blkno, offnum);
194
195         MarkBufferDirty(parent->buffer);
196 }
197
198 /*
199  * Add a leaf tuple to a leaf page where there is known to be room for it
200  */
201 static void
202 addLeafTuple(Relation index, SpGistState *state, SpGistLeafTuple leafTuple,
203                    SPPageDesc *current, SPPageDesc *parent, bool isNulls, bool isNew)
204 {
205         XLogRecData rdata[4];
206         spgxlogAddLeaf xlrec;
207
208         xlrec.node = index->rd_node;
209         xlrec.blknoLeaf = current->blkno;
210         xlrec.newPage = isNew;
211         xlrec.storesNulls = isNulls;
212
213         /* these will be filled below as needed */
214         xlrec.offnumLeaf = InvalidOffsetNumber;
215         xlrec.offnumHeadLeaf = InvalidOffsetNumber;
216         xlrec.blknoParent = InvalidBlockNumber;
217         xlrec.offnumParent = InvalidOffsetNumber;
218         xlrec.nodeI = 0;
219
220         ACCEPT_RDATA_DATA(&xlrec, sizeof(xlrec), 0);
221         /* we assume sizeof(xlrec) is at least int-aligned */
222         ACCEPT_RDATA_DATA(leafTuple, leafTuple->size, 1);
223         ACCEPT_RDATA_BUFFER(current->buffer, 2);
224
225         START_CRIT_SECTION();
226
227         if (current->offnum == InvalidOffsetNumber ||
228                 SpGistBlockIsRoot(current->blkno))
229         {
230                 /* Tuple is not part of a chain */
231                 leafTuple->nextOffset = InvalidOffsetNumber;
232                 current->offnum = SpGistPageAddNewItem(state, current->page,
233                                                                                    (Item) leafTuple, leafTuple->size,
234                                                                                            NULL, false);
235
236                 xlrec.offnumLeaf = current->offnum;
237
238                 /* Must update parent's downlink if any */
239                 if (parent->buffer != InvalidBuffer)
240                 {
241                         xlrec.blknoParent = parent->blkno;
242                         xlrec.offnumParent = parent->offnum;
243                         xlrec.nodeI = parent->node;
244
245                         saveNodeLink(index, parent, current->blkno, current->offnum);
246
247                         ACCEPT_RDATA_BUFFER(parent->buffer, 3);
248                 }
249         }
250         else
251         {
252                 /*
253                  * Tuple must be inserted into existing chain.  We mustn't change the
254                  * chain's head address, but we don't need to chase the entire chain
255                  * to put the tuple at the end; we can insert it second.
256                  *
257                  * Also, it's possible that the "chain" consists only of a DEAD tuple,
258                  * in which case we should replace the DEAD tuple in-place.
259                  */
260                 SpGistLeafTuple head;
261                 OffsetNumber offnum;
262
263                 head = (SpGistLeafTuple) PageGetItem(current->page,
264                                                           PageGetItemId(current->page, current->offnum));
265                 if (head->tupstate == SPGIST_LIVE)
266                 {
267                         leafTuple->nextOffset = head->nextOffset;
268                         offnum = SpGistPageAddNewItem(state, current->page,
269                                                                                   (Item) leafTuple, leafTuple->size,
270                                                                                   NULL, false);
271
272                         /*
273                          * re-get head of list because it could have been moved on page,
274                          * and set new second element
275                          */
276                         head = (SpGistLeafTuple) PageGetItem(current->page,
277                                                           PageGetItemId(current->page, current->offnum));
278                         head->nextOffset = offnum;
279
280                         xlrec.offnumLeaf = offnum;
281                         xlrec.offnumHeadLeaf = current->offnum;
282                 }
283                 else if (head->tupstate == SPGIST_DEAD)
284                 {
285                         leafTuple->nextOffset = InvalidOffsetNumber;
286                         PageIndexTupleDelete(current->page, current->offnum);
287                         if (PageAddItem(current->page,
288                                                         (Item) leafTuple, leafTuple->size,
289                                                         current->offnum, false, false) != current->offnum)
290                                 elog(ERROR, "failed to add item of size %u to SPGiST index page",
291                                          leafTuple->size);
292
293                         /* WAL replay distinguishes this case by equal offnums */
294                         xlrec.offnumLeaf = current->offnum;
295                         xlrec.offnumHeadLeaf = current->offnum;
296                 }
297                 else
298                         elog(ERROR, "unexpected SPGiST tuple state: %d", head->tupstate);
299         }
300
301         MarkBufferDirty(current->buffer);
302
303         if (RelationNeedsWAL(index))
304         {
305                 XLogRecPtr      recptr;
306
307                 recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_LEAF, rdata);
308
309                 PageSetLSN(current->page, recptr);
310                 PageSetTLI(current->page, ThisTimeLineID);
311
312                 /* update parent only if we actually changed it */
313                 if (xlrec.blknoParent != InvalidBlockNumber)
314                 {
315                         PageSetLSN(parent->page, recptr);
316                         PageSetTLI(parent->page, ThisTimeLineID);
317                 }
318         }
319
320         END_CRIT_SECTION();
321 }
322
323 /*
324  * Count the number and total size of leaf tuples in the chain starting at
325  * current->offnum.  Return number into *nToSplit and total size as function
326  * result.
327  *
328  * Klugy special case when considering the root page (i.e., root is a leaf
329  * page, but we're about to split for the first time): return fake large
330  * values to force spgdoinsert() to take the doPickSplit rather than
331  * moveLeafs code path.  moveLeafs is not prepared to deal with root page.
332  */
333 static int
334 checkSplitConditions(Relation index, SpGistState *state,
335                                          SPPageDesc *current, int *nToSplit)
336 {
337         int                     i,
338                                 n = 0,
339                                 totalSize = 0;
340
341         if (SpGistBlockIsRoot(current->blkno))
342         {
343                 /* return impossible values to force split */
344                 *nToSplit = BLCKSZ;
345                 return BLCKSZ;
346         }
347
348         i = current->offnum;
349         while (i != InvalidOffsetNumber)
350         {
351                 SpGistLeafTuple it;
352
353                 Assert(i >= FirstOffsetNumber &&
354                            i <= PageGetMaxOffsetNumber(current->page));
355                 it = (SpGistLeafTuple) PageGetItem(current->page,
356                                                                                    PageGetItemId(current->page, i));
357                 if (it->tupstate == SPGIST_LIVE)
358                 {
359                         n++;
360                         totalSize += it->size + sizeof(ItemIdData);
361                 }
362                 else if (it->tupstate == SPGIST_DEAD)
363                 {
364                         /* We could see a DEAD tuple as first/only chain item */
365                         Assert(i == current->offnum);
366                         Assert(it->nextOffset == InvalidOffsetNumber);
367                         /* Don't count it in result, because it won't go to other page */
368                 }
369                 else
370                         elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
371
372                 i = it->nextOffset;
373         }
374
375         *nToSplit = n;
376
377         return totalSize;
378 }
379
380 /*
381  * current points to a leaf-tuple chain that we wanted to add newLeafTuple to,
382  * but the chain has to be moved because there's not enough room to add
383  * newLeafTuple to its page.  We use this method when the chain contains
384  * very little data so a split would be inefficient.  We are sure we can
385  * fit the chain plus newLeafTuple on one other page.
386  */
387 static void
388 moveLeafs(Relation index, SpGistState *state,
389                   SPPageDesc *current, SPPageDesc *parent,
390                   SpGistLeafTuple newLeafTuple, bool isNulls)
391 {
392         int                     i,
393                                 nDelete,
394                                 nInsert,
395                                 size;
396         Buffer          nbuf;
397         Page            npage;
398         SpGistLeafTuple it;
399         OffsetNumber r = InvalidOffsetNumber,
400                                 startOffset = InvalidOffsetNumber;
401         bool            replaceDead = false;
402         OffsetNumber *toDelete;
403         OffsetNumber *toInsert;
404         BlockNumber nblkno;
405         XLogRecData rdata[7];
406         spgxlogMoveLeafs xlrec;
407         char       *leafdata,
408                            *leafptr;
409
410         /* This doesn't work on root page */
411         Assert(parent->buffer != InvalidBuffer);
412         Assert(parent->buffer != current->buffer);
413
414         /* Locate the tuples to be moved, and count up the space needed */
415         i = PageGetMaxOffsetNumber(current->page);
416         toDelete = (OffsetNumber *) palloc(sizeof(OffsetNumber) * i);
417         toInsert = (OffsetNumber *) palloc(sizeof(OffsetNumber) * (i + 1));
418
419         size = newLeafTuple->size + sizeof(ItemIdData);
420
421         nDelete = 0;
422         i = current->offnum;
423         while (i != InvalidOffsetNumber)
424         {
425                 SpGistLeafTuple it;
426
427                 Assert(i >= FirstOffsetNumber &&
428                            i <= PageGetMaxOffsetNumber(current->page));
429                 it = (SpGistLeafTuple) PageGetItem(current->page,
430                                                                                    PageGetItemId(current->page, i));
431
432                 if (it->tupstate == SPGIST_LIVE)
433                 {
434                         toDelete[nDelete] = i;
435                         size += it->size + sizeof(ItemIdData);
436                         nDelete++;
437                 }
438                 else if (it->tupstate == SPGIST_DEAD)
439                 {
440                         /* We could see a DEAD tuple as first/only chain item */
441                         Assert(i == current->offnum);
442                         Assert(it->nextOffset == InvalidOffsetNumber);
443                         /* We don't want to move it, so don't count it in size */
444                         toDelete[nDelete] = i;
445                         nDelete++;
446                         replaceDead = true;
447                 }
448                 else
449                         elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
450
451                 i = it->nextOffset;
452         }
453
454         /* Find a leaf page that will hold them */
455         nbuf = SpGistGetBuffer(index, GBUF_LEAF | (isNulls ? GBUF_NULLS : 0),
456                                                    size, &xlrec.newPage);
457         npage = BufferGetPage(nbuf);
458         nblkno = BufferGetBlockNumber(nbuf);
459         Assert(nblkno != current->blkno);
460
461         /* prepare WAL info */
462         xlrec.node = index->rd_node;
463         STORE_STATE(state, xlrec.stateSrc);
464
465         xlrec.blknoSrc = current->blkno;
466         xlrec.blknoDst = nblkno;
467         xlrec.nMoves = nDelete;
468         xlrec.replaceDead = replaceDead;
469         xlrec.storesNulls = isNulls;
470
471         xlrec.blknoParent = parent->blkno;
472         xlrec.offnumParent = parent->offnum;
473         xlrec.nodeI = parent->node;
474
475         leafdata = leafptr = palloc(size);
476
477         START_CRIT_SECTION();
478
479         /* copy all the old tuples to new page, unless they're dead */
480         nInsert = 0;
481         if (!replaceDead)
482         {
483                 for (i = 0; i < nDelete; i++)
484                 {
485                         it = (SpGistLeafTuple) PageGetItem(current->page,
486                                                                   PageGetItemId(current->page, toDelete[i]));
487                         Assert(it->tupstate == SPGIST_LIVE);
488
489                         /*
490                          * Update chain link (notice the chain order gets reversed, but we
491                          * don't care).  We're modifying the tuple on the source page
492                          * here, but it's okay since we're about to delete it.
493                          */
494                         it->nextOffset = r;
495
496                         r = SpGistPageAddNewItem(state, npage, (Item) it, it->size,
497                                                                          &startOffset, false);
498
499                         toInsert[nInsert] = r;
500                         nInsert++;
501
502                         /* save modified tuple into leafdata as well */
503                         memcpy(leafptr, it, it->size);
504                         leafptr += it->size;
505                 }
506         }
507
508         /* add the new tuple as well */
509         newLeafTuple->nextOffset = r;
510         r = SpGistPageAddNewItem(state, npage,
511                                                          (Item) newLeafTuple, newLeafTuple->size,
512                                                          &startOffset, false);
513         toInsert[nInsert] = r;
514         nInsert++;
515         memcpy(leafptr, newLeafTuple, newLeafTuple->size);
516         leafptr += newLeafTuple->size;
517
518         /*
519          * Now delete the old tuples, leaving a redirection pointer behind for the
520          * first one, unless we're doing an index build; in which case there can't
521          * be any concurrent scan so we need not provide a redirect.
522          */
523         spgPageIndexMultiDelete(state, current->page, toDelete, nDelete,
524                                            state->isBuild ? SPGIST_PLACEHOLDER : SPGIST_REDIRECT,
525                                                         SPGIST_PLACEHOLDER,
526                                                         nblkno, r);
527
528         /* Update parent's downlink and mark parent page dirty */
529         saveNodeLink(index, parent, nblkno, r);
530
531         /* Mark the leaf pages too */
532         MarkBufferDirty(current->buffer);
533         MarkBufferDirty(nbuf);
534
535         if (RelationNeedsWAL(index))
536         {
537                 XLogRecPtr      recptr;
538
539                 ACCEPT_RDATA_DATA(&xlrec, MAXALIGN(sizeof(xlrec)), 0);
540                 ACCEPT_RDATA_DATA(toDelete, MAXALIGN(sizeof(OffsetNumber) * nDelete), 1);
541                 ACCEPT_RDATA_DATA(toInsert, MAXALIGN(sizeof(OffsetNumber) * nInsert), 2);
542                 ACCEPT_RDATA_DATA(leafdata, leafptr - leafdata, 3);
543                 ACCEPT_RDATA_BUFFER(current->buffer, 4);
544                 ACCEPT_RDATA_BUFFER(nbuf, 5);
545                 ACCEPT_RDATA_BUFFER(parent->buffer, 6);
546
547                 recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_MOVE_LEAFS, rdata);
548
549                 PageSetLSN(current->page, recptr);
550                 PageSetTLI(current->page, ThisTimeLineID);
551                 PageSetLSN(npage, recptr);
552                 PageSetTLI(npage, ThisTimeLineID);
553                 PageSetLSN(parent->page, recptr);
554                 PageSetTLI(parent->page, ThisTimeLineID);
555         }
556
557         END_CRIT_SECTION();
558
559         /* Update local free-space cache and release new buffer */
560         SpGistSetLastUsedPage(index, nbuf);
561         UnlockReleaseBuffer(nbuf);
562 }
563
564 /*
565  * Update previously-created redirection tuple with appropriate destination
566  *
567  * We use this when it's not convenient to know the destination first.
568  * The tuple should have been made with the "impossible" destination of
569  * the metapage.
570  */
571 static void
572 setRedirectionTuple(SPPageDesc *current, OffsetNumber position,
573                                         BlockNumber blkno, OffsetNumber offnum)
574 {
575         SpGistDeadTuple dt;
576
577         dt = (SpGistDeadTuple) PageGetItem(current->page,
578                                                                          PageGetItemId(current->page, position));
579         Assert(dt->tupstate == SPGIST_REDIRECT);
580         Assert(ItemPointerGetBlockNumber(&dt->pointer) == SPGIST_METAPAGE_BLKNO);
581         ItemPointerSet(&dt->pointer, blkno, offnum);
582 }
583
584 /*
585  * Test to see if the user-defined picksplit function failed to do its job,
586  * ie, it put all the leaf tuples into the same node.
587  * If so, randomly divide the tuples into several nodes (all with the same
588  * label) and return TRUE to select allTheSame mode for this inner tuple.
589  *
590  * (This code is also used to forcibly select allTheSame mode for nulls.)
591  *
592  * If we know that the leaf tuples wouldn't all fit on one page, then we
593  * exclude the last tuple (which is the incoming new tuple that forced a split)
594  * from the check to see if more than one node is used.  The reason for this
595  * is that if the existing tuples are put into only one chain, then even if
596  * we move them all to an empty page, there would still not be room for the
597  * new tuple, so we'd get into an infinite loop of picksplit attempts.
598  * Forcing allTheSame mode dodges this problem by ensuring the old tuples will
599  * be split across pages.  (Exercise for the reader: figure out why this
600  * fixes the problem even when there is only one old tuple.)
601  */
602 static bool
603 checkAllTheSame(spgPickSplitIn *in, spgPickSplitOut *out, bool tooBig,
604                                 bool *includeNew)
605 {
606         int                     theNode;
607         int                     limit;
608         int                     i;
609
610         /* For the moment, assume we can include the new leaf tuple */
611         *includeNew = true;
612
613         /* If there's only the new leaf tuple, don't select allTheSame mode */
614         if (in->nTuples <= 1)
615                 return false;
616
617         /* If tuple set doesn't fit on one page, ignore the new tuple in test */
618         limit = tooBig ? in->nTuples - 1 : in->nTuples;
619
620         /* Check to see if more than one node is populated */
621         theNode = out->mapTuplesToNodes[0];
622         for (i = 1; i < limit; i++)
623         {
624                 if (out->mapTuplesToNodes[i] != theNode)
625                         return false;
626         }
627
628         /* Nope, so override the picksplit function's decisions */
629
630         /* If the new tuple is in its own node, it can't be included in split */
631         if (tooBig && out->mapTuplesToNodes[in->nTuples - 1] != theNode)
632                 *includeNew = false;
633
634         out->nNodes = 8;                        /* arbitrary number of child nodes */
635
636         /* Random assignment of tuples to nodes (note we include new tuple) */
637         for (i = 0; i < in->nTuples; i++)
638                 out->mapTuplesToNodes[i] = i % out->nNodes;
639
640         /* The opclass may not use node labels, but if it does, duplicate 'em */
641         if (out->nodeLabels)
642         {
643                 Datum           theLabel = out->nodeLabels[theNode];
644
645                 out->nodeLabels = (Datum *) palloc(sizeof(Datum) * out->nNodes);
646                 for (i = 0; i < out->nNodes; i++)
647                         out->nodeLabels[i] = theLabel;
648         }
649
650         /* We don't touch the prefix or the leaf tuple datum assignments */
651
652         return true;
653 }
654
655 /*
656  * current points to a leaf-tuple chain that we wanted to add newLeafTuple to,
657  * but the chain has to be split because there's not enough room to add
658  * newLeafTuple to its page.
659  *
660  * This function splits the leaf tuple set according to picksplit's rules,
661  * creating one or more new chains that are spread across the current page
662  * and an additional leaf page (we assume that two leaf pages will be
663  * sufficient).  A new inner tuple is created, and the parent downlink
664  * pointer is updated to point to that inner tuple instead of the leaf chain.
665  *
666  * On exit, current contains the address of the new inner tuple.
667  *
668  * Returns true if we successfully inserted newLeafTuple during this function,
669  * false if caller still has to do it (meaning another picksplit operation is
670  * probably needed).  Failure could occur if the picksplit result is fairly
671  * unbalanced, or if newLeafTuple is just plain too big to fit on a page.
672  * Because we force the picksplit result to be at least two chains, each
673  * cycle will get rid of at least one leaf tuple from the chain, so the loop
674  * will eventually terminate if lack of balance is the issue.  If the tuple
675  * is too big, we assume that repeated picksplit operations will eventually
676  * make it small enough by repeated prefix-stripping.  A broken opclass could
677  * make this an infinite loop, though.
678  */
679 static bool
680 doPickSplit(Relation index, SpGistState *state,
681                         SPPageDesc *current, SPPageDesc *parent,
682                         SpGistLeafTuple newLeafTuple,
683                         int level, bool isNulls, bool isNew)
684 {
685         bool            insertedNew = false;
686         spgPickSplitIn in;
687         spgPickSplitOut out;
688         FmgrInfo   *procinfo;
689         bool            includeNew;
690         int                     i,
691                                 max,
692                                 n;
693         SpGistInnerTuple innerTuple;
694         SpGistNodeTuple node,
695                            *nodes;
696         Buffer          newInnerBuffer,
697                                 newLeafBuffer;
698         ItemPointerData *heapPtrs;
699         uint8      *leafPageSelect;
700         int                *leafSizes;
701         OffsetNumber *toDelete;
702         OffsetNumber *toInsert;
703         OffsetNumber redirectTuplePos = InvalidOffsetNumber;
704         OffsetNumber startOffsets[2];
705         SpGistLeafTuple *newLeafs;
706         int                     spaceToDelete;
707         int                     currentFreeSpace;
708         int                     totalLeafSizes;
709         bool            allTheSame;
710         XLogRecData rdata[10];
711         int                     nRdata;
712         spgxlogPickSplit xlrec;
713         char       *leafdata,
714                            *leafptr;
715         SPPageDesc      saveCurrent;
716         int                     nToDelete,
717                                 nToInsert,
718                                 maxToInclude;
719
720         in.level = level;
721
722         /*
723          * Allocate per-leaf-tuple work arrays with max possible size
724          */
725         max = PageGetMaxOffsetNumber(current->page);
726         n = max + 1;
727         in.datums = (Datum *) palloc(sizeof(Datum) * n);
728         heapPtrs = (ItemPointerData *) palloc(sizeof(ItemPointerData) * n);
729         toDelete = (OffsetNumber *) palloc(sizeof(OffsetNumber) * n);
730         toInsert = (OffsetNumber *) palloc(sizeof(OffsetNumber) * n);
731         newLeafs = (SpGistLeafTuple *) palloc(sizeof(SpGistLeafTuple) * n);
732         leafPageSelect = (uint8 *) palloc(sizeof(uint8) * n);
733
734         xlrec.node = index->rd_node;
735         STORE_STATE(state, xlrec.stateSrc);
736
737         /*
738          * Form list of leaf tuples which will be distributed as split result;
739          * also, count up the amount of space that will be freed from current.
740          * (Note that in the non-root case, we won't actually delete the old
741          * tuples, only replace them with redirects or placeholders.)
742          *
743          * Note: the SGLTDATUM calls here are safe even when dealing with a nulls
744          * page.  For a pass-by-value data type we will fetch a word that must
745          * exist even though it may contain garbage (because of the fact that leaf
746          * tuples must have size at least SGDTSIZE).  For a pass-by-reference type
747          * we are just computing a pointer that isn't going to get dereferenced.
748          * So it's not worth guarding the calls with isNulls checks.
749          */
750         nToInsert = 0;
751         nToDelete = 0;
752         spaceToDelete = 0;
753         if (SpGistBlockIsRoot(current->blkno))
754         {
755                 /*
756                  * We are splitting the root (which up to now is also a leaf page).
757                  * Its tuples are not linked, so scan sequentially to get them all. We
758                  * ignore the original value of current->offnum.
759                  */
760                 for (i = FirstOffsetNumber; i <= max; i++)
761                 {
762                         SpGistLeafTuple it;
763
764                         it = (SpGistLeafTuple) PageGetItem(current->page,
765                                                                                         PageGetItemId(current->page, i));
766                         if (it->tupstate == SPGIST_LIVE)
767                         {
768                                 in.datums[nToInsert] = SGLTDATUM(it, state);
769                                 heapPtrs[nToInsert] = it->heapPtr;
770                                 nToInsert++;
771                                 toDelete[nToDelete] = i;
772                                 nToDelete++;
773                                 /* we will delete the tuple altogether, so count full space */
774                                 spaceToDelete += it->size + sizeof(ItemIdData);
775                         }
776                         else    /* tuples on root should be live */
777                                 elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
778                 }
779         }
780         else
781         {
782                 /* Normal case, just collect the leaf tuples in the chain */
783                 i = current->offnum;
784                 while (i != InvalidOffsetNumber)
785                 {
786                         SpGistLeafTuple it;
787
788                         Assert(i >= FirstOffsetNumber && i <= max);
789                         it = (SpGistLeafTuple) PageGetItem(current->page,
790                                                                                         PageGetItemId(current->page, i));
791                         if (it->tupstate == SPGIST_LIVE)
792                         {
793                                 in.datums[nToInsert] = SGLTDATUM(it, state);
794                                 heapPtrs[nToInsert] = it->heapPtr;
795                                 nToInsert++;
796                                 toDelete[nToDelete] = i;
797                                 nToDelete++;
798                                 /* we will not delete the tuple, only replace with dead */
799                                 Assert(it->size >= SGDTSIZE);
800                                 spaceToDelete += it->size - SGDTSIZE;
801                         }
802                         else if (it->tupstate == SPGIST_DEAD)
803                         {
804                                 /* We could see a DEAD tuple as first/only chain item */
805                                 Assert(i == current->offnum);
806                                 Assert(it->nextOffset == InvalidOffsetNumber);
807                                 toDelete[nToDelete] = i;
808                                 nToDelete++;
809                                 /* replacing it with redirect will save no space */
810                         }
811                         else
812                                 elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
813
814                         i = it->nextOffset;
815                 }
816         }
817         in.nTuples = nToInsert;
818
819         /*
820          * We may not actually insert new tuple because another picksplit may be
821          * necessary due to too large value, but we will try to allocate enough
822          * space to include it; and in any case it has to be included in the input
823          * for the picksplit function.  So don't increment nToInsert yet.
824          */
825         in.datums[in.nTuples] = SGLTDATUM(newLeafTuple, state);
826         heapPtrs[in.nTuples] = newLeafTuple->heapPtr;
827         in.nTuples++;
828
829         memset(&out, 0, sizeof(out));
830
831         if (!isNulls)
832         {
833                 /*
834                  * Perform split using user-defined method.
835                  */
836                 procinfo = index_getprocinfo(index, 1, SPGIST_PICKSPLIT_PROC);
837                 FunctionCall2Coll(procinfo,
838                                                   index->rd_indcollation[0],
839                                                   PointerGetDatum(&in),
840                                                   PointerGetDatum(&out));
841
842                 /*
843                  * Form new leaf tuples and count up the total space needed.
844                  */
845                 totalLeafSizes = 0;
846                 for (i = 0; i < in.nTuples; i++)
847                 {
848                         newLeafs[i] = spgFormLeafTuple(state, heapPtrs + i,
849                                                                                    out.leafTupleDatums[i],
850                                                                                    false);
851                         totalLeafSizes += newLeafs[i]->size + sizeof(ItemIdData);
852                 }
853         }
854         else
855         {
856                 /*
857                  * Perform dummy split that puts all tuples into one node.
858                  * checkAllTheSame will override this and force allTheSame mode.
859                  */
860                 out.hasPrefix = false;
861                 out.nNodes = 1;
862                 out.nodeLabels = NULL;
863                 out.mapTuplesToNodes = palloc0(sizeof(int) * in.nTuples);
864
865                 /*
866                  * Form new leaf tuples and count up the total space needed.
867                  */
868                 totalLeafSizes = 0;
869                 for (i = 0; i < in.nTuples; i++)
870                 {
871                         newLeafs[i] = spgFormLeafTuple(state, heapPtrs + i,
872                                                                                    (Datum) 0,
873                                                                                    true);
874                         totalLeafSizes += newLeafs[i]->size + sizeof(ItemIdData);
875                 }
876         }
877
878         /*
879          * Check to see if the picksplit function failed to separate the values,
880          * ie, it put them all into the same child node.  If so, select allTheSame
881          * mode and create a random split instead.      See comments for
882          * checkAllTheSame as to why we need to know if the new leaf tuples could
883          * fit on one page.
884          */
885         allTheSame = checkAllTheSame(&in, &out,
886                                                                  totalLeafSizes > SPGIST_PAGE_CAPACITY,
887                                                                  &includeNew);
888
889         /*
890          * If checkAllTheSame decided we must exclude the new tuple, don't
891          * consider it any further.
892          */
893         if (includeNew)
894                 maxToInclude = in.nTuples;
895         else
896         {
897                 maxToInclude = in.nTuples - 1;
898                 totalLeafSizes -= newLeafs[in.nTuples - 1]->size + sizeof(ItemIdData);
899         }
900
901         /*
902          * Allocate per-node work arrays.  Since checkAllTheSame could replace
903          * out.nNodes with a value larger than the number of tuples on the input
904          * page, we can't allocate these arrays before here.
905          */
906         nodes = (SpGistNodeTuple *) palloc(sizeof(SpGistNodeTuple) * out.nNodes);
907         leafSizes = (int *) palloc0(sizeof(int) * out.nNodes);
908
909         /*
910          * Form nodes of inner tuple and inner tuple itself
911          */
912         for (i = 0; i < out.nNodes; i++)
913         {
914                 Datum           label = (Datum) 0;
915                 bool            labelisnull = (out.nodeLabels == NULL);
916
917                 if (!labelisnull)
918                         label = out.nodeLabels[i];
919                 nodes[i] = spgFormNodeTuple(state, label, labelisnull);
920         }
921         innerTuple = spgFormInnerTuple(state,
922                                                                    out.hasPrefix, out.prefixDatum,
923                                                                    out.nNodes, nodes);
924         innerTuple->allTheSame = allTheSame;
925
926         /*
927          * Update nodes[] array to point into the newly formed innerTuple, so that
928          * we can adjust their downlinks below.
929          */
930         SGITITERATE(innerTuple, i, node)
931         {
932                 nodes[i] = node;
933         }
934
935         /*
936          * Re-scan new leaf tuples and count up the space needed under each node.
937          */
938         for (i = 0; i < maxToInclude; i++)
939         {
940                 n = out.mapTuplesToNodes[i];
941                 if (n < 0 || n >= out.nNodes)
942                         elog(ERROR, "inconsistent result of SPGiST picksplit function");
943                 leafSizes[n] += newLeafs[i]->size + sizeof(ItemIdData);
944         }
945
946         /*
947          * To perform the split, we must insert a new inner tuple, which can't go
948          * on a leaf page; and unless we are splitting the root page, we must then
949          * update the parent tuple's downlink to point to the inner tuple.  If
950          * there is room, we'll put the new inner tuple on the same page as the
951          * parent tuple, otherwise we need another non-leaf buffer. But if the
952          * parent page is the root, we can't add the new inner tuple there,
953          * because the root page must have only one inner tuple.
954          */
955         xlrec.initInner = false;
956         if (parent->buffer != InvalidBuffer &&
957                 !SpGistBlockIsRoot(parent->blkno) &&
958                 (SpGistPageGetFreeSpace(parent->page, 1) >=
959                  innerTuple->size + sizeof(ItemIdData)))
960         {
961                 /* New inner tuple will fit on parent page */
962                 newInnerBuffer = parent->buffer;
963         }
964         else if (parent->buffer != InvalidBuffer)
965         {
966                 /* Send tuple to page with next triple parity (see README) */
967                 newInnerBuffer = SpGistGetBuffer(index,
968                                                                            GBUF_INNER_PARITY(parent->blkno + 1) |
969                                                                                  (isNulls ? GBUF_NULLS : 0),
970                                                                            innerTuple->size + sizeof(ItemIdData),
971                                                                                  &xlrec.initInner);
972         }
973         else
974         {
975                 /* Root page split ... inner tuple will go to root page */
976                 newInnerBuffer = InvalidBuffer;
977         }
978
979         /*
980          * Because a WAL record can't involve more than four buffers, we can only
981          * afford to deal with two leaf pages in each picksplit action, ie the
982          * current page and at most one other.
983          *
984          * The new leaf tuples converted from the existing ones should require the
985          * same or less space, and therefore should all fit onto one page
986          * (although that's not necessarily the current page, since we can't
987          * delete the old tuples but only replace them with placeholders).
988          * However, the incoming new tuple might not also fit, in which case we
989          * might need another picksplit cycle to reduce it some more.
990          *
991          * If there's not room to put everything back onto the current page, then
992          * we decide on a per-node basis which tuples go to the new page. (We do
993          * it like that because leaf tuple chains can't cross pages, so we must
994          * place all leaf tuples belonging to the same parent node on the same
995          * page.)
996          *
997          * If we are splitting the root page (turning it from a leaf page into an
998          * inner page), then no leaf tuples can go back to the current page; they
999          * must all go somewhere else.
1000          */
1001         if (!SpGistBlockIsRoot(current->blkno))
1002                 currentFreeSpace = PageGetExactFreeSpace(current->page) + spaceToDelete;
1003         else
1004                 currentFreeSpace = 0;   /* prevent assigning any tuples to current */
1005
1006         xlrec.initDest = false;
1007
1008         if (totalLeafSizes <= currentFreeSpace)
1009         {
1010                 /* All the leaf tuples will fit on current page */
1011                 newLeafBuffer = InvalidBuffer;
1012                 /* mark new leaf tuple as included in insertions, if allowed */
1013                 if (includeNew)
1014                 {
1015                         nToInsert++;
1016                         insertedNew = true;
1017                 }
1018                 for (i = 0; i < nToInsert; i++)
1019                         leafPageSelect[i] = 0;          /* signifies current page */
1020         }
1021         else if (in.nTuples == 1 && totalLeafSizes > SPGIST_PAGE_CAPACITY)
1022         {
1023                 /*
1024                  * We're trying to split up a long value by repeated suffixing, but
1025                  * it's not going to fit yet.  Don't bother allocating a second leaf
1026                  * buffer that we won't be able to use.
1027                  */
1028                 newLeafBuffer = InvalidBuffer;
1029                 Assert(includeNew);
1030                 Assert(nToInsert == 0);
1031         }
1032         else
1033         {
1034                 /* We will need another leaf page */
1035                 uint8      *nodePageSelect;
1036                 int                     curspace;
1037                 int                     newspace;
1038
1039                 newLeafBuffer = SpGistGetBuffer(index,
1040                                                                           GBUF_LEAF | (isNulls ? GBUF_NULLS : 0),
1041                                                                                 Min(totalLeafSizes,
1042                                                                                         SPGIST_PAGE_CAPACITY),
1043                                                                                 &xlrec.initDest);
1044
1045                 /*
1046                  * Attempt to assign node groups to the two pages.      We might fail to
1047                  * do so, even if totalLeafSizes is less than the available space,
1048                  * because we can't split a group across pages.
1049                  */
1050                 nodePageSelect = (uint8 *) palloc(sizeof(uint8) * out.nNodes);
1051
1052                 curspace = currentFreeSpace;
1053                 newspace = PageGetExactFreeSpace(BufferGetPage(newLeafBuffer));
1054                 for (i = 0; i < out.nNodes; i++)
1055                 {
1056                         if (leafSizes[i] <= curspace)
1057                         {
1058                                 nodePageSelect[i] = 0;  /* signifies current page */
1059                                 curspace -= leafSizes[i];
1060                         }
1061                         else
1062                         {
1063                                 nodePageSelect[i] = 1;  /* signifies new leaf page */
1064                                 newspace -= leafSizes[i];
1065                         }
1066                 }
1067                 if (curspace >= 0 && newspace >= 0)
1068                 {
1069                         /* Successful assignment, so we can include the new leaf tuple */
1070                         if (includeNew)
1071                         {
1072                                 nToInsert++;
1073                                 insertedNew = true;
1074                         }
1075                 }
1076                 else if (includeNew)
1077                 {
1078                         /* We must exclude the new leaf tuple from the split */
1079                         int                     nodeOfNewTuple = out.mapTuplesToNodes[in.nTuples - 1];
1080
1081                         leafSizes[nodeOfNewTuple] -=
1082                                 newLeafs[in.nTuples - 1]->size + sizeof(ItemIdData);
1083
1084                         /* Repeat the node assignment process --- should succeed now */
1085                         curspace = currentFreeSpace;
1086                         newspace = PageGetExactFreeSpace(BufferGetPage(newLeafBuffer));
1087                         for (i = 0; i < out.nNodes; i++)
1088                         {
1089                                 if (leafSizes[i] <= curspace)
1090                                 {
1091                                         nodePageSelect[i] = 0;          /* signifies current page */
1092                                         curspace -= leafSizes[i];
1093                                 }
1094                                 else
1095                                 {
1096                                         nodePageSelect[i] = 1;          /* signifies new leaf page */
1097                                         newspace -= leafSizes[i];
1098                                 }
1099                         }
1100                         if (curspace < 0 || newspace < 0)
1101                                 elog(ERROR, "failed to divide leaf tuple groups across pages");
1102                 }
1103                 else
1104                 {
1105                         /* oops, we already excluded new tuple ... should not get here */
1106                         elog(ERROR, "failed to divide leaf tuple groups across pages");
1107                 }
1108                 /* Expand the per-node assignments to be shown per leaf tuple */
1109                 for (i = 0; i < nToInsert; i++)
1110                 {
1111                         n = out.mapTuplesToNodes[i];
1112                         leafPageSelect[i] = nodePageSelect[n];
1113                 }
1114         }
1115
1116         /* Start preparing WAL record */
1117         xlrec.blknoSrc = current->blkno;
1118         xlrec.blknoDest = InvalidBlockNumber;
1119         xlrec.nDelete = 0;
1120         xlrec.initSrc = isNew;
1121         xlrec.storesNulls = isNulls;
1122
1123         leafdata = leafptr = (char *) palloc(totalLeafSizes);
1124
1125         ACCEPT_RDATA_DATA(&xlrec, MAXALIGN(sizeof(xlrec)), 0);
1126         ACCEPT_RDATA_DATA(innerTuple, innerTuple->size, 1);
1127         nRdata = 2;
1128
1129         /* Here we begin making the changes to the target pages */
1130         START_CRIT_SECTION();
1131
1132         /*
1133          * Delete old leaf tuples from current buffer, except when we're splitting
1134          * the root; in that case there's no need because we'll re-init the page
1135          * below.  We do this first to make room for reinserting new leaf tuples.
1136          */
1137         if (!SpGistBlockIsRoot(current->blkno))
1138         {
1139                 /*
1140                  * Init buffer instead of deleting individual tuples, but only if
1141                  * there aren't any other live tuples and only during build; otherwise
1142                  * we need to set a redirection tuple for concurrent scans.
1143                  */
1144                 if (state->isBuild &&
1145                         nToDelete + SpGistPageGetOpaque(current->page)->nPlaceholder ==
1146                         PageGetMaxOffsetNumber(current->page))
1147                 {
1148                         SpGistInitBuffer(current->buffer,
1149                                                          SPGIST_LEAF | (isNulls ? SPGIST_NULLS : 0));
1150                         xlrec.initSrc = true;
1151                 }
1152                 else if (isNew)
1153                 {
1154                         /* don't expose the freshly init'd buffer as a backup block */
1155                         Assert(nToDelete == 0);
1156                 }
1157                 else
1158                 {
1159                         xlrec.nDelete = nToDelete;
1160                         ACCEPT_RDATA_DATA(toDelete,
1161                                                           MAXALIGN(sizeof(OffsetNumber) * nToDelete),
1162                                                           nRdata);
1163                         nRdata++;
1164                         ACCEPT_RDATA_BUFFER(current->buffer, nRdata);
1165                         nRdata++;
1166
1167                         if (!state->isBuild)
1168                         {
1169                                 /*
1170                                  * Need to create redirect tuple (it will point to new inner
1171                                  * tuple) but right now the new tuple's location is not known
1172                                  * yet.  So, set the redirection pointer to "impossible" value
1173                                  * and remember its position to update tuple later.
1174                                  */
1175                                 if (nToDelete > 0)
1176                                         redirectTuplePos = toDelete[0];
1177                                 spgPageIndexMultiDelete(state, current->page,
1178                                                                                 toDelete, nToDelete,
1179                                                                                 SPGIST_REDIRECT,
1180                                                                                 SPGIST_PLACEHOLDER,
1181                                                                                 SPGIST_METAPAGE_BLKNO,
1182                                                                                 FirstOffsetNumber);
1183                         }
1184                         else
1185                         {
1186                                 /*
1187                                  * During index build there are no concurrent searches, so we
1188                                  * don't need to create a redirection tuple.
1189                                  */
1190                                 spgPageIndexMultiDelete(state, current->page,
1191                                                                                 toDelete, nToDelete,
1192                                                                                 SPGIST_PLACEHOLDER,
1193                                                                                 SPGIST_PLACEHOLDER,
1194                                                                                 InvalidBlockNumber,
1195                                                                                 InvalidOffsetNumber);
1196                         }
1197                 }
1198         }
1199
1200         /*
1201          * Put leaf tuples on proper pages, and update downlinks in innerTuple's
1202          * nodes.
1203          */
1204         startOffsets[0] = startOffsets[1] = InvalidOffsetNumber;
1205         for (i = 0; i < nToInsert; i++)
1206         {
1207                 SpGistLeafTuple it = newLeafs[i];
1208                 Buffer          leafBuffer;
1209                 BlockNumber leafBlock;
1210                 OffsetNumber newoffset;
1211
1212                 /* Which page is it going to? */
1213                 leafBuffer = leafPageSelect[i] ? newLeafBuffer : current->buffer;
1214                 leafBlock = BufferGetBlockNumber(leafBuffer);
1215
1216                 /* Link tuple into correct chain for its node */
1217                 n = out.mapTuplesToNodes[i];
1218
1219                 if (ItemPointerIsValid(&nodes[n]->t_tid))
1220                 {
1221                         Assert(ItemPointerGetBlockNumber(&nodes[n]->t_tid) == leafBlock);
1222                         it->nextOffset = ItemPointerGetOffsetNumber(&nodes[n]->t_tid);
1223                 }
1224                 else
1225                         it->nextOffset = InvalidOffsetNumber;
1226
1227                 /* Insert it on page */
1228                 newoffset = SpGistPageAddNewItem(state, BufferGetPage(leafBuffer),
1229                                                                                  (Item) it, it->size,
1230                                                                                  &startOffsets[leafPageSelect[i]],
1231                                                                                  false);
1232                 toInsert[i] = newoffset;
1233
1234                 /* ... and complete the chain linking */
1235                 ItemPointerSet(&nodes[n]->t_tid, leafBlock, newoffset);
1236
1237                 /* Also copy leaf tuple into WAL data */
1238                 memcpy(leafptr, newLeafs[i], newLeafs[i]->size);
1239                 leafptr += newLeafs[i]->size;
1240         }
1241
1242         /*
1243          * We're done modifying the other leaf buffer (if any), so mark it dirty.
1244          * current->buffer will be marked below, after we're entirely done
1245          * modifying it.
1246          */
1247         if (newLeafBuffer != InvalidBuffer)
1248         {
1249                 MarkBufferDirty(newLeafBuffer);
1250                 /* also save block number for WAL */
1251                 xlrec.blknoDest = BufferGetBlockNumber(newLeafBuffer);
1252                 if (!xlrec.initDest)
1253                 {
1254                         ACCEPT_RDATA_BUFFER(newLeafBuffer, nRdata);
1255                         nRdata++;
1256                 }
1257         }
1258
1259         xlrec.nInsert = nToInsert;
1260         ACCEPT_RDATA_DATA(toInsert,
1261                                           MAXALIGN(sizeof(OffsetNumber) * nToInsert),
1262                                           nRdata);
1263         nRdata++;
1264         ACCEPT_RDATA_DATA(leafPageSelect,
1265                                           MAXALIGN(sizeof(uint8) * nToInsert),
1266                                           nRdata);
1267         nRdata++;
1268         ACCEPT_RDATA_DATA(leafdata, leafptr - leafdata, nRdata);
1269         nRdata++;
1270
1271         /* Remember current buffer, since we're about to change "current" */
1272         saveCurrent = *current;
1273
1274         /*
1275          * Store the new innerTuple
1276          */
1277         if (newInnerBuffer == parent->buffer && newInnerBuffer != InvalidBuffer)
1278         {
1279                 /*
1280                  * new inner tuple goes to parent page
1281                  */
1282                 Assert(current->buffer != parent->buffer);
1283
1284                 /* Repoint "current" at the new inner tuple */
1285                 current->blkno = parent->blkno;
1286                 current->buffer = parent->buffer;
1287                 current->page = parent->page;
1288                 xlrec.blknoInner = current->blkno;
1289                 xlrec.offnumInner = current->offnum =
1290                         SpGistPageAddNewItem(state, current->page,
1291                                                                  (Item) innerTuple, innerTuple->size,
1292                                                                  NULL, false);
1293
1294                 /*
1295                  * Update parent node link and mark parent page dirty
1296                  */
1297                 xlrec.blknoParent = parent->blkno;
1298                 xlrec.offnumParent = parent->offnum;
1299                 xlrec.nodeI = parent->node;
1300                 saveNodeLink(index, parent, current->blkno, current->offnum);
1301
1302                 ACCEPT_RDATA_BUFFER(parent->buffer, nRdata);
1303                 nRdata++;
1304
1305                 /*
1306                  * Update redirection link (in old current buffer)
1307                  */
1308                 if (redirectTuplePos != InvalidOffsetNumber)
1309                         setRedirectionTuple(&saveCurrent, redirectTuplePos,
1310                                                                 current->blkno, current->offnum);
1311
1312                 /* Done modifying old current buffer, mark it dirty */
1313                 MarkBufferDirty(saveCurrent.buffer);
1314         }
1315         else if (parent->buffer != InvalidBuffer)
1316         {
1317                 /*
1318                  * new inner tuple will be stored on a new page
1319                  */
1320                 Assert(newInnerBuffer != InvalidBuffer);
1321
1322                 /* Repoint "current" at the new inner tuple */
1323                 current->buffer = newInnerBuffer;
1324                 current->blkno = BufferGetBlockNumber(current->buffer);
1325                 current->page = BufferGetPage(current->buffer);
1326                 xlrec.blknoInner = current->blkno;
1327                 xlrec.offnumInner = current->offnum =
1328                         SpGistPageAddNewItem(state, current->page,
1329                                                                  (Item) innerTuple, innerTuple->size,
1330                                                                  NULL, false);
1331
1332                 /* Done modifying new current buffer, mark it dirty */
1333                 MarkBufferDirty(current->buffer);
1334
1335                 /*
1336                  * Update parent node link and mark parent page dirty
1337                  */
1338                 xlrec.blknoParent = parent->blkno;
1339                 xlrec.offnumParent = parent->offnum;
1340                 xlrec.nodeI = parent->node;
1341                 saveNodeLink(index, parent, current->blkno, current->offnum);
1342
1343                 ACCEPT_RDATA_BUFFER(current->buffer, nRdata);
1344                 nRdata++;
1345                 ACCEPT_RDATA_BUFFER(parent->buffer, nRdata);
1346                 nRdata++;
1347
1348                 /*
1349                  * Update redirection link (in old current buffer)
1350                  */
1351                 if (redirectTuplePos != InvalidOffsetNumber)
1352                         setRedirectionTuple(&saveCurrent, redirectTuplePos,
1353                                                                 current->blkno, current->offnum);
1354
1355                 /* Done modifying old current buffer, mark it dirty */
1356                 MarkBufferDirty(saveCurrent.buffer);
1357         }
1358         else
1359         {
1360                 /*
1361                  * Splitting root page, which was a leaf but now becomes inner page
1362                  * (and so "current" continues to point at it)
1363                  */
1364                 Assert(SpGistBlockIsRoot(current->blkno));
1365                 Assert(redirectTuplePos == InvalidOffsetNumber);
1366
1367                 SpGistInitBuffer(current->buffer, (isNulls ? SPGIST_NULLS : 0));
1368                 xlrec.initInner = true;
1369
1370                 xlrec.blknoInner = current->blkno;
1371                 xlrec.offnumInner = current->offnum =
1372                         PageAddItem(current->page, (Item) innerTuple, innerTuple->size,
1373                                                 InvalidOffsetNumber, false, false);
1374                 if (current->offnum != FirstOffsetNumber)
1375                         elog(ERROR, "failed to add item of size %u to SPGiST index page",
1376                                  innerTuple->size);
1377
1378                 /* No parent link to update, nor redirection to do */
1379                 xlrec.blknoParent = InvalidBlockNumber;
1380                 xlrec.offnumParent = InvalidOffsetNumber;
1381                 xlrec.nodeI = 0;
1382
1383                 /* Done modifying new current buffer, mark it dirty */
1384                 MarkBufferDirty(current->buffer);
1385
1386                 /* saveCurrent doesn't represent a different buffer */
1387                 saveCurrent.buffer = InvalidBuffer;
1388         }
1389
1390         if (RelationNeedsWAL(index))
1391         {
1392                 XLogRecPtr      recptr;
1393
1394                 /* Issue the WAL record */
1395                 recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_PICKSPLIT, rdata);
1396
1397                 /* Update page LSNs on all affected pages */
1398                 if (newLeafBuffer != InvalidBuffer)
1399                 {
1400                         Page            page = BufferGetPage(newLeafBuffer);
1401
1402                         PageSetLSN(page, recptr);
1403                         PageSetTLI(page, ThisTimeLineID);
1404                 }
1405
1406                 if (saveCurrent.buffer != InvalidBuffer)
1407                 {
1408                         Page            page = BufferGetPage(saveCurrent.buffer);
1409
1410                         PageSetLSN(page, recptr);
1411                         PageSetTLI(page, ThisTimeLineID);
1412                 }
1413
1414                 PageSetLSN(current->page, recptr);
1415                 PageSetTLI(current->page, ThisTimeLineID);
1416
1417                 if (parent->buffer != InvalidBuffer)
1418                 {
1419                         PageSetLSN(parent->page, recptr);
1420                         PageSetTLI(parent->page, ThisTimeLineID);
1421                 }
1422         }
1423
1424         END_CRIT_SECTION();
1425
1426         /* Update local free-space cache and unlock buffers */
1427         if (newLeafBuffer != InvalidBuffer)
1428         {
1429                 SpGistSetLastUsedPage(index, newLeafBuffer);
1430                 UnlockReleaseBuffer(newLeafBuffer);
1431         }
1432         if (saveCurrent.buffer != InvalidBuffer)
1433         {
1434                 SpGistSetLastUsedPage(index, saveCurrent.buffer);
1435                 UnlockReleaseBuffer(saveCurrent.buffer);
1436         }
1437
1438         return insertedNew;
1439 }
1440
1441 /*
1442  * spgMatchNode action: descend to N'th child node of current inner tuple
1443  */
1444 static void
1445 spgMatchNodeAction(Relation index, SpGistState *state,
1446                                    SpGistInnerTuple innerTuple,
1447                                    SPPageDesc *current, SPPageDesc *parent, int nodeN)
1448 {
1449         int                     i;
1450         SpGistNodeTuple node;
1451
1452         /* Release previous parent buffer if any */
1453         if (parent->buffer != InvalidBuffer &&
1454                 parent->buffer != current->buffer)
1455         {
1456                 SpGistSetLastUsedPage(index, parent->buffer);
1457                 UnlockReleaseBuffer(parent->buffer);
1458         }
1459
1460         /* Repoint parent to specified node of current inner tuple */
1461         parent->blkno = current->blkno;
1462         parent->buffer = current->buffer;
1463         parent->page = current->page;
1464         parent->offnum = current->offnum;
1465         parent->node = nodeN;
1466
1467         /* Locate that node */
1468         SGITITERATE(innerTuple, i, node)
1469         {
1470                 if (i == nodeN)
1471                         break;
1472         }
1473
1474         if (i != nodeN)
1475                 elog(ERROR, "failed to find requested node %d in SPGiST inner tuple",
1476                          nodeN);
1477
1478         /* Point current to the downlink location, if any */
1479         if (ItemPointerIsValid(&node->t_tid))
1480         {
1481                 current->blkno = ItemPointerGetBlockNumber(&node->t_tid);
1482                 current->offnum = ItemPointerGetOffsetNumber(&node->t_tid);
1483         }
1484         else
1485         {
1486                 /* Downlink is empty, so we'll need to find a new page */
1487                 current->blkno = InvalidBlockNumber;
1488                 current->offnum = InvalidOffsetNumber;
1489         }
1490
1491         current->buffer = InvalidBuffer;
1492         current->page = NULL;
1493 }
1494
1495 /*
1496  * spgAddNode action: add a node to the inner tuple at current
1497  */
1498 static void
1499 spgAddNodeAction(Relation index, SpGistState *state,
1500                                  SpGistInnerTuple innerTuple,
1501                                  SPPageDesc *current, SPPageDesc *parent,
1502                                  int nodeN, Datum nodeLabel)
1503 {
1504         SpGistInnerTuple newInnerTuple;
1505         XLogRecData rdata[5];
1506         spgxlogAddNode xlrec;
1507
1508         /* Should not be applied to nulls */
1509         Assert(!SpGistPageStoresNulls(current->page));
1510
1511         /* Construct new inner tuple with additional node */
1512         newInnerTuple = addNode(state, innerTuple, nodeLabel, nodeN);
1513
1514         /* Prepare WAL record */
1515         xlrec.node = index->rd_node;
1516         STORE_STATE(state, xlrec.stateSrc);
1517         xlrec.blkno = current->blkno;
1518         xlrec.offnum = current->offnum;
1519
1520         /* we don't fill these unless we need to change the parent downlink */
1521         xlrec.blknoParent = InvalidBlockNumber;
1522         xlrec.offnumParent = InvalidOffsetNumber;
1523         xlrec.nodeI = 0;
1524
1525         /* we don't fill these unless tuple has to be moved */
1526         xlrec.blknoNew = InvalidBlockNumber;
1527         xlrec.offnumNew = InvalidOffsetNumber;
1528         xlrec.newPage = false;
1529
1530         ACCEPT_RDATA_DATA(&xlrec, sizeof(xlrec), 0);
1531         /* we assume sizeof(xlrec) is at least int-aligned */
1532         ACCEPT_RDATA_DATA(newInnerTuple, newInnerTuple->size, 1);
1533         ACCEPT_RDATA_BUFFER(current->buffer, 2);
1534
1535         if (PageGetExactFreeSpace(current->page) >=
1536                 newInnerTuple->size - innerTuple->size)
1537         {
1538                 /*
1539                  * We can replace the inner tuple by new version in-place
1540                  */
1541                 START_CRIT_SECTION();
1542
1543                 PageIndexTupleDelete(current->page, current->offnum);
1544                 if (PageAddItem(current->page,
1545                                                 (Item) newInnerTuple, newInnerTuple->size,
1546                                                 current->offnum, false, false) != current->offnum)
1547                         elog(ERROR, "failed to add item of size %u to SPGiST index page",
1548                                  newInnerTuple->size);
1549
1550                 MarkBufferDirty(current->buffer);
1551
1552                 if (RelationNeedsWAL(index))
1553                 {
1554                         XLogRecPtr      recptr;
1555
1556                         recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_NODE, rdata);
1557
1558                         PageSetLSN(current->page, recptr);
1559                         PageSetTLI(current->page, ThisTimeLineID);
1560                 }
1561
1562                 END_CRIT_SECTION();
1563         }
1564         else
1565         {
1566                 /*
1567                  * move inner tuple to another page, and update parent
1568                  */
1569                 SpGistDeadTuple dt;
1570                 SPPageDesc      saveCurrent;
1571
1572                 /*
1573                  * It should not be possible to get here for the root page, since we
1574                  * allow only one inner tuple on the root page, and spgFormInnerTuple
1575                  * always checks that inner tuples don't exceed the size of a page.
1576                  */
1577                 if (SpGistBlockIsRoot(current->blkno))
1578                         elog(ERROR, "cannot enlarge root tuple any more");
1579                 Assert(parent->buffer != InvalidBuffer);
1580
1581                 saveCurrent = *current;
1582
1583                 xlrec.blknoParent = parent->blkno;
1584                 xlrec.offnumParent = parent->offnum;
1585                 xlrec.nodeI = parent->node;
1586
1587                 /*
1588                  * obtain new buffer with the same parity as current, since it will be
1589                  * a child of same parent tuple
1590                  */
1591                 current->buffer = SpGistGetBuffer(index,
1592                                                                                   GBUF_INNER_PARITY(current->blkno),
1593                                                                         newInnerTuple->size + sizeof(ItemIdData),
1594                                                                                   &xlrec.newPage);
1595                 current->blkno = BufferGetBlockNumber(current->buffer);
1596                 current->page = BufferGetPage(current->buffer);
1597
1598                 xlrec.blknoNew = current->blkno;
1599
1600                 /*
1601                  * Let's just make real sure new current isn't same as old.  Right now
1602                  * that's impossible, but if SpGistGetBuffer ever got smart enough to
1603                  * delete placeholder tuples before checking space, maybe it wouldn't
1604                  * be impossible.  The case would appear to work except that WAL
1605                  * replay would be subtly wrong, so I think a mere assert isn't enough
1606                  * here.
1607                  */
1608                 if (xlrec.blknoNew == xlrec.blkno)
1609                         elog(ERROR, "SPGiST new buffer shouldn't be same as old buffer");
1610
1611                 /*
1612                  * New current and parent buffer will both be modified; but note that
1613                  * parent buffer could be same as either new or old current.
1614                  */
1615                 ACCEPT_RDATA_BUFFER(current->buffer, 3);
1616                 if (parent->buffer != current->buffer &&
1617                         parent->buffer != saveCurrent.buffer)
1618                         ACCEPT_RDATA_BUFFER(parent->buffer, 4);
1619
1620                 START_CRIT_SECTION();
1621
1622                 /* insert new ... */
1623                 xlrec.offnumNew = current->offnum =
1624                         SpGistPageAddNewItem(state, current->page,
1625                                                                  (Item) newInnerTuple, newInnerTuple->size,
1626                                                                  NULL, false);
1627
1628                 MarkBufferDirty(current->buffer);
1629
1630                 /* update parent's downlink and mark parent page dirty */
1631                 saveNodeLink(index, parent, current->blkno, current->offnum);
1632
1633                 /*
1634                  * Replace old tuple with a placeholder or redirection tuple.  Unless
1635                  * doing an index build, we have to insert a redirection tuple for
1636                  * possible concurrent scans.  We can't just delete it in any case,
1637                  * because that could change the offsets of other tuples on the page,
1638                  * breaking downlinks from their parents.
1639                  */
1640                 if (state->isBuild)
1641                         dt = spgFormDeadTuple(state, SPGIST_PLACEHOLDER,
1642                                                                   InvalidBlockNumber, InvalidOffsetNumber);
1643                 else
1644                         dt = spgFormDeadTuple(state, SPGIST_REDIRECT,
1645                                                                   current->blkno, current->offnum);
1646
1647                 PageIndexTupleDelete(saveCurrent.page, saveCurrent.offnum);
1648                 if (PageAddItem(saveCurrent.page, (Item) dt, dt->size,
1649                                                 saveCurrent.offnum,
1650                                                 false, false) != saveCurrent.offnum)
1651                         elog(ERROR, "failed to add item of size %u to SPGiST index page",
1652                                  dt->size);
1653
1654                 if (state->isBuild)
1655                         SpGistPageGetOpaque(saveCurrent.page)->nPlaceholder++;
1656                 else
1657                         SpGistPageGetOpaque(saveCurrent.page)->nRedirection++;
1658
1659                 MarkBufferDirty(saveCurrent.buffer);
1660
1661                 if (RelationNeedsWAL(index))
1662                 {
1663                         XLogRecPtr      recptr;
1664
1665                         recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_NODE, rdata);
1666
1667                         /* we don't bother to check if any of these are redundant */
1668                         PageSetLSN(current->page, recptr);
1669                         PageSetTLI(current->page, ThisTimeLineID);
1670                         PageSetLSN(parent->page, recptr);
1671                         PageSetTLI(parent->page, ThisTimeLineID);
1672                         PageSetLSN(saveCurrent.page, recptr);
1673                         PageSetTLI(saveCurrent.page, ThisTimeLineID);
1674                 }
1675
1676                 END_CRIT_SECTION();
1677
1678                 /* Release saveCurrent if it's not same as current or parent */
1679                 if (saveCurrent.buffer != current->buffer &&
1680                         saveCurrent.buffer != parent->buffer)
1681                 {
1682                         SpGistSetLastUsedPage(index, saveCurrent.buffer);
1683                         UnlockReleaseBuffer(saveCurrent.buffer);
1684                 }
1685         }
1686 }
1687
1688 /*
1689  * spgSplitNode action: split inner tuple at current into prefix and postfix
1690  */
1691 static void
1692 spgSplitNodeAction(Relation index, SpGistState *state,
1693                                    SpGistInnerTuple innerTuple,
1694                                    SPPageDesc *current, spgChooseOut *out)
1695 {
1696         SpGistInnerTuple prefixTuple,
1697                                 postfixTuple;
1698         SpGistNodeTuple node,
1699                            *nodes;
1700         BlockNumber postfixBlkno;
1701         OffsetNumber postfixOffset;
1702         int                     i;
1703         XLogRecData rdata[5];
1704         spgxlogSplitTuple xlrec;
1705         Buffer          newBuffer = InvalidBuffer;
1706
1707         /* Should not be applied to nulls */
1708         Assert(!SpGistPageStoresNulls(current->page));
1709
1710         /*
1711          * Construct new prefix tuple, containing a single node with the specified
1712          * label.  (We'll update the node's downlink to point to the new postfix
1713          * tuple, below.)
1714          */
1715         node = spgFormNodeTuple(state, out->result.splitTuple.nodeLabel, false);
1716
1717         prefixTuple = spgFormInnerTuple(state,
1718                                                                         out->result.splitTuple.prefixHasPrefix,
1719                                                                         out->result.splitTuple.prefixPrefixDatum,
1720                                                                         1, &node);
1721
1722         /* it must fit in the space that innerTuple now occupies */
1723         if (prefixTuple->size > innerTuple->size)
1724                 elog(ERROR, "SPGiST inner-tuple split must not produce longer prefix");
1725
1726         /*
1727          * Construct new postfix tuple, containing all nodes of innerTuple with
1728          * same node datums, but with the prefix specified by the picksplit
1729          * function.
1730          */
1731         nodes = palloc(sizeof(SpGistNodeTuple) * innerTuple->nNodes);
1732         SGITITERATE(innerTuple, i, node)
1733         {
1734                 nodes[i] = node;
1735         }
1736
1737         postfixTuple = spgFormInnerTuple(state,
1738                                                                          out->result.splitTuple.postfixHasPrefix,
1739                                                                    out->result.splitTuple.postfixPrefixDatum,
1740                                                                          innerTuple->nNodes, nodes);
1741
1742         /* Postfix tuple is allTheSame if original tuple was */
1743         postfixTuple->allTheSame = innerTuple->allTheSame;
1744
1745         /* prep data for WAL record */
1746         xlrec.node = index->rd_node;
1747         xlrec.newPage = false;
1748
1749         ACCEPT_RDATA_DATA(&xlrec, sizeof(xlrec), 0);
1750         /* we assume sizeof(xlrec) is at least int-aligned */
1751         ACCEPT_RDATA_DATA(prefixTuple, prefixTuple->size, 1);
1752         ACCEPT_RDATA_DATA(postfixTuple, postfixTuple->size, 2);
1753         ACCEPT_RDATA_BUFFER(current->buffer, 3);
1754
1755         /*
1756          * If we can't fit both tuples on the current page, get a new page for the
1757          * postfix tuple.  In particular, can't split to the root page.
1758          *
1759          * For the space calculation, note that prefixTuple replaces innerTuple
1760          * but postfixTuple will be a new entry.
1761          */
1762         if (SpGistBlockIsRoot(current->blkno) ||
1763                 SpGistPageGetFreeSpace(current->page, 1) + innerTuple->size <
1764                 prefixTuple->size + postfixTuple->size + sizeof(ItemIdData))
1765         {
1766                 /*
1767                  * Choose page with next triple parity, because postfix tuple is a
1768                  * child of prefix one
1769                  */
1770                 newBuffer = SpGistGetBuffer(index,
1771                                                                         GBUF_INNER_PARITY(current->blkno + 1),
1772                                                                         postfixTuple->size + sizeof(ItemIdData),
1773                                                                         &xlrec.newPage);
1774                 ACCEPT_RDATA_BUFFER(newBuffer, 4);
1775         }
1776
1777         START_CRIT_SECTION();
1778
1779         /*
1780          * Replace old tuple by prefix tuple
1781          */
1782         PageIndexTupleDelete(current->page, current->offnum);
1783         xlrec.offnumPrefix = PageAddItem(current->page,
1784                                                                          (Item) prefixTuple, prefixTuple->size,
1785                                                                          current->offnum, false, false);
1786         if (xlrec.offnumPrefix != current->offnum)
1787                 elog(ERROR, "failed to add item of size %u to SPGiST index page",
1788                          prefixTuple->size);
1789         xlrec.blknoPrefix = current->blkno;
1790
1791         /*
1792          * put postfix tuple into appropriate page
1793          */
1794         if (newBuffer == InvalidBuffer)
1795         {
1796                 xlrec.blknoPostfix = postfixBlkno = current->blkno;
1797                 xlrec.offnumPostfix = postfixOffset =
1798                         SpGistPageAddNewItem(state, current->page,
1799                                                                  (Item) postfixTuple, postfixTuple->size,
1800                                                                  NULL, false);
1801         }
1802         else
1803         {
1804                 xlrec.blknoPostfix = postfixBlkno = BufferGetBlockNumber(newBuffer);
1805                 xlrec.offnumPostfix = postfixOffset =
1806                         SpGistPageAddNewItem(state, BufferGetPage(newBuffer),
1807                                                                  (Item) postfixTuple, postfixTuple->size,
1808                                                                  NULL, false);
1809                 MarkBufferDirty(newBuffer);
1810         }
1811
1812         /*
1813          * And set downlink pointer in the prefix tuple to point to postfix tuple.
1814          * (We can't avoid this step by doing the above two steps in opposite
1815          * order, because there might not be enough space on the page to insert
1816          * the postfix tuple first.)  We have to update the local copy of the
1817          * prefixTuple too, because that's what will be written to WAL.
1818          */
1819         spgUpdateNodeLink(prefixTuple, 0, postfixBlkno, postfixOffset);
1820         prefixTuple = (SpGistInnerTuple) PageGetItem(current->page,
1821                                                           PageGetItemId(current->page, current->offnum));
1822         spgUpdateNodeLink(prefixTuple, 0, postfixBlkno, postfixOffset);
1823
1824         MarkBufferDirty(current->buffer);
1825
1826         if (RelationNeedsWAL(index))
1827         {
1828                 XLogRecPtr      recptr;
1829
1830                 recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_SPLIT_TUPLE, rdata);
1831
1832                 PageSetLSN(current->page, recptr);
1833                 PageSetTLI(current->page, ThisTimeLineID);
1834
1835                 if (newBuffer != InvalidBuffer)
1836                 {
1837                         PageSetLSN(BufferGetPage(newBuffer), recptr);
1838                         PageSetTLI(BufferGetPage(newBuffer), ThisTimeLineID);
1839                 }
1840         }
1841
1842         END_CRIT_SECTION();
1843
1844         /* Update local free-space cache and release buffer */
1845         if (newBuffer != InvalidBuffer)
1846         {
1847                 SpGistSetLastUsedPage(index, newBuffer);
1848                 UnlockReleaseBuffer(newBuffer);
1849         }
1850 }
1851
1852 /*
1853  * Insert one item into the index
1854  */
1855 void
1856 spgdoinsert(Relation index, SpGistState *state,
1857                         ItemPointer heapPtr, Datum datum, bool isnull)
1858 {
1859         int                     level = 0;
1860         Datum           leafDatum;
1861         int                     leafSize;
1862         SPPageDesc      current,
1863                                 parent;
1864         FmgrInfo   *procinfo = NULL;
1865
1866         /*
1867          * Look up FmgrInfo of the user-defined choose function once, to save
1868          * cycles in the loop below.
1869          */
1870         if (!isnull)
1871                 procinfo = index_getprocinfo(index, 1, SPGIST_CHOOSE_PROC);
1872
1873         /*
1874          * Since we don't use index_form_tuple in this AM, we have to make sure
1875          * value to be inserted is not toasted; FormIndexDatum doesn't guarantee
1876          * that.
1877          */
1878         if (!isnull && state->attType.attlen == -1)
1879                 datum = PointerGetDatum(PG_DETOAST_DATUM(datum));
1880
1881         leafDatum = datum;
1882
1883         /*
1884          * Compute space needed for a leaf tuple containing the given datum.
1885          *
1886          * If it isn't gonna fit, and the opclass can't reduce the datum size by
1887          * suffixing, bail out now rather than getting into an endless loop.
1888          */
1889         if (!isnull)
1890                 leafSize = SGLTHDRSZ + sizeof(ItemIdData) +
1891                         SpGistGetTypeSize(&state->attType, leafDatum);
1892         else
1893                 leafSize = SGDTSIZE + sizeof(ItemIdData);
1894
1895         if (leafSize > SPGIST_PAGE_CAPACITY && !state->config.longValuesOK)
1896                 ereport(ERROR,
1897                                 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
1898                         errmsg("index row size %lu exceeds maximum %lu for index \"%s\"",
1899                                    (unsigned long) (leafSize - sizeof(ItemIdData)),
1900                                  (unsigned long) (SPGIST_PAGE_CAPACITY - sizeof(ItemIdData)),
1901                                    RelationGetRelationName(index)),
1902                         errhint("Values larger than a buffer page cannot be indexed.")));
1903
1904         /* Initialize "current" to the appropriate root page */
1905         current.blkno = isnull ? SPGIST_NULL_BLKNO : SPGIST_ROOT_BLKNO;
1906         current.buffer = InvalidBuffer;
1907         current.page = NULL;
1908         current.offnum = FirstOffsetNumber;
1909         current.node = -1;
1910
1911         /* "parent" is invalid for the moment */
1912         parent.blkno = InvalidBlockNumber;
1913         parent.buffer = InvalidBuffer;
1914         parent.page = NULL;
1915         parent.offnum = InvalidOffsetNumber;
1916         parent.node = -1;
1917
1918         for (;;)
1919         {
1920                 bool            isNew = false;
1921
1922                 /*
1923                  * Bail out if query cancel is pending.  We must have this somewhere
1924                  * in the loop since a broken opclass could produce an infinite
1925                  * picksplit loop.
1926                  */
1927                 CHECK_FOR_INTERRUPTS();
1928
1929                 if (current.blkno == InvalidBlockNumber)
1930                 {
1931                         /*
1932                          * Create a leaf page.  If leafSize is too large to fit on a page,
1933                          * we won't actually use the page yet, but it simplifies the API
1934                          * for doPickSplit to always have a leaf page at hand; so just
1935                          * quietly limit our request to a page size.
1936                          */
1937                         current.buffer =
1938                                 SpGistGetBuffer(index,
1939                                                                 GBUF_LEAF | (isnull ? GBUF_NULLS : 0),
1940                                                                 Min(leafSize, SPGIST_PAGE_CAPACITY),
1941                                                                 &isNew);
1942                         current.blkno = BufferGetBlockNumber(current.buffer);
1943                 }
1944                 else if (parent.buffer == InvalidBuffer ||
1945                                  current.blkno != parent.blkno)
1946                 {
1947                         current.buffer = ReadBuffer(index, current.blkno);
1948                         LockBuffer(current.buffer, BUFFER_LOCK_EXCLUSIVE);
1949                 }
1950                 else
1951                 {
1952                         /* inner tuple can be stored on the same page as parent one */
1953                         current.buffer = parent.buffer;
1954                 }
1955                 current.page = BufferGetPage(current.buffer);
1956
1957                 /* should not arrive at a page of the wrong type */
1958                 if (isnull ? !SpGistPageStoresNulls(current.page) :
1959                         SpGistPageStoresNulls(current.page))
1960                         elog(ERROR, "SPGiST index page %u has wrong nulls flag",
1961                                  current.blkno);
1962
1963                 if (SpGistPageIsLeaf(current.page))
1964                 {
1965                         SpGistLeafTuple leafTuple;
1966                         int                     nToSplit,
1967                                                 sizeToSplit;
1968
1969                         leafTuple = spgFormLeafTuple(state, heapPtr, leafDatum, isnull);
1970                         if (leafTuple->size + sizeof(ItemIdData) <=
1971                                 SpGistPageGetFreeSpace(current.page, 1))
1972                         {
1973                                 /* it fits on page, so insert it and we're done */
1974                                 addLeafTuple(index, state, leafTuple,
1975                                                          &current, &parent, isnull, isNew);
1976                                 break;
1977                         }
1978                         else if ((sizeToSplit =
1979                                           checkSplitConditions(index, state, &current,
1980                                                                         &nToSplit)) < SPGIST_PAGE_CAPACITY / 2 &&
1981                                          nToSplit < 64 &&
1982                                          leafTuple->size + sizeof(ItemIdData) + sizeToSplit <= SPGIST_PAGE_CAPACITY)
1983                         {
1984                                 /*
1985                                  * the amount of data is pretty small, so just move the whole
1986                                  * chain to another leaf page rather than splitting it.
1987                                  */
1988                                 Assert(!isNew);
1989                                 moveLeafs(index, state, &current, &parent, leafTuple, isnull);
1990                                 break;                  /* we're done */
1991                         }
1992                         else
1993                         {
1994                                 /* picksplit */
1995                                 if (doPickSplit(index, state, &current, &parent,
1996                                                                 leafTuple, level, isnull, isNew))
1997                                         break;          /* doPickSplit installed new tuples */
1998
1999                                 /* leaf tuple will not be inserted yet */
2000                                 pfree(leafTuple);
2001
2002                                 /*
2003                                  * current now describes new inner tuple, go insert into it
2004                                  */
2005                                 Assert(!SpGistPageIsLeaf(current.page));
2006                                 goto process_inner_tuple;
2007                         }
2008                 }
2009                 else    /* non-leaf page */
2010                 {
2011                         /*
2012                          * Apply the opclass choose function to figure out how to insert
2013                          * the given datum into the current inner tuple.
2014                          */
2015                         SpGistInnerTuple innerTuple;
2016                         spgChooseIn in;
2017                         spgChooseOut out;
2018
2019                         /*
2020                          * spgAddNode and spgSplitTuple cases will loop back to here to
2021                          * complete the insertion operation.  Just in case the choose
2022                          * function is broken and produces add or split requests
2023                          * repeatedly, check for query cancel.
2024                          */
2025         process_inner_tuple:
2026                         CHECK_FOR_INTERRUPTS();
2027
2028                         innerTuple = (SpGistInnerTuple) PageGetItem(current.page,
2029                                                                 PageGetItemId(current.page, current.offnum));
2030
2031                         in.datum = datum;
2032                         in.leafDatum = leafDatum;
2033                         in.level = level;
2034                         in.allTheSame = innerTuple->allTheSame;
2035                         in.hasPrefix = (innerTuple->prefixSize > 0);
2036                         in.prefixDatum = SGITDATUM(innerTuple, state);
2037                         in.nNodes = innerTuple->nNodes;
2038                         in.nodeLabels = spgExtractNodeLabels(state, innerTuple);
2039
2040                         memset(&out, 0, sizeof(out));
2041
2042                         if (!isnull)
2043                         {
2044                                 /* use user-defined choose method */
2045                                 FunctionCall2Coll(procinfo,
2046                                                                   index->rd_indcollation[0],
2047                                                                   PointerGetDatum(&in),
2048                                                                   PointerGetDatum(&out));
2049                         }
2050                         else
2051                         {
2052                                 /* force "match" action (to insert to random subnode) */
2053                                 out.resultType = spgMatchNode;
2054                         }
2055
2056                         if (innerTuple->allTheSame)
2057                         {
2058                                 /*
2059                                  * It's not allowed to do an AddNode at an allTheSame tuple.
2060                                  * Opclass must say "match", in which case we choose a random
2061                                  * one of the nodes to descend into, or "split".
2062                                  */
2063                                 if (out.resultType == spgAddNode)
2064                                         elog(ERROR, "cannot add a node to an allTheSame inner tuple");
2065                                 else if (out.resultType == spgMatchNode)
2066                                         out.result.matchNode.nodeN = random() % innerTuple->nNodes;
2067                         }
2068
2069                         switch (out.resultType)
2070                         {
2071                                 case spgMatchNode:
2072                                         /* Descend to N'th child node */
2073                                         spgMatchNodeAction(index, state, innerTuple,
2074                                                                            &current, &parent,
2075                                                                            out.result.matchNode.nodeN);
2076                                         /* Adjust level as per opclass request */
2077                                         level += out.result.matchNode.levelAdd;
2078                                         /* Replace leafDatum and recompute leafSize */
2079                                         if (!isnull)
2080                                         {
2081                                                 leafDatum = out.result.matchNode.restDatum;
2082                                                 leafSize = SGLTHDRSZ + sizeof(ItemIdData) +
2083                                                         SpGistGetTypeSize(&state->attType, leafDatum);
2084                                         }
2085
2086                                         /*
2087                                          * Loop around and attempt to insert the new leafDatum at
2088                                          * "current" (which might reference an existing child
2089                                          * tuple, or might be invalid to force us to find a new
2090                                          * page for the tuple).
2091                                          *
2092                                          * Note: if the opclass sets longValuesOK, we rely on the
2093                                          * choose function to eventually shorten the leafDatum
2094                                          * enough to fit on a page.  We could add a test here to
2095                                          * complain if the datum doesn't get visibly shorter each
2096                                          * time, but that could get in the way of opclasses that
2097                                          * "simplify" datums in a way that doesn't necessarily
2098                                          * lead to physical shortening on every cycle.
2099                                          */
2100                                         break;
2101                                 case spgAddNode:
2102                                         /* AddNode is not sensible if nodes don't have labels */
2103                                         if (in.nodeLabels == NULL)
2104                                                 elog(ERROR, "cannot add a node to an inner tuple without node labels");
2105                                         /* Add node to inner tuple, per request */
2106                                         spgAddNodeAction(index, state, innerTuple,
2107                                                                          &current, &parent,
2108                                                                          out.result.addNode.nodeN,
2109                                                                          out.result.addNode.nodeLabel);
2110
2111                                         /*
2112                                          * Retry insertion into the enlarged node.      We assume that
2113                                          * we'll get a MatchNode result this time.
2114                                          */
2115                                         goto process_inner_tuple;
2116                                         break;
2117                                 case spgSplitTuple:
2118                                         /* Split inner tuple, per request */
2119                                         spgSplitNodeAction(index, state, innerTuple,
2120                                                                            &current, &out);
2121
2122                                         /* Retry insertion into the split node */
2123                                         goto process_inner_tuple;
2124                                         break;
2125                                 default:
2126                                         elog(ERROR, "unrecognized SPGiST choose result: %d",
2127                                                  (int) out.resultType);
2128                                         break;
2129                         }
2130                 }
2131         }                                                       /* end loop */
2132
2133         /*
2134          * Release any buffers we're still holding.  Beware of possibility that
2135          * current and parent reference same buffer.
2136          */
2137         if (current.buffer != InvalidBuffer)
2138         {
2139                 SpGistSetLastUsedPage(index, current.buffer);
2140                 UnlockReleaseBuffer(current.buffer);
2141         }
2142         if (parent.buffer != InvalidBuffer &&
2143                 parent.buffer != current.buffer)
2144         {
2145                 SpGistSetLastUsedPage(index, parent.buffer);
2146                 UnlockReleaseBuffer(parent.buffer);
2147         }
2148 }