]> granicus.if.org Git - postgresql/blob - src/backend/access/rtree/rtree.c
358d307d0b5ed863b6eaabd5f740614e5136a177
[postgresql] / src / backend / access / rtree / rtree.c
1 /*-------------------------------------------------------------------------
2  *
3  * rtree.c
4  *        interface routines for the postgres rtree indexed access method.
5  *
6  * Portions Copyright (c) 1996-2000, PostgreSQL, Inc
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *        $Header: /cvsroot/pgsql/src/backend/access/rtree/Attic/rtree.c,v 1.48 2000/06/13 07:34:49 tgl Exp $
12  *
13  *-------------------------------------------------------------------------
14  */
15
16 #include "postgres.h"
17
18 #include "access/genam.h"
19 #include "access/heapam.h"
20 #include "access/rtree.h"
21 #include "catalog/index.h"
22 #include "executor/executor.h"
23 #include "miscadmin.h"
24
25
26 typedef struct SPLITVEC
27 {
28         OffsetNumber *spl_left;
29         int                     spl_nleft;
30         char       *spl_ldatum;
31         OffsetNumber *spl_right;
32         int                     spl_nright;
33         char       *spl_rdatum;
34 } SPLITVEC;
35
36 typedef struct RTSTATE
37 {
38         FmgrInfo        unionFn;                /* union function */
39         FmgrInfo        sizeFn;                 /* size function */
40         FmgrInfo        interFn;                /* intersection function */
41 } RTSTATE;
42
43 /* non-export function prototypes */
44 static InsertIndexResult rtdoinsert(Relation r, IndexTuple itup,
45                    RTSTATE *rtstate);
46 static void rttighten(Relation r, RTSTACK *stk, char *datum, int att_size,
47                   RTSTATE *rtstate);
48 static InsertIndexResult dosplit(Relation r, Buffer buffer, RTSTACK *stack,
49                 IndexTuple itup, RTSTATE *rtstate);
50 static void rtintinsert(Relation r, RTSTACK *stk, IndexTuple ltup,
51                         IndexTuple rtup, RTSTATE *rtstate);
52 static void rtnewroot(Relation r, IndexTuple lt, IndexTuple rt);
53 static void picksplit(Relation r, Page page, SPLITVEC *v, IndexTuple itup,
54                   RTSTATE *rtstate);
55 static void RTInitBuffer(Buffer b, uint32 f);
56 static OffsetNumber choose(Relation r, Page p, IndexTuple it,
57            RTSTATE *rtstate);
58 static int      nospace(Page p, IndexTuple it);
59 static void initRtstate(RTSTATE *rtstate, Relation index);
60
61
62 Datum
63 rtbuild(PG_FUNCTION_ARGS)
64 {
65         Relation                heap = (Relation) PG_GETARG_POINTER(0);
66         Relation                index = (Relation) PG_GETARG_POINTER(1);
67         int32                   natts = PG_GETARG_INT32(2);
68         AttrNumber         *attnum = (AttrNumber *) PG_GETARG_POINTER(3);
69 #ifdef NOT_USED
70         IndexStrategy   istrat = (IndexStrategy) PG_GETARG_POINTER(4);
71         uint16                  pcount = PG_GETARG_UINT16(5);
72         Datum              *params = (Datum *) PG_GETARG_POINTER(6);
73 #endif
74         FuncIndexInfo  *finfo = (FuncIndexInfo *) PG_GETARG_POINTER(7);
75         PredInfo           *predInfo = (PredInfo *) PG_GETARG_POINTER(8);
76         HeapScanDesc scan;
77         AttrNumber      i;
78         HeapTuple       htup;
79         IndexTuple      itup;
80         TupleDesc       hd,
81                                 id;
82         InsertIndexResult res;
83         Datum      *d;
84         bool       *nulls;
85         Buffer          buffer = InvalidBuffer;
86         int                     nb,
87                                 nh,
88                                 ni;
89
90 #ifndef OMIT_PARTIAL_INDEX
91         ExprContext *econtext;
92         TupleTable      tupleTable;
93         TupleTableSlot *slot;
94
95 #endif
96         Node       *pred,
97                            *oldPred;
98         RTSTATE         rtState;
99
100         initRtstate(&rtState, index);
101
102         pred = predInfo->pred;
103         oldPred = predInfo->oldPred;
104
105         /*
106          * We expect to be called exactly once for any index relation. If
107          * that's not the case, big trouble's what we have.
108          */
109
110         if (oldPred == NULL && (nb = RelationGetNumberOfBlocks(index)) != 0)
111                 elog(ERROR, "%s already contains data", RelationGetRelationName(index));
112
113         /* initialize the root page (if this is a new index) */
114         if (oldPred == NULL)
115         {
116                 buffer = ReadBuffer(index, P_NEW);
117                 RTInitBuffer(buffer, F_LEAF);
118                 WriteBuffer(buffer);
119         }
120
121         /* init the tuple descriptors and get set for a heap scan */
122         hd = RelationGetDescr(heap);
123         id = RelationGetDescr(index);
124         d = (Datum *) palloc(natts * sizeof(*d));
125         nulls = (bool *) palloc(natts * sizeof(*nulls));
126
127         /*
128          * If this is a predicate (partial) index, we will need to evaluate
129          * the predicate using ExecQual, which requires the current tuple to
130          * be in a slot of a TupleTable.  In addition, ExecQual must have an
131          * ExprContext referring to that slot.  Here, we initialize dummy
132          * TupleTable and ExprContext objects for this purpose. --Nels, Feb
133          * '92
134          */
135 #ifndef OMIT_PARTIAL_INDEX
136         if (pred != NULL || oldPred != NULL)
137         {
138                 tupleTable = ExecCreateTupleTable(1);
139                 slot = ExecAllocTableSlot(tupleTable);
140                 econtext = makeNode(ExprContext);
141                 FillDummyExprContext(econtext, slot, hd, InvalidBuffer);
142         }
143         else
144         {
145                 econtext = NULL;
146                 tupleTable = NULL;
147                 slot = NULL;
148         }
149 #endif   /* OMIT_PARTIAL_INDEX */
150
151         /* count the tuples as we insert them */
152         nh = ni = 0;
153
154         scan = heap_beginscan(heap, 0, SnapshotNow, 0, (ScanKey) NULL);
155
156         while (HeapTupleIsValid(htup = heap_getnext(scan, 0)))
157         {
158                 nh++;
159
160                 /*
161                  * If oldPred != NULL, this is an EXTEND INDEX command, so skip
162                  * this tuple if it was already in the existing partial index
163                  */
164                 if (oldPred != NULL)
165                 {
166 #ifndef OMIT_PARTIAL_INDEX
167                         /* SetSlotContents(slot, htup); */
168                         slot->val = htup;
169                         if (ExecQual((List *) oldPred, econtext, false))
170                         {
171                                 ni++;
172                                 continue;
173                         }
174 #endif   /* OMIT_PARTIAL_INDEX */
175                 }
176
177                 /*
178                  * Skip this tuple if it doesn't satisfy the partial-index
179                  * predicate
180                  */
181                 if (pred != NULL)
182                 {
183 #ifndef OMIT_PARTIAL_INDEX
184                         /* SetSlotContents(slot, htup); */
185                         slot->val = htup;
186                         if (!ExecQual((List *) pred, econtext, false))
187                                 continue;
188 #endif   /* OMIT_PARTIAL_INDEX */
189                 }
190
191                 ni++;
192
193                 /*
194                  * For the current heap tuple, extract all the attributes we use
195                  * in this index, and note which are null.
196                  */
197
198                 for (i = 1; i <= natts; i++)
199                 {
200                         int                     attoff;
201                         bool            attnull;
202
203                         /*
204                          * Offsets are from the start of the tuple, and are
205                          * zero-based; indices are one-based.  The next call returns i
206                          * - 1.  That's data hiding for you.
207                          */
208
209                         attoff = AttrNumberGetAttrOffset(i);
210
211                         /*
212                          * d[attoff] = HeapTupleGetAttributeValue(htup, buffer,
213                          */
214                         d[attoff] = GetIndexValue(htup,
215                                                                           hd,
216                                                                           attoff,
217                                                                           attnum,
218                                                                           finfo,
219                                                                           &attnull);
220                         nulls[attoff] = (attnull ? 'n' : ' ');
221                 }
222
223                 /* form an index tuple and point it at the heap tuple */
224                 itup = index_formtuple(id, &d[0], nulls);
225                 itup->t_tid = htup->t_self;
226
227                 /*
228                  * Since we already have the index relation locked, we call
229                  * rtdoinsert directly.  Normal access method calls dispatch
230                  * through rtinsert, which locks the relation for write.  This is
231                  * the right thing to do if you're inserting single tups, but not
232                  * when you're initializing the whole index at once.
233                  */
234
235                 res = rtdoinsert(index, itup, &rtState);
236                 pfree(itup);
237                 pfree(res);
238         }
239
240         /* okay, all heap tuples are indexed */
241         heap_endscan(scan);
242
243         if (pred != NULL || oldPred != NULL)
244         {
245 #ifndef OMIT_PARTIAL_INDEX
246                 ExecDropTupleTable(tupleTable, true);
247                 pfree(econtext);
248 #endif   /* OMIT_PARTIAL_INDEX */
249         }
250
251         /*
252          * Since we just counted the tuples in the heap, we update its stats
253          * in pg_class to guarantee that the planner takes advantage of the
254          * index we just created.  But, only update statistics during normal
255          * index definitions, not for indices on system catalogs created
256          * during bootstrap processing.  We must close the relations before
257          * updating statistics to guarantee that the relcache entries are
258          * flushed when we increment the command counter in UpdateStats(). But
259          * we do not release any locks on the relations; those will be held
260          * until end of transaction.
261          */
262         if (IsNormalProcessingMode())
263         {
264                 Oid                     hrelid = RelationGetRelid(heap);
265                 Oid                     irelid = RelationGetRelid(index);
266                 bool            inplace = IsReindexProcessing();
267
268                 heap_close(heap, NoLock);
269                 index_close(index);
270                 UpdateStats(hrelid, nh, inplace);
271                 UpdateStats(irelid, ni, inplace);
272                 if (oldPred != NULL && !inplace)
273                 {
274                         if (ni == nh)
275                                 pred = NULL;
276                         UpdateIndexPredicate(irelid, oldPred, pred);
277                 }
278         }
279
280         /* be tidy */
281         pfree(nulls);
282         pfree(d);
283
284         PG_RETURN_POINTER(NULL);        /* no real return value */
285 }
286
287 /*
288  *      rtinsert -- wrapper for rtree tuple insertion.
289  *
290  *        This is the public interface routine for tuple insertion in rtrees.
291  *        It doesn't do any work; just locks the relation and passes the buck.
292  */
293 Datum
294 rtinsert(PG_FUNCTION_ARGS)
295 {
296         Relation                r = (Relation) PG_GETARG_POINTER(0);
297         Datum              *datum = (Datum *) PG_GETARG_POINTER(1);
298         char               *nulls = (char *) PG_GETARG_POINTER(2);
299         ItemPointer             ht_ctid = (ItemPointer) PG_GETARG_POINTER(3);
300 #ifdef NOT_USED
301         Relation                heapRel = (Relation) PG_GETARG_POINTER(4);
302 #endif
303         InsertIndexResult res;
304         IndexTuple      itup;
305         RTSTATE         rtState;
306
307         /* generate an index tuple */
308         itup = index_formtuple(RelationGetDescr(r), datum, nulls);
309         itup->t_tid = *ht_ctid;
310         initRtstate(&rtState, r);
311
312         /*
313          * Notes in ExecUtils:ExecOpenIndices()
314          *
315          * RelationSetLockForWrite(r);
316          */
317
318         res = rtdoinsert(r, itup, &rtState);
319
320         PG_RETURN_POINTER(res);
321 }
322
323 static InsertIndexResult
324 rtdoinsert(Relation r, IndexTuple itup, RTSTATE *rtstate)
325 {
326         Page            page;
327         Buffer          buffer;
328         BlockNumber blk;
329         IndexTuple      which;
330         OffsetNumber l;
331         RTSTACK    *stack;
332         InsertIndexResult res;
333         RTreePageOpaque opaque;
334         char       *datum;
335
336         blk = P_ROOT;
337         buffer = InvalidBuffer;
338         stack = (RTSTACK *) NULL;
339
340         do
341         {
342                 /* let go of current buffer before getting next */
343                 if (buffer != InvalidBuffer)
344                         ReleaseBuffer(buffer);
345
346                 /* get next buffer */
347                 buffer = ReadBuffer(r, blk);
348                 page = (Page) BufferGetPage(buffer);
349
350                 opaque = (RTreePageOpaque) PageGetSpecialPointer(page);
351                 if (!(opaque->flags & F_LEAF))
352                 {
353                         RTSTACK    *n;
354                         ItemId          iid;
355
356                         n = (RTSTACK *) palloc(sizeof(RTSTACK));
357                         n->rts_parent = stack;
358                         n->rts_blk = blk;
359                         n->rts_child = choose(r, page, itup, rtstate);
360                         stack = n;
361
362                         iid = PageGetItemId(page, n->rts_child);
363                         which = (IndexTuple) PageGetItem(page, iid);
364                         blk = ItemPointerGetBlockNumber(&(which->t_tid));
365                 }
366         } while (!(opaque->flags & F_LEAF));
367
368         if (nospace(page, itup))
369         {
370                 /* need to do a split */
371                 res = dosplit(r, buffer, stack, itup, rtstate);
372                 freestack(stack);
373                 WriteBuffer(buffer);    /* don't forget to release buffer! */
374                 return res;
375         }
376
377         /* add the item and write the buffer */
378         if (PageIsEmpty(page))
379         {
380                 l = PageAddItem(page, (Item) itup, IndexTupleSize(itup),
381                                                 FirstOffsetNumber,
382                                                 LP_USED);
383         }
384         else
385         {
386                 l = PageAddItem(page, (Item) itup, IndexTupleSize(itup),
387                                                 OffsetNumberNext(PageGetMaxOffsetNumber(page)),
388                                                 LP_USED);
389         }
390
391         WriteBuffer(buffer);
392
393         datum = (((char *) itup) + sizeof(IndexTupleData));
394
395         /* now expand the page boundary in the parent to include the new child */
396         rttighten(r, stack, datum,
397                           (IndexTupleSize(itup) - sizeof(IndexTupleData)), rtstate);
398         freestack(stack);
399
400         /* build and return an InsertIndexResult for this insertion */
401         res = (InsertIndexResult) palloc(sizeof(InsertIndexResultData));
402         ItemPointerSet(&(res->pointerData), blk, l);
403
404         return res;
405 }
406
407 static void
408 rttighten(Relation r,
409                   RTSTACK *stk,
410                   char *datum,
411                   int att_size,
412                   RTSTATE *rtstate)
413 {
414         char       *oldud;
415         char       *tdatum;
416         Page            p;
417         float           old_size,
418                                 newd_size;
419         Buffer          b;
420
421         if (stk == (RTSTACK *) NULL)
422                 return;
423
424         b = ReadBuffer(r, stk->rts_blk);
425         p = BufferGetPage(b);
426
427         oldud = (char *) PageGetItem(p, PageGetItemId(p, stk->rts_child));
428         oldud += sizeof(IndexTupleData);
429
430         FunctionCall2(&rtstate->sizeFn,
431                                   PointerGetDatum(oldud),
432                                   PointerGetDatum(&old_size));
433
434         datum = (char *)
435                 DatumGetPointer(FunctionCall2(&rtstate->unionFn,
436                                                                           PointerGetDatum(oldud),
437                                                                           PointerGetDatum(datum)));
438
439         FunctionCall2(&rtstate->sizeFn,
440                                   PointerGetDatum(datum),
441                                   PointerGetDatum(&newd_size));
442
443         if (newd_size != old_size)
444         {
445                 TupleDesc       td = RelationGetDescr(r);
446
447                 if (td->attrs[0]->attlen < 0)
448                 {
449
450                         /*
451                          * This is an internal page, so 'oldud' had better be a union
452                          * (constant-length) key, too.  (See comment below.)
453                          */
454                         Assert(VARSIZE(datum) == VARSIZE(oldud));
455                         memmove(oldud, datum, VARSIZE(datum));
456                 }
457                 else
458                         memmove(oldud, datum, att_size);
459                 WriteBuffer(b);
460
461                 /*
462                  * The user may be defining an index on variable-sized data (like
463                  * polygons).  If so, we need to get a constant-sized datum for
464                  * insertion on the internal page.      We do this by calling the
465                  * union proc, which is guaranteed to return a rectangle.
466                  */
467
468                 tdatum = (char *)
469                         DatumGetPointer(FunctionCall2(&rtstate->unionFn,
470                                                                                   PointerGetDatum(datum),
471                                                                                   PointerGetDatum(datum)));
472                 rttighten(r, stk->rts_parent, tdatum, att_size, rtstate);
473                 pfree(tdatum);
474         }
475         else
476                 ReleaseBuffer(b);
477         pfree(datum);
478 }
479
480 /*
481  *      dosplit -- split a page in the tree.
482  *
483  *        This is the quadratic-cost split algorithm Guttman describes in
484  *        his paper.  The reason we chose it is that you can implement this
485  *        with less information about the data types on which you're operating.
486  */
487 static InsertIndexResult
488 dosplit(Relation r,
489                 Buffer buffer,
490                 RTSTACK *stack,
491                 IndexTuple itup,
492                 RTSTATE *rtstate)
493 {
494         Page            p;
495         Buffer          leftbuf,
496                                 rightbuf;
497         Page            left,
498                                 right;
499         ItemId          itemid;
500         IndexTuple      item;
501         IndexTuple      ltup,
502                                 rtup;
503         OffsetNumber maxoff;
504         OffsetNumber i;
505         OffsetNumber leftoff,
506                                 rightoff;
507         BlockNumber lbknum,
508                                 rbknum;
509         BlockNumber bufblock;
510         RTreePageOpaque opaque;
511         int                     blank;
512         InsertIndexResult res;
513         char       *isnull;
514         SPLITVEC        v;
515         TupleDesc       tupDesc;
516
517         isnull = (char *) palloc(r->rd_rel->relnatts);
518         for (blank = 0; blank < r->rd_rel->relnatts; blank++)
519                 isnull[blank] = ' ';
520         p = (Page) BufferGetPage(buffer);
521         opaque = (RTreePageOpaque) PageGetSpecialPointer(p);
522
523         /*
524          * The root of the tree is the first block in the relation.  If we're
525          * about to split the root, we need to do some hocus-pocus to enforce
526          * this guarantee.
527          */
528
529         if (BufferGetBlockNumber(buffer) == P_ROOT)
530         {
531                 leftbuf = ReadBuffer(r, P_NEW);
532                 RTInitBuffer(leftbuf, opaque->flags);
533                 lbknum = BufferGetBlockNumber(leftbuf);
534                 left = (Page) BufferGetPage(leftbuf);
535         }
536         else
537         {
538                 leftbuf = buffer;
539                 IncrBufferRefCount(buffer);
540                 lbknum = BufferGetBlockNumber(buffer);
541                 left = (Page) PageGetTempPage(p, sizeof(RTreePageOpaqueData));
542         }
543
544         rightbuf = ReadBuffer(r, P_NEW);
545         RTInitBuffer(rightbuf, opaque->flags);
546         rbknum = BufferGetBlockNumber(rightbuf);
547         right = (Page) BufferGetPage(rightbuf);
548
549         picksplit(r, p, &v, itup, rtstate);
550
551         leftoff = rightoff = FirstOffsetNumber;
552         maxoff = PageGetMaxOffsetNumber(p);
553         for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
554         {
555                 itemid = PageGetItemId(p, i);
556                 item = (IndexTuple) PageGetItem(p, itemid);
557
558                 if (i == *(v.spl_left))
559                 {
560                         PageAddItem(left, (Item) item, IndexTupleSize(item),
561                                                 leftoff, LP_USED);
562                         leftoff = OffsetNumberNext(leftoff);
563                         v.spl_left++;           /* advance in left split vector */
564                 }
565                 else
566                 {
567                         PageAddItem(right, (Item) item, IndexTupleSize(item),
568                                                 rightoff, LP_USED);
569                         rightoff = OffsetNumberNext(rightoff);
570                         v.spl_right++;          /* advance in right split vector */
571                 }
572         }
573
574         /* build an InsertIndexResult for this insertion */
575         res = (InsertIndexResult) palloc(sizeof(InsertIndexResultData));
576
577         /* now insert the new index tuple */
578         if (*(v.spl_left) != FirstOffsetNumber)
579         {
580                 PageAddItem(left, (Item) itup, IndexTupleSize(itup),
581                                         leftoff, LP_USED);
582                 leftoff = OffsetNumberNext(leftoff);
583                 ItemPointerSet(&(res->pointerData), lbknum, leftoff);
584         }
585         else
586         {
587                 PageAddItem(right, (Item) itup, IndexTupleSize(itup),
588                                         rightoff, LP_USED);
589                 rightoff = OffsetNumberNext(rightoff);
590                 ItemPointerSet(&(res->pointerData), rbknum, rightoff);
591         }
592
593         if ((bufblock = BufferGetBlockNumber(buffer)) != P_ROOT)
594                 PageRestoreTempPage(left, p);
595         WriteBuffer(leftbuf);
596         WriteBuffer(rightbuf);
597
598         /*
599          * Okay, the page is split.  We have three things left to do:
600          *
601          * 1)  Adjust any active scans on this index to cope with changes we
602          * introduced in its structure by splitting this page.
603          *
604          * 2)  "Tighten" the bounding box of the pointer to the left page in the
605          * parent node in the tree, if any.  Since we moved a bunch of stuff
606          * off the left page, we expect it to get smaller.      This happens in
607          * the internal insertion routine.
608          *
609          * 3)  Insert a pointer to the right page in the parent.  This may cause
610          * the parent to split.  If it does, we need to repeat steps one and
611          * two for each split node in the tree.
612          */
613
614         /* adjust active scans */
615         rtadjscans(r, RTOP_SPLIT, bufblock, FirstOffsetNumber);
616
617         tupDesc = r->rd_att;
618         ltup = (IndexTuple) index_formtuple(tupDesc,
619                                                                           (Datum *) &(v.spl_ldatum), isnull);
620         rtup = (IndexTuple) index_formtuple(tupDesc,
621                                                                           (Datum *) &(v.spl_rdatum), isnull);
622         pfree(isnull);
623
624         /* set pointers to new child pages in the internal index tuples */
625         ItemPointerSet(&(ltup->t_tid), lbknum, 1);
626         ItemPointerSet(&(rtup->t_tid), rbknum, 1);
627
628         rtintinsert(r, stack, ltup, rtup, rtstate);
629
630         pfree(ltup);
631         pfree(rtup);
632
633         return res;
634 }
635
636 static void
637 rtintinsert(Relation r,
638                         RTSTACK *stk,
639                         IndexTuple ltup,
640                         IndexTuple rtup,
641                         RTSTATE *rtstate)
642 {
643         IndexTuple      old;
644         Buffer          b;
645         Page            p;
646         char       *ldatum,
647                            *rdatum,
648                            *newdatum;
649         InsertIndexResult res;
650
651         if (stk == (RTSTACK *) NULL)
652         {
653                 rtnewroot(r, ltup, rtup);
654                 return;
655         }
656
657         b = ReadBuffer(r, stk->rts_blk);
658         p = BufferGetPage(b);
659         old = (IndexTuple) PageGetItem(p, PageGetItemId(p, stk->rts_child));
660
661         /*
662          * This is a hack.      Right now, we force rtree keys to be constant
663          * size. To fix this, need delete the old key and add both left and
664          * right for the two new pages.  The insertion of left may force a
665          * split if the new left key is bigger than the old key.
666          */
667
668         if (IndexTupleSize(old) != IndexTupleSize(ltup))
669                 elog(ERROR, "Variable-length rtree keys are not supported.");
670
671         /* install pointer to left child */
672         memmove(old, ltup, IndexTupleSize(ltup));
673
674         if (nospace(p, rtup))
675         {
676                 newdatum = (((char *) ltup) + sizeof(IndexTupleData));
677                 rttighten(r, stk->rts_parent, newdatum,
678                            (IndexTupleSize(ltup) - sizeof(IndexTupleData)), rtstate);
679                 res = dosplit(r, b, stk->rts_parent, rtup, rtstate);
680                 WriteBuffer(b);                 /* don't forget to release buffer!  -
681                                                                  * 01/31/94 */
682                 pfree(res);
683         }
684         else
685         {
686                 PageAddItem(p, (Item) rtup, IndexTupleSize(rtup),
687                                         PageGetMaxOffsetNumber(p), LP_USED);
688                 WriteBuffer(b);
689                 ldatum = (((char *) ltup) + sizeof(IndexTupleData));
690                 rdatum = (((char *) rtup) + sizeof(IndexTupleData));
691                 newdatum = (char *)
692                         DatumGetPointer(FunctionCall2(&rtstate->unionFn,
693                                                                                   PointerGetDatum(ldatum),
694                                                                                   PointerGetDatum(rdatum)));
695
696                 rttighten(r, stk->rts_parent, newdatum,
697                            (IndexTupleSize(rtup) - sizeof(IndexTupleData)), rtstate);
698
699                 pfree(newdatum);
700         }
701 }
702
703 static void
704 rtnewroot(Relation r, IndexTuple lt, IndexTuple rt)
705 {
706         Buffer          b;
707         Page            p;
708
709         b = ReadBuffer(r, P_ROOT);
710         RTInitBuffer(b, 0);
711         p = BufferGetPage(b);
712         PageAddItem(p, (Item) lt, IndexTupleSize(lt),
713                                 FirstOffsetNumber, LP_USED);
714         PageAddItem(p, (Item) rt, IndexTupleSize(rt),
715                                 OffsetNumberNext(FirstOffsetNumber), LP_USED);
716         WriteBuffer(b);
717 }
718
719 static void
720 picksplit(Relation r,
721                   Page page,
722                   SPLITVEC *v,
723                   IndexTuple itup,
724                   RTSTATE *rtstate)
725 {
726         OffsetNumber maxoff;
727         OffsetNumber i,
728                                 j;
729         IndexTuple      item_1,
730                                 item_2;
731         char       *datum_alpha,
732                            *datum_beta;
733         char       *datum_l,
734                            *datum_r;
735         char       *union_d,
736                            *union_dl,
737                            *union_dr;
738         char       *inter_d;
739         bool            firsttime;
740         float           size_alpha,
741                                 size_beta,
742                                 size_union,
743                                 size_inter;
744         float           size_waste,
745                                 waste;
746         float           size_l,
747                                 size_r;
748         int                     nbytes;
749         OffsetNumber seed_1 = 0,
750                                 seed_2 = 0;
751         OffsetNumber *left,
752                            *right;
753
754         maxoff = PageGetMaxOffsetNumber(page);
755
756         nbytes = (maxoff + 2) * sizeof(OffsetNumber);
757         v->spl_left = (OffsetNumber *) palloc(nbytes);
758         v->spl_right = (OffsetNumber *) palloc(nbytes);
759
760         firsttime = true;
761         waste = 0.0;
762
763         for (i = FirstOffsetNumber; i < maxoff; i = OffsetNumberNext(i))
764         {
765                 item_1 = (IndexTuple) PageGetItem(page, PageGetItemId(page, i));
766                 datum_alpha = ((char *) item_1) + sizeof(IndexTupleData);
767                 for (j = OffsetNumberNext(i); j <= maxoff; j = OffsetNumberNext(j))
768                 {
769                         item_2 = (IndexTuple) PageGetItem(page, PageGetItemId(page, j));
770                         datum_beta = ((char *) item_2) + sizeof(IndexTupleData);
771
772                         /* compute the wasted space by unioning these guys */
773                         union_d = (char *)
774                                 DatumGetPointer(FunctionCall2(&rtstate->unionFn,
775                                                                                           PointerGetDatum(datum_alpha),
776                                                                                           PointerGetDatum(datum_beta)));
777                         FunctionCall2(&rtstate->sizeFn,
778                                                   PointerGetDatum(union_d),
779                                                   PointerGetDatum(&size_union));
780                         inter_d = (char *)
781                                 DatumGetPointer(FunctionCall2(&rtstate->interFn,
782                                                                                           PointerGetDatum(datum_alpha),
783                                                                                           PointerGetDatum(datum_beta)));
784                         FunctionCall2(&rtstate->sizeFn,
785                                                   PointerGetDatum(inter_d),
786                                                   PointerGetDatum(&size_inter));
787                         size_waste = size_union - size_inter;
788
789                         pfree(union_d);
790
791                         if (inter_d != (char *) NULL)
792                                 pfree(inter_d);
793
794                         /*
795                          * are these a more promising split that what we've already
796                          * seen?
797                          */
798
799                         if (size_waste > waste || firsttime)
800                         {
801                                 waste = size_waste;
802                                 seed_1 = i;
803                                 seed_2 = j;
804                                 firsttime = false;
805                         }
806                 }
807         }
808
809         left = v->spl_left;
810         v->spl_nleft = 0;
811         right = v->spl_right;
812         v->spl_nright = 0;
813
814         item_1 = (IndexTuple) PageGetItem(page, PageGetItemId(page, seed_1));
815         datum_alpha = ((char *) item_1) + sizeof(IndexTupleData);
816         datum_l = (char *)
817                 DatumGetPointer(FunctionCall2(&rtstate->unionFn,
818                                                                           PointerGetDatum(datum_alpha),
819                                                                           PointerGetDatum(datum_alpha)));
820         FunctionCall2(&rtstate->sizeFn,
821                                   PointerGetDatum(datum_l),
822                                   PointerGetDatum(&size_l));
823         item_2 = (IndexTuple) PageGetItem(page, PageGetItemId(page, seed_2));
824         datum_beta = ((char *) item_2) + sizeof(IndexTupleData);
825         datum_r = (char *)
826                 DatumGetPointer(FunctionCall2(&rtstate->unionFn,
827                                                                           PointerGetDatum(datum_beta),
828                                                                           PointerGetDatum(datum_beta)));
829         FunctionCall2(&rtstate->sizeFn,
830                                   PointerGetDatum(datum_r),
831                                   PointerGetDatum(&size_r));
832
833         /*
834          * Now split up the regions between the two seeds.      An important
835          * property of this split algorithm is that the split vector v has the
836          * indices of items to be split in order in its left and right
837          * vectors.  We exploit this property by doing a merge in the code
838          * that actually splits the page.
839          *
840          * For efficiency, we also place the new index tuple in this loop. This
841          * is handled at the very end, when we have placed all the existing
842          * tuples and i == maxoff + 1.
843          */
844
845         maxoff = OffsetNumberNext(maxoff);
846         for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
847         {
848
849                 /*
850                  * If we've already decided where to place this item, just put it
851                  * on the right list.  Otherwise, we need to figure out which page
852                  * needs the least enlargement in order to store the item.
853                  */
854
855                 if (i == seed_1)
856                 {
857                         *left++ = i;
858                         v->spl_nleft++;
859                         continue;
860                 }
861                 else if (i == seed_2)
862                 {
863                         *right++ = i;
864                         v->spl_nright++;
865                         continue;
866                 }
867
868                 /* okay, which page needs least enlargement? */
869                 if (i == maxoff)
870                         item_1 = itup;
871                 else
872                         item_1 = (IndexTuple) PageGetItem(page, PageGetItemId(page, i));
873
874                 datum_alpha = ((char *) item_1) + sizeof(IndexTupleData);
875                 union_dl = (char *)
876                         DatumGetPointer(FunctionCall2(&rtstate->unionFn,
877                                                                                   PointerGetDatum(datum_l),
878                                                                                   PointerGetDatum(datum_alpha)));
879                 union_dr = (char *)
880                         DatumGetPointer(FunctionCall2(&rtstate->unionFn,
881                                                                                   PointerGetDatum(datum_r),
882                                                                                   PointerGetDatum(datum_alpha)));
883                 FunctionCall2(&rtstate->sizeFn,
884                                           PointerGetDatum(union_dl),
885                                           PointerGetDatum(&size_alpha));
886                 FunctionCall2(&rtstate->sizeFn,
887                                           PointerGetDatum(union_dr),
888                                           PointerGetDatum(&size_beta));
889
890                 /* pick which page to add it to */
891                 if (size_alpha - size_l < size_beta - size_r)
892                 {
893                         pfree(datum_l);
894                         pfree(union_dr);
895                         datum_l = union_dl;
896                         size_l = size_alpha;
897                         *left++ = i;
898                         v->spl_nleft++;
899                 }
900                 else
901                 {
902                         pfree(datum_r);
903                         pfree(union_dl);
904                         datum_r = union_dr;
905                         size_r = size_beta;
906                         *right++ = i;
907                         v->spl_nright++;
908                 }
909         }
910         *left = *right = FirstOffsetNumber; /* sentinel value, see dosplit() */
911
912         v->spl_ldatum = datum_l;
913         v->spl_rdatum = datum_r;
914 }
915
916 static void
917 RTInitBuffer(Buffer b, uint32 f)
918 {
919         RTreePageOpaque opaque;
920         Page            page;
921         Size            pageSize;
922
923         pageSize = BufferGetPageSize(b);
924
925         page = BufferGetPage(b);
926         MemSet(page, 0, (int) pageSize);
927         PageInit(page, pageSize, sizeof(RTreePageOpaqueData));
928
929         opaque = (RTreePageOpaque) PageGetSpecialPointer(page);
930         opaque->flags = f;
931 }
932
933 static OffsetNumber
934 choose(Relation r, Page p, IndexTuple it, RTSTATE *rtstate)
935 {
936         OffsetNumber maxoff;
937         OffsetNumber i;
938         char       *ud,
939                            *id;
940         char       *datum;
941         float           usize,
942                                 dsize;
943         OffsetNumber which;
944         float           which_grow;
945
946         id = ((char *) it) + sizeof(IndexTupleData);
947         maxoff = PageGetMaxOffsetNumber(p);
948         which_grow = -1.0;
949         which = -1;
950
951         for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
952         {
953                 datum = (char *) PageGetItem(p, PageGetItemId(p, i));
954                 datum += sizeof(IndexTupleData);
955                 FunctionCall2(&rtstate->sizeFn,
956                                           PointerGetDatum(datum),
957                                           PointerGetDatum(&dsize));
958                 ud = (char *)
959                         DatumGetPointer(FunctionCall2(&rtstate->unionFn,
960                                                                                   PointerGetDatum(datum),
961                                                                                   PointerGetDatum(id)));
962                 FunctionCall2(&rtstate->sizeFn,
963                                           PointerGetDatum(ud),
964                                           PointerGetDatum(&usize));
965                 pfree(ud);
966                 if (which_grow < 0 || usize - dsize < which_grow)
967                 {
968                         which = i;
969                         which_grow = usize - dsize;
970                         if (which_grow == 0)
971                                 break;
972                 }
973         }
974
975         return which;
976 }
977
978 static int
979 nospace(Page p, IndexTuple it)
980 {
981         return PageGetFreeSpace(p) < IndexTupleSize(it);
982 }
983
984 void
985 freestack(RTSTACK *s)
986 {
987         RTSTACK    *p;
988
989         while (s != (RTSTACK *) NULL)
990         {
991                 p = s->rts_parent;
992                 pfree(s);
993                 s = p;
994         }
995 }
996
997 Datum
998 rtdelete(PG_FUNCTION_ARGS)
999 {
1000         Relation                r = (Relation) PG_GETARG_POINTER(0);
1001         ItemPointer             tid = (ItemPointer) PG_GETARG_POINTER(1);
1002         BlockNumber blkno;
1003         OffsetNumber offnum;
1004         Buffer          buf;
1005         Page            page;
1006
1007         /*
1008          * Notes in ExecUtils:ExecOpenIndices() Also note that only vacuum
1009          * deletes index tuples now...
1010          *
1011          * RelationSetLockForWrite(r);
1012          */
1013
1014         blkno = ItemPointerGetBlockNumber(tid);
1015         offnum = ItemPointerGetOffsetNumber(tid);
1016
1017         /* adjust any scans that will be affected by this deletion */
1018         rtadjscans(r, RTOP_DEL, blkno, offnum);
1019
1020         /* delete the index tuple */
1021         buf = ReadBuffer(r, blkno);
1022         page = BufferGetPage(buf);
1023
1024         PageIndexTupleDelete(page, offnum);
1025
1026         WriteBuffer(buf);
1027
1028         PG_RETURN_POINTER(NULL);        /* no real return value */
1029 }
1030
1031 static void
1032 initRtstate(RTSTATE *rtstate, Relation index)
1033 {
1034         RegProcedure union_proc,
1035                                 size_proc,
1036                                 inter_proc;
1037
1038         union_proc = index_getprocid(index, 1, RT_UNION_PROC);
1039         size_proc = index_getprocid(index, 1, RT_SIZE_PROC);
1040         inter_proc = index_getprocid(index, 1, RT_INTER_PROC);
1041         fmgr_info(union_proc, &rtstate->unionFn);
1042         fmgr_info(size_proc, &rtstate->sizeFn);
1043         fmgr_info(inter_proc, &rtstate->interFn);
1044         return;
1045 }
1046
1047 #ifdef RTDEBUG
1048
1049 void
1050 _rtdump(Relation r)
1051 {
1052         Buffer          buf;
1053         Page            page;
1054         OffsetNumber offnum,
1055                                 maxoff;
1056         BlockNumber blkno;
1057         BlockNumber nblocks;
1058         RTreePageOpaque po;
1059         IndexTuple      itup;
1060         BlockNumber itblkno;
1061         OffsetNumber itoffno;
1062         char       *datum;
1063         char       *itkey;
1064
1065         nblocks = RelationGetNumberOfBlocks(r);
1066         for (blkno = 0; blkno < nblocks; blkno++)
1067         {
1068                 buf = ReadBuffer(r, blkno);
1069                 page = BufferGetPage(buf);
1070                 po = (RTreePageOpaque) PageGetSpecialPointer(page);
1071                 maxoff = PageGetMaxOffsetNumber(page);
1072                 printf("Page %d maxoff %d <%s>\n", blkno, maxoff,
1073                            (po->flags & F_LEAF ? "LEAF" : "INTERNAL"));
1074
1075                 if (PageIsEmpty(page))
1076                 {
1077                         ReleaseBuffer(buf);
1078                         continue;
1079                 }
1080
1081                 for (offnum = FirstOffsetNumber;
1082                          offnum <= maxoff;
1083                          offnum = OffsetNumberNext(offnum))
1084                 {
1085                         itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, offnum));
1086                         itblkno = ItemPointerGetBlockNumber(&(itup->t_tid));
1087                         itoffno = ItemPointerGetOffsetNumber(&(itup->t_tid));
1088                         datum = ((char *) itup);
1089                         datum += sizeof(IndexTupleData);
1090                         itkey = (char *) box_out((BOX *) datum);
1091                         printf("\t[%d] size %d heap <%d,%d> key:%s\n",
1092                                    offnum, IndexTupleSize(itup), itblkno, itoffno, itkey);
1093                         pfree(itkey);
1094                 }
1095
1096                 ReleaseBuffer(buf);
1097         }
1098 }
1099
1100 #endif   /* defined RTDEBUG */