From e78fe652f48f9dbb67a21f616741e7b446204cbe Mon Sep 17 00:00:00 2001 From: "Marc G. Fournier" Date: Mon, 26 Aug 1996 20:02:12 +0000 Subject: [PATCH] Oops, thanks to Dan McGuirk for pointing out that I missed part of the commit :( Here's the rest of the GiST code thta was missing... --- src/backend/access/gist/Makefile.inc | 16 + src/backend/access/gist/gist.c | 1316 ++++++++++++++++++++++++++ src/backend/access/gist/gistget.c | 374 ++++++++ src/backend/access/gist/gistold.c | 1159 +++++++++++++++++++++++ src/backend/access/gist/gistscan.c | 382 ++++++++ src/backend/access/gist/giststrat.c | 119 +++ 6 files changed, 3366 insertions(+) create mode 100644 src/backend/access/gist/Makefile.inc create mode 100644 src/backend/access/gist/gist.c create mode 100644 src/backend/access/gist/gistget.c create mode 100644 src/backend/access/gist/gistold.c create mode 100644 src/backend/access/gist/gistscan.c create mode 100644 src/backend/access/gist/giststrat.c diff --git a/src/backend/access/gist/Makefile.inc b/src/backend/access/gist/Makefile.inc new file mode 100644 index 0000000000..8519ab0a58 --- /dev/null +++ b/src/backend/access/gist/Makefile.inc @@ -0,0 +1,16 @@ +#------------------------------------------------------------------------- +# +# Makefile.inc-- +# Makefile for access/rtree (R-Tree access method) +# +# Copyright (c) 1994, Regents of the University of California +# +# +# IDENTIFICATION +# /usr/local/devel/pglite/cvs/src/backend/access/rtree/Makefile.inc,v 1.2 1995/03/21 06:50:48 andrew Exp +# +#------------------------------------------------------------------------- + +SUBSRCS+= gist.c gistget.c gistscan.c giststrat.c + + diff --git a/src/backend/access/gist/gist.c b/src/backend/access/gist/gist.c new file mode 100644 index 0000000000..4b620da537 --- /dev/null +++ b/src/backend/access/gist/gist.c @@ -0,0 +1,1316 @@ +/*------------------------------------------------------------------------- + * + * gist.c-- + * interface routines for the postgres GiST index access method. + * + * + * + * IDENTIFICATION + * + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "storage/bufmgr.h" +#include "storage/bufpage.h" + +#include "utils/elog.h" +#include "utils/palloc.h" +#include "utils/rel.h" +#include "utils/excid.h" + +#include "access/heapam.h" +#include "access/genam.h" +#include "access/gist.h" +#include "access/gistscan.h" +#include "access/funcindex.h" +#include "access/tupdesc.h" + +#include "nodes/execnodes.h" +#include "nodes/plannodes.h" + +#include "executor/executor.h" +#include "executor/tuptable.h" + +#include "catalog/index.h" + +/* non-export function prototypes */ +static InsertIndexResult gistdoinsert(Relation r, IndexTuple itup, + GISTSTATE *GISTstate); +static InsertIndexResult gistentryinsert(Relation r, GISTSTACK *stk, + IndexTuple tup, + GISTSTATE *giststate); +static void gistentryinserttwo(Relation r, GISTSTACK *stk, IndexTuple ltup, + IndexTuple rtup, GISTSTATE *giststate); +static void gistAdjustKeys(Relation r, GISTSTACK *stk, BlockNumber blk, + char *datum, int att_size, GISTSTATE *giststate); +static void gistintinsert(Relation r, GISTSTACK *stk, IndexTuple ltup, + IndexTuple rtup, GISTSTATE *giststate); +static InsertIndexResult gistSplit(Relation r, Buffer buffer, + GISTSTACK *stack, IndexTuple itup, + GISTSTATE *giststate); +static void gistnewroot(GISTSTATE *giststate, Relation r, IndexTuple lt, + IndexTuple rt); +static void GISTInitBuffer(Buffer b, uint32 f); +static BlockNumber gistChooseSubtree(Relation r, IndexTuple itup, int level, + GISTSTATE *giststate, + GISTSTACK **retstack, Buffer *leafbuf); +static OffsetNumber gistchoose(Relation r, Page p, IndexTuple it, + GISTSTATE *giststate); +static int gistnospace(Page p, IndexTuple it); +void gistdelete(Relation r, ItemPointer tid); +static IndexTuple gist_tuple_replacekey(Relation r, GISTENTRY entry, IndexTuple t); + + +/* +** routine to build an index. Basically calls insert over and over +*/ +void +gistbuild(Relation heap, + Relation index, + int natts, + AttrNumber *attnum, + IndexStrategy istrat, + uint16 pint, + Datum *params, + FuncIndexInfo *finfo, + PredInfo *predInfo) +{ + HeapScanDesc scan; + Buffer buffer; + AttrNumber i; + HeapTuple htup; + IndexTuple itup; + TupleDesc hd, id; + InsertIndexResult res; + Datum *d; + bool *nulls; + int nb, nh, ni; + ExprContext *econtext; + TupleTable tupleTable; + TupleTableSlot *slot; + Oid hrelid, irelid; + Node *pred, *oldPred; + GISTSTATE giststate; + GISTENTRY tmpcentry; + bool *compvec; + + /* GiSTs only know how to do stupid locking now */ + RelationSetLockForWrite(index); + + setheapoverride(TRUE); /* so we can see the new pg_index tuple */ + initGISTstate(&giststate, index); + setheapoverride(FALSE); + + pred = predInfo->pred; + oldPred = predInfo->oldPred; + + /* + * We expect to be called exactly once for any index relation. + * If that's not the case, big trouble's what we have. + */ + + if (oldPred == NULL && (nb = RelationGetNumberOfBlocks(index)) != 0) + elog(WARN, "%.16s already contains data", &(index->rd_rel->relname.data[0])); + + /* initialize the root page (if this is a new index) */ + if (oldPred == NULL) { + buffer = ReadBuffer(index, P_NEW); + GISTInitBuffer(buffer, F_LEAF); + WriteBuffer(buffer); + } + + /* init the tuple descriptors and get set for a heap scan */ + hd = RelationGetTupleDescriptor(heap); + id = RelationGetTupleDescriptor(index); + d = (Datum *)palloc(natts * sizeof (*d)); + nulls = (bool *)palloc(natts * sizeof (*nulls)); + + /* + * If this is a predicate (partial) index, we will need to evaluate the + * predicate using ExecQual, which requires the current tuple to be in a + * slot of a TupleTable. In addition, ExecQual must have an ExprContext + * referring to that slot. Here, we initialize dummy TupleTable and + * ExprContext objects for this purpose. --Nels, Feb '92 + */ +#ifndef OMIT_PARTIAL_INDEX + if (pred != NULL || oldPred != NULL) { + tupleTable = ExecCreateTupleTable(1); + slot = ExecAllocTableSlot(tupleTable); + econtext = makeNode(ExprContext); + FillDummyExprContext(econtext, slot, hd, buffer); + } +#endif /* OMIT_PARTIAL_INDEX */ + scan = heap_beginscan(heap, 0, NowTimeQual, 0, (ScanKey) NULL); + htup = heap_getnext(scan, 0, &buffer); + + /* int the tuples as we insert them */ + nh = ni = 0; + + for (; HeapTupleIsValid(htup); htup = heap_getnext(scan, 0, &buffer)) { + + nh++; + + /* + * If oldPred != NULL, this is an EXTEND INDEX command, so skip + * this tuple if it was already in the existing partial index + */ + if (oldPred != NULL) { +#ifndef OMIT_PARTIAL_INDEX + /*SetSlotContents(slot, htup); */ + slot->val = htup; + if (ExecQual((List*)oldPred, econtext) == true) { + ni++; + continue; + } +#endif /* OMIT_PARTIAL_INDEX */ + } + + /* Skip this tuple if it doesn't satisfy the partial-index predicate */ + if (pred != NULL) { +#ifndef OMIT_PARTIAL_INDEX + /*SetSlotContents(slot, htup); */ + slot->val = htup; + if (ExecQual((List*)pred, econtext) == false) + continue; +#endif /* OMIT_PARTIAL_INDEX */ + } + + ni++; + + /* + * For the current heap tuple, extract all the attributes + * we use in this index, and note which are null. + */ + + for (i = 1; i <= natts; i++) { + int attoff; + bool attnull; + + /* + * Offsets are from the start of the tuple, and are + * zero-based; indices are one-based. The next call + * returns i - 1. That's data hiding for you. + */ + + attoff = AttrNumberGetAttrOffset(i); + /* + d[attoff] = HeapTupleGetAttributeValue(htup, buffer, + */ + d[attoff] = GetIndexValue(htup, + hd, + attoff, + attnum, + finfo, + &attnull, + buffer); + nulls[attoff] = (attnull ? 'n' : ' '); + } + + /* immediately compress keys to normalize */ + compvec = (bool *)palloc(sizeof(bool) * natts); + for (i = 0; i < natts; i++) { + gistcentryinit(&giststate, &tmpcentry, (char *)d[i], + (Relation) NULL, (Page) NULL, (OffsetNumber) 0, + -1 /* size is currently bogus */, TRUE); + if (d[i] != (Datum)tmpcentry.pred && !(giststate.keytypbyval)) + compvec[i] = TRUE; + else compvec[i] = FALSE; + d[i] = (Datum)tmpcentry.pred; + } + + /* form an index tuple and point it at the heap tuple */ + itup = index_formtuple(id, &d[0], nulls); + itup->t_tid = htup->t_ctid; + + /* + * Since we already have the index relation locked, we + * call gistdoinsert directly. Normal access method calls + * dispatch through gistinsert, which locks the relation + * for write. This is the right thing to do if you're + * inserting single tups, but not when you're initializing + * the whole index at once. + */ + + res = gistdoinsert(index, itup, &giststate); + for (i = 0; i < natts; i++) + if (compvec[i] == TRUE) pfree((char *)d[i]); + pfree(itup); + pfree(res); + pfree(compvec); + } + + /* okay, all heap tuples are indexed */ + heap_endscan(scan); + RelationUnsetLockForWrite(index); + + if (pred != NULL || oldPred != NULL) { +#ifndef OMIT_PARTIAL_INDEX + ExecDestroyTupleTable(tupleTable, true); + pfree(econtext); +#endif /* OMIT_PARTIAL_INDEX */ + } + + /* + * Since we just inted the tuples in the heap, we update its + * stats in pg_relation to guarantee that the planner takes + * advantage of the index we just created. UpdateStats() does a + * CommandinterIncrement(), which flushes changed entries from + * the system relcache. The act of constructing an index changes + * these heap and index tuples in the system catalogs, so they + * need to be flushed. We close them to guarantee that they + * will be. + */ + + hrelid = heap->rd_id; + irelid = index->rd_id; + heap_close(heap); + index_close(index); + + UpdateStats(hrelid, nh, true); + UpdateStats(irelid, ni, false); + + if (oldPred != NULL) { + if (ni == nh) pred = NULL; + UpdateIndexPredicate(irelid, oldPred, pred); + } + + /* be tidy */ + pfree(nulls); + pfree(d); +} + +/* + * gistinsert -- wrapper for GiST tuple insertion. + * + * This is the public interface routine for tuple insertion in GiSTs. + * It doesn't do any work; just locks the relation and passes the buck. + */ +InsertIndexResult +gistinsert(Relation r, Datum *datum, char *nulls, ItemPointer ht_ctid) +{ + InsertIndexResult res; + IndexTuple itup; + GISTSTATE giststate; + GISTENTRY tmpentry; + int i; + bool *compvec; + + initGISTstate(&giststate, r); + + /* immediately compress keys to normalize */ + compvec = (bool *)palloc(sizeof(bool) * r->rd_att->natts); + for (i = 0; i < r->rd_att->natts; i++) { + gistcentryinit(&giststate, &tmpentry, (char *)datum[i], + (Relation) NULL, (Page) NULL, (OffsetNumber) 0, + -1 /* size is currently bogus */, TRUE); + if (datum[i] != (Datum)tmpentry.pred && !(giststate.keytypbyval)) + compvec[i] = TRUE; + else compvec[i] = FALSE; + datum[i] = (Datum)tmpentry.pred; + } + itup = index_formtuple(RelationGetTupleDescriptor(r), datum, nulls); + itup->t_tid = *ht_ctid; + + RelationSetLockForWrite(r); + res = gistdoinsert(r, itup, &giststate); + for (i = 0; i < r->rd_att->natts; i++) + if (compvec[i] == TRUE) pfree((char *)datum[i]); + pfree(itup); + pfree(compvec); + + /* XXX two-phase locking -- don't unlock the relation until EOT */ + return (res); +} + +/* +** Take a compressed entry, and install it on a page. Since we now know +** where the entry will live, we decompress it and recompress it using +** that knowledge (some compression routines may want to fish around +** on the page, for example, or do something special for leaf nodes.) +*/ +static OffsetNumber +gistPageAddItem(GISTSTATE *giststate, + Relation r, + Page page, + Item item, + Size size, + OffsetNumber offsetNumber, + ItemIdFlags flags, + GISTENTRY *dentry, + IndexTuple *newtup) +{ + GISTENTRY tmpcentry; + IndexTuple itup = (IndexTuple)item; + + /* recompress the item given that we now know the exact page and + offset for insertion */ + gistdentryinit(giststate, dentry, + (((char *) itup) + sizeof(IndexTupleData)), + (Relation)0, (Page)0, (OffsetNumber)InvalidOffsetNumber, + IndexTupleSize(itup) - sizeof(IndexTupleData), FALSE); + gistcentryinit(giststate, &tmpcentry, dentry->pred, r, page, + offsetNumber, dentry->bytes, FALSE); + *newtup = gist_tuple_replacekey(r, *dentry, itup); + /* be tidy */ + if (tmpcentry.pred != dentry->pred + && tmpcentry.pred != (((char *) itup) + sizeof(IndexTupleData))) + pfree(tmpcentry.pred); + + return(PageAddItem(page, (Item) *newtup, IndexTupleSize(*newtup), + offsetNumber, flags)); +} + + +static InsertIndexResult +gistdoinsert(Relation r, + IndexTuple itup, /* itup contains compressed entry */ + GISTSTATE *giststate) +{ + char *datum, *newdatum; + GISTENTRY entry, tmpdentry; + InsertIndexResult res; + OffsetNumber l; + GISTSTACK *stack, *tmpstk; + Buffer buffer; + BlockNumber blk; + Page page; + OffsetNumber off; + IndexTuple newtup; + + /* 3rd arg is ignored for now */ + blk = gistChooseSubtree(r, itup, 0, giststate, &stack, &buffer); + page = (Page) BufferGetPage(buffer); + + if (gistnospace(page, itup)) { + /* need to do a split */ + res = gistSplit(r, buffer, stack, itup, giststate); + gistfreestack(stack); + WriteBuffer(buffer); /* don't forget to release buffer! */ + return (res); + } + + if (PageIsEmpty(page)) + off = FirstOffsetNumber; + else + off = OffsetNumberNext(PageGetMaxOffsetNumber(page)); + + /* add the item and write the buffer */ + l = gistPageAddItem(giststate, r, page, (Item) itup, IndexTupleSize(itup), + off, LP_USED, &tmpdentry, &newtup); + WriteBuffer(buffer); + + /* now expand the page boundary in the parent to include the new child */ + gistAdjustKeys(r, stack, blk, tmpdentry.pred, tmpdentry.bytes, giststate); + gistfreestack(stack); + + /* be tidy */ + if (itup != newtup) + pfree(newtup); + if (tmpdentry.pred != (((char *) itup) + sizeof(IndexTupleData))) + pfree(tmpdentry.pred); + + /* build and return an InsertIndexResult for this insertion */ + res = (InsertIndexResult) palloc(sizeof(InsertIndexResultData)); + ItemPointerSet(&(res->pointerData), blk, l); + + return (res); +} + + +static BlockNumber +gistChooseSubtree(Relation r, IndexTuple itup, /* itup has compressed entry */ + int level, + GISTSTATE *giststate, + GISTSTACK **retstack /*out*/, + Buffer *leafbuf /*out*/) +{ + Buffer buffer; + BlockNumber blk; + GISTSTACK *stack; + Page page; + GISTPageOpaque opaque; + IndexTuple which; + + blk = GISTP_ROOT; + buffer = InvalidBuffer; + stack = (GISTSTACK *) NULL; + + do { + /* let go of current buffer before getting next */ + if (buffer != InvalidBuffer) + ReleaseBuffer(buffer); + + /* get next buffer */ + buffer = ReadBuffer(r, blk); + page = (Page) BufferGetPage(buffer); + + opaque = (GISTPageOpaque) PageGetSpecialPointer(page); + if (!(opaque->flags & F_LEAF)) { + GISTSTACK *n; + ItemId iid; + + n = (GISTSTACK *) palloc(sizeof(GISTSTACK)); + n->gs_parent = stack; + n->gs_blk = blk; + n->gs_child = gistchoose(r, page, itup, giststate); + stack = n; + + iid = PageGetItemId(page, n->gs_child); + which = (IndexTuple) PageGetItem(page, iid); + blk = ItemPointerGetBlockNumber(&(which->t_tid)); + } + } while (!(opaque->flags & F_LEAF)); + + *retstack = stack; + *leafbuf = buffer; + + return(blk); +} + + +static void +gistAdjustKeys(Relation r, + GISTSTACK *stk, + BlockNumber blk, + char *datum, /* datum is uncompressed */ + int att_size, + GISTSTATE *giststate) +{ + char *oldud; + Page p; + Buffer b; + bool result; + bytea *evec; + GISTENTRY centry, *ev0p, *ev1p, *dentryp; + int size, datumsize; + IndexTuple tid; + + if (stk == (GISTSTACK *) NULL) + return; + + b = ReadBuffer(r, stk->gs_blk); + p = BufferGetPage(b); + + oldud = (char *) PageGetItem(p, PageGetItemId(p, stk->gs_child)); + tid = (IndexTuple) oldud; + size = IndexTupleSize((IndexTuple)oldud) - sizeof(IndexTupleData); + oldud += sizeof(IndexTupleData); + + evec = (bytea *) palloc(2*sizeof(GISTENTRY) + VARHDRSZ); + VARSIZE(evec) = 2*sizeof(GISTENTRY) + VARHDRSZ; + + /* insert decompressed oldud into entry vector */ + gistdentryinit(giststate, &((GISTENTRY *)VARDATA(evec))[0], + oldud, r, p, stk->gs_child, + size, FALSE); + ev0p = &((GISTENTRY *)VARDATA(evec))[0]; + + /* insert datum entry into entry vector */ + gistentryinit(((GISTENTRY *)VARDATA(evec))[1], datum, + (Relation)NULL,(Page)NULL,(OffsetNumber)0, att_size, FALSE); + ev1p = &((GISTENTRY *)VARDATA(evec))[1]; + + /* form union of decompressed entries */ + datum = (char *) (giststate->unionFn)(evec, &datumsize); + + /* did union leave decompressed version of oldud unchanged? */ + (giststate->equalFn)(ev0p->pred, datum, &result); + if (!result) { + TupleDesc td = RelationGetTupleDescriptor(r); + + /* compress datum for storage on page */ + gistcentryinit(giststate, ¢ry, datum, ev0p->rel, ev0p->page, + ev0p->offset, datumsize, FALSE); + if (td->attrs[0]->attlen >= 0) { + memmove(oldud, centry.pred, att_size); + gistAdjustKeys(r, stk->gs_parent, stk->gs_blk, datum, att_size, + giststate); + } + else if (VARSIZE(centry.pred) == VARSIZE(oldud)) { + memmove(oldud, centry.pred, VARSIZE(centry.pred)); + gistAdjustKeys(r, stk->gs_parent, stk->gs_blk, datum, att_size, + giststate); + } + else { + /* + ** new datum is not the same size as the old. + ** We have to delete the old entry and insert the new + ** one. Note that this may cause a split here! + */ + IndexTuple newtup; + ItemPointerData oldtid; + char *isnull; + TupleDesc tupDesc; + InsertIndexResult res; + + /* delete old tuple */ + ItemPointerSet(&oldtid, stk->gs_blk, stk->gs_child); + gistdelete(r, (ItemPointer)&oldtid); + + /* generate and insert new tuple */ + tupDesc = r->rd_att; + isnull = (char *) palloc(r->rd_rel->relnatts); + memset(isnull, ' ', r->rd_rel->relnatts); + newtup = (IndexTuple) index_formtuple(tupDesc, + (Datum *) ¢ry.pred, isnull); + pfree(isnull); + /* set pointer in new tuple to point to current child */ + ItemPointerSet(&oldtid, blk, 1); + newtup->t_tid = oldtid; + + /* inserting the new entry also adjust keys above */ + res = gistentryinsert(r, stk, newtup, giststate); + + /* in stack, set info to point to new tuple */ + stk->gs_blk = ItemPointerGetBlockNumber(&(res->pointerData)); + stk->gs_child = ItemPointerGetOffsetNumber(&(res->pointerData)); + + pfree(res); + } + WriteBuffer(b); + + if (centry.pred != datum) + pfree(datum); + } + else { + ReleaseBuffer(b); + } + pfree(evec); +} + +/* + * gistSplit -- split a page in the tree. + * + */ +static InsertIndexResult +gistSplit(Relation r, + Buffer buffer, + GISTSTACK *stack, + IndexTuple itup, /* contains compressed entry */ + GISTSTATE *giststate) +{ + Page p; + Buffer leftbuf, rightbuf; + Page left, right; + ItemId itemid; + IndexTuple item; + IndexTuple ltup, rtup, newtup; + OffsetNumber maxoff; + OffsetNumber i; + OffsetNumber leftoff, rightoff; + BlockNumber lbknum, rbknum; + BlockNumber bufblock; + GISTPageOpaque opaque; + int blank; + InsertIndexResult res; + char *isnull; + GIST_SPLITVEC v; + TupleDesc tupDesc; + bytea *entryvec; + bool *decompvec; + IndexTuple item_1; + GISTENTRY tmpdentry, tmpentry; + char *datum; + + isnull = (char *) palloc(r->rd_rel->relnatts); + for (blank = 0; blank < r->rd_rel->relnatts; blank++) + isnull[blank] = ' '; + p = (Page) BufferGetPage(buffer); + opaque = (GISTPageOpaque) PageGetSpecialPointer(p); + + + /* + * The root of the tree is the first block in the relation. If + * we're about to split the root, we need to do some hocus-pocus + * to enforce this guarantee. + */ + + if (BufferGetBlockNumber(buffer) == GISTP_ROOT) { + leftbuf = ReadBuffer(r, P_NEW); + GISTInitBuffer(leftbuf, opaque->flags); + lbknum = BufferGetBlockNumber(leftbuf); + left = (Page) BufferGetPage(leftbuf); + } else { + leftbuf = buffer; + IncrBufferRefCount(buffer); + lbknum = BufferGetBlockNumber(buffer); + left = (Page) PageGetTempPage(p, sizeof(GISTPageOpaqueData)); + } + + rightbuf = ReadBuffer(r, P_NEW); + GISTInitBuffer(rightbuf, opaque->flags); + rbknum = BufferGetBlockNumber(rightbuf); + right = (Page) BufferGetPage(rightbuf); + + /* generate the item array */ + maxoff = PageGetMaxOffsetNumber(p); + entryvec = (bytea *)palloc(VARHDRSZ + (maxoff + 2) * sizeof(GISTENTRY)); + decompvec = (bool *)palloc(VARHDRSZ + (maxoff + 2) * sizeof(bool)); + for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i)) { + item_1 = (IndexTuple) PageGetItem(p, PageGetItemId(p, i)); + gistdentryinit(giststate, &((GISTENTRY *)VARDATA(entryvec))[i], + (((char *) item_1) + sizeof(IndexTupleData)), + r, p, i, + IndexTupleSize(item_1) - sizeof(IndexTupleData), FALSE); + if ((char *)(((GISTENTRY *)VARDATA(entryvec))[i].pred) + == (((char *) item_1) + sizeof(IndexTupleData))) + decompvec[i] = FALSE; + else decompvec[i] = TRUE; + } + + /* add the new datum as the last entry */ + gistdentryinit(giststate, &(((GISTENTRY *)VARDATA(entryvec))[maxoff+1]), + (((char *) itup) + sizeof(IndexTupleData)), + (Relation)NULL, (Page)NULL, + (OffsetNumber)0, tmpentry.bytes, FALSE); + if ((char *)(((GISTENTRY *)VARDATA(entryvec))[maxoff+1]).pred != + (((char *) itup) + sizeof(IndexTupleData))) + decompvec[maxoff+1] = TRUE; + else decompvec[maxoff+1] = FALSE; + + VARSIZE(entryvec) = (maxoff + 2) * sizeof(GISTENTRY) + VARHDRSZ; + + /* now let the user-defined picksplit function set up the split vector */ + (giststate->picksplitFn)(entryvec, &v); + + /* compress ldatum and rdatum */ + gistcentryinit(giststate, &tmpentry, v.spl_ldatum, (Relation)NULL, + (Page)NULL, (OffsetNumber)0, + ((GISTENTRY *)VARDATA(entryvec))[i].bytes, FALSE); + if (v.spl_ldatum != tmpentry.pred) + pfree(v.spl_ldatum); + v.spl_ldatum = tmpentry.pred; + + gistcentryinit(giststate, &tmpentry, v.spl_rdatum, (Relation)NULL, + (Page)NULL, (OffsetNumber)0, + ((GISTENTRY *)VARDATA(entryvec))[i].bytes, FALSE); + if (v.spl_rdatum != tmpentry.pred) + pfree(v.spl_rdatum); + v.spl_rdatum = tmpentry.pred; + + /* clean up the entry vector: its preds need to be deleted, too */ + for (i = FirstOffsetNumber; i <= maxoff+1; i = OffsetNumberNext(i)) + if (decompvec[i]) + pfree(((GISTENTRY *)VARDATA(entryvec))[i].pred); + pfree(entryvec); + pfree(decompvec); + + leftoff = rightoff = FirstOffsetNumber; + maxoff = PageGetMaxOffsetNumber(p); + for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i)) { + itemid = PageGetItemId(p, i); + item = (IndexTuple) PageGetItem(p, itemid); + + if (i == *(v.spl_left)) { + (void) gistPageAddItem(giststate, r, left, (Item) item, + IndexTupleSize(item), + leftoff, LP_USED, &tmpdentry, &newtup); + leftoff = OffsetNumberNext(leftoff); + v.spl_left++; /* advance in left split vector */ + /* be tidy */ + if (tmpdentry.pred != (((char *) item) + sizeof(IndexTupleData))) + pfree(tmpdentry.pred); + if ((IndexTuple)item != newtup) + pfree(newtup); + } + else { + (void) gistPageAddItem(giststate, r, right, (Item) item, + IndexTupleSize(item), + rightoff, LP_USED, &tmpdentry, &newtup); + rightoff = OffsetNumberNext(rightoff); + v.spl_right++; /* advance in right split vector */ + /* be tidy */ + if (tmpdentry.pred != (((char *) item) + sizeof(IndexTupleData))) + pfree(tmpdentry.pred); + if (item != newtup) + pfree(newtup); + } + } + + /* build an InsertIndexResult for this insertion */ + res = (InsertIndexResult) palloc(sizeof(InsertIndexResultData)); + + /* now insert the new index tuple */ + if (*(v.spl_left) != FirstOffsetNumber) { + (void) gistPageAddItem(giststate, r, left, (Item) itup, + IndexTupleSize(itup), + leftoff, LP_USED, &tmpdentry, &newtup); + leftoff = OffsetNumberNext(leftoff); + ItemPointerSet(&(res->pointerData), lbknum, leftoff); + /* be tidy */ + if (tmpdentry.pred != (((char *) itup) + sizeof(IndexTupleData))) + pfree(tmpdentry.pred); + if (itup != newtup) + pfree(newtup); + } else { + (void) gistPageAddItem(giststate, r, right, (Item) itup, + IndexTupleSize(itup), + rightoff, LP_USED, &tmpdentry, &newtup); + rightoff = OffsetNumberNext(rightoff); + ItemPointerSet(&(res->pointerData), rbknum, rightoff); + /* be tidy */ + if (tmpdentry.pred != (((char *) itup) + sizeof(IndexTupleData))) + pfree(tmpdentry.pred); + if (itup != newtup) + pfree(newtup); + } + + if ((bufblock = BufferGetBlockNumber(buffer)) != GISTP_ROOT) { + PageRestoreTempPage(left, p); + } + WriteBuffer(leftbuf); + WriteBuffer(rightbuf); + + /* + * Okay, the page is split. We have three things left to do: + * + * 1) Adjust any active scans on this index to cope with changes + * we introduced in its structure by splitting this page. + * + * 2) "Tighten" the bounding box of the pointer to the left + * page in the parent node in the tree, if any. Since we + * moved a bunch of stuff off the left page, we expect it + * to get smaller. This happens in the internal insertion + * routine. + * + * 3) Insert a pointer to the right page in the parent. This + * may cause the parent to split. If it does, we need to + * repeat steps one and two for each split node in the tree. + */ + + /* adjust active scans */ + gistadjscans(r, GISTOP_SPLIT, bufblock, FirstOffsetNumber); + + tupDesc = r->rd_att; + + ltup = (IndexTuple) index_formtuple(tupDesc, + (Datum *) &(v.spl_ldatum), isnull); + rtup = (IndexTuple) index_formtuple(tupDesc, + (Datum *) &(v.spl_rdatum), isnull); + pfree(isnull); + + /* set pointers to new child pages in the internal index tuples */ + ItemPointerSet(&(ltup->t_tid), lbknum, 1); + ItemPointerSet(&(rtup->t_tid), rbknum, 1); + + gistintinsert(r, stack, ltup, rtup, giststate); + + pfree(ltup); + pfree(rtup); + + return (res); +} + +/* +** After a split, we need to overwrite the old entry's key in the parent, +** and install install an entry for the new key into the parent. +*/ +static void +gistintinsert(Relation r, + GISTSTACK *stk, + IndexTuple ltup, /* new version of entry for old page */ + IndexTuple rtup, /* entry for new page */ + GISTSTATE *giststate) +{ + IndexTuple old; + Buffer b; + Page p; + ItemPointerData ltid; + + if (stk == (GISTSTACK *) NULL) { + gistnewroot(giststate, r, ltup, rtup); + return; + } + + /* remove old left pointer, insert the 2 new entries */ + ItemPointerSet(<id, stk->gs_blk, stk->gs_child); + gistdelete(r, (ItemPointer)<id); + gistentryinserttwo(r, stk, ltup, rtup, giststate); +} + + +/* +** Insert two entries onto one page, handling a split for either one! +*/ +static void +gistentryinserttwo(Relation r, GISTSTACK *stk, IndexTuple ltup, + IndexTuple rtup, GISTSTATE *giststate) +{ + Buffer b; + Page p; + InsertIndexResult res; + OffsetNumber off; + bytea *evec; + char *datum; + int size; + GISTENTRY tmpentry; + IndexTuple newtup; + + b = ReadBuffer(r, stk->gs_blk); + p = BufferGetPage(b); + + if (gistnospace(p, ltup)) { + res = gistSplit(r, b, stk->gs_parent, ltup, giststate); + WriteBuffer(b); /* don't forget to release buffer! - 01/31/94 */ + pfree(res); + gistdoinsert(r, rtup, giststate); + } else { + (void) gistPageAddItem(giststate, r, p, (Item)ltup, + IndexTupleSize(ltup), InvalidOffsetNumber, + LP_USED, &tmpentry, &newtup); + WriteBuffer(b); + gistAdjustKeys(r, stk->gs_parent, stk->gs_blk, tmpentry.pred, + tmpentry.bytes, giststate); + /* be tidy */ + if (tmpentry.pred != (((char *) ltup) + sizeof(IndexTupleData))) + pfree(tmpentry.pred); + if (ltup != newtup) + pfree(newtup); + (void)gistentryinsert(r, stk, rtup, giststate); + } +} + + +/* +** Insert an entry onto a page +*/ +static InsertIndexResult +gistentryinsert(Relation r, GISTSTACK *stk, IndexTuple tup, + GISTSTATE *giststate) +{ + Buffer b; + Page p; + InsertIndexResult res; + bytea *evec; + char *datum; + int size; + OffsetNumber off; + GISTENTRY tmpentry; + IndexTuple newtup; + + b = ReadBuffer(r, stk->gs_blk); + p = BufferGetPage(b); + + if (gistnospace(p, tup)) { + res = gistSplit(r, b, stk->gs_parent, tup, giststate); + WriteBuffer(b); /* don't forget to release buffer! - 01/31/94 */ + return(res); + } + else { + res = (InsertIndexResult) palloc(sizeof(InsertIndexResultData)); + off = gistPageAddItem(giststate, r, p, (Item) tup, IndexTupleSize(tup), + InvalidOffsetNumber, LP_USED, &tmpentry, &newtup); + WriteBuffer(b); + ItemPointerSet(&(res->pointerData), stk->gs_blk, off); + gistAdjustKeys(r, stk->gs_parent, stk->gs_blk, tmpentry.pred, + tmpentry.bytes, giststate); + /* be tidy */ + if (tmpentry.pred != (((char *) tup) + sizeof(IndexTupleData))) + pfree(tmpentry.pred); + if (tup != newtup) + pfree(newtup); + return(res); + } +} + + +static void +gistnewroot(GISTSTATE *giststate, Relation r, IndexTuple lt, IndexTuple rt) +{ + Buffer b; + Page p; + GISTENTRY tmpentry; + IndexTuple newtup; + + b = ReadBuffer(r, GISTP_ROOT); + GISTInitBuffer(b, 0); + p = BufferGetPage(b); + (void) gistPageAddItem(giststate, r, p, (Item) lt, IndexTupleSize(lt), + FirstOffsetNumber, + LP_USED, &tmpentry, &newtup); + /* be tidy */ + if (tmpentry.pred != (((char *) lt) + sizeof(IndexTupleData))) + pfree(tmpentry.pred); + if (lt != newtup) + pfree(newtup); + (void) gistPageAddItem(giststate, r, p, (Item) rt, IndexTupleSize(rt), + OffsetNumberNext(FirstOffsetNumber), LP_USED, + &tmpentry, &newtup); + /* be tidy */ + if (tmpentry.pred != (((char *) rt) + sizeof(IndexTupleData))) + pfree(tmpentry.pred); + if (rt != newtup) + pfree(newtup); + WriteBuffer(b); +} + +static void +GISTInitBuffer(Buffer b, uint32 f) +{ + GISTPageOpaque opaque; + Page page; + Size pageSize; + + pageSize = BufferGetPageSize(b); + + page = BufferGetPage(b); + memset(page, 0, (int) pageSize); + PageInit(page, pageSize, sizeof(GISTPageOpaqueData)); + + opaque = (GISTPageOpaque) PageGetSpecialPointer(page); + opaque->flags = f; +} + + +/* +** find entry with lowest penalty +*/ +static OffsetNumber +gistchoose(Relation r, Page p, IndexTuple it, /* it has compressed entry */ + GISTSTATE *giststate) +{ + OffsetNumber maxoff; + OffsetNumber i; + char *ud, *id; + char *datum; + float usize, dsize; + OffsetNumber which; + float which_grow; + GISTENTRY entry, identry; + int size, idsize; + + idsize = IndexTupleSize(it) - sizeof(IndexTupleData); + id = ((char *) it) + sizeof(IndexTupleData); + maxoff = PageGetMaxOffsetNumber(p); + which_grow = -1.0; + which = -1; + + gistdentryinit(giststate,&identry,id,(Relation)NULL,(Page)NULL, + (OffsetNumber)0, idsize, FALSE); + + for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i)) { + datum = (char *) PageGetItem(p, PageGetItemId(p, i)); + size = IndexTupleSize(datum) - sizeof(IndexTupleData); + datum += sizeof(IndexTupleData); + gistdentryinit(giststate,&entry,datum,r,p,i,size,FALSE); + (giststate->penaltyFn)(entry, identry, &usize); + if (which_grow < 0 || usize < which_grow) { + which = i; + which_grow = usize; + if (which_grow == 0) + break; + } + if (entry.pred != datum) + pfree(entry.pred); + } + if (identry.pred != id) + pfree(identry.pred); + + return (which); +} + +static int +gistnospace(Page p, IndexTuple it) +{ + return (PageGetFreeSpace(p) < IndexTupleSize(it)); +} + +void +gistfreestack(GISTSTACK *s) +{ + GISTSTACK *p; + + while (s != (GISTSTACK *) NULL) { + p = s->gs_parent; + pfree(s); + s = p; + } +} + + +/* +** remove an entry from a page +*/ +void +gistdelete(Relation r, ItemPointer tid) +{ + BlockNumber blkno; + OffsetNumber offnum; + Buffer buf; + Page page; + + /* must write-lock on delete */ + RelationSetLockForWrite(r); + + blkno = ItemPointerGetBlockNumber(tid); + offnum = ItemPointerGetOffsetNumber(tid); + + /* adjust any scans that will be affected by this deletion */ + gistadjscans(r, GISTOP_DEL, blkno, offnum); + + /* delete the index tuple */ + buf = ReadBuffer(r, blkno); + page = BufferGetPage(buf); + + PageIndexTupleDelete(page, offnum); + + WriteBuffer(buf); + + /* XXX -- two-phase locking, don't release the write lock */ +} + +void +initGISTstate(GISTSTATE *giststate, Relation index) +{ + RegProcedure consistent_proc, union_proc, compress_proc, decompress_proc; + RegProcedure penalty_proc, picksplit_proc, equal_proc; + func_ptr user_fn; + int pronargs; + HeapTuple htup; + IndexTupleForm itupform; + + consistent_proc = index_getprocid(index, 1, GIST_CONSISTENT_PROC); + union_proc = index_getprocid(index, 1, GIST_UNION_PROC); + compress_proc = index_getprocid(index, 1, GIST_COMPRESS_PROC); + decompress_proc = index_getprocid(index, 1, GIST_DECOMPRESS_PROC); + penalty_proc = index_getprocid(index, 1, GIST_PENALTY_PROC); + picksplit_proc = index_getprocid(index, 1, GIST_PICKSPLIT_PROC); + equal_proc = index_getprocid(index, 1, GIST_EQUAL_PROC); + fmgr_info(consistent_proc, &user_fn, &pronargs); + giststate->consistentFn = user_fn; + fmgr_info(union_proc, &user_fn, &pronargs); + giststate->unionFn = user_fn; + fmgr_info(compress_proc, &user_fn, &pronargs); + giststate->compressFn = user_fn; + fmgr_info(decompress_proc, &user_fn, &pronargs); + giststate->decompressFn = user_fn; + fmgr_info(penalty_proc, &user_fn, &pronargs); + giststate->penaltyFn = user_fn; + fmgr_info(picksplit_proc, &user_fn, &pronargs); + giststate->picksplitFn = user_fn; + fmgr_info(equal_proc, &user_fn, &pronargs); + giststate->equalFn = user_fn; + + /* see if key type is different from type of attribute being indexed */ + htup = SearchSysCacheTuple(INDEXRELID, ObjectIdGetDatum(index->rd_id), + 0,0,0); + itupform = (IndexTupleForm)GETSTRUCT(htup); + if (!HeapTupleIsValid(htup)) + elog(WARN, "initGISTstate: index %d not found", index->rd_id); + giststate->haskeytype = itupform->indhaskeytype; + if (giststate->haskeytype) { + /* key type is different -- is it byval? */ + htup = SearchSysCacheTuple(ATTNUM, + ObjectIdGetDatum(itupform->indexrelid), + UInt16GetDatum(FirstOffsetNumber), + 0,0); + if (!HeapTupleIsValid(htup)) { + elog(WARN, "initGISTstate: no attribute tuple %d %d", + itupform->indexrelid, FirstOffsetNumber); + return; + } + giststate->keytypbyval = (((AttributeTupleForm)htup)->attbyval); + } + else + giststate->keytypbyval = FALSE; + return; +} + + +/* +** Given an IndexTuple to be inserted on a page, this routine replaces +** the key with another key, which may involve generating a new IndexTuple +** if the sizes don't match +*/ +static IndexTuple +gist_tuple_replacekey(Relation r, GISTENTRY entry, IndexTuple t) +{ + char * datum = (((char *) t) + sizeof(IndexTupleData)); + + /* if new entry fits in index tuple, copy it in */ + if (entry.bytes < IndexTupleSize(t) - sizeof(IndexTupleData)) { + memcpy(datum, entry.pred, entry.bytes); + /* clear out old size */ + t->t_info &= 0xe000; + /* or in new size */ + t->t_info |= MAXALIGN(entry.bytes + sizeof(IndexTupleData)); + + return(t); + } + else { + /* generate a new index tuple for the compressed entry */ + TupleDesc tupDesc = r->rd_att; + IndexTuple newtup; + char *isnull; + int blank; + + isnull = (char *) palloc(r->rd_rel->relnatts); + for (blank = 0; blank < r->rd_rel->relnatts; blank++) + isnull[blank] = ' '; + newtup = (IndexTuple) index_formtuple(tupDesc, + (Datum *)&(entry.pred), + isnull); + newtup->t_tid = t->t_tid; + pfree(isnull); + return(newtup); + } +} + + +/* +** initialize a GiST entry with a decompressed version of pred +*/ +void +gistdentryinit(GISTSTATE *giststate, GISTENTRY *e, char *pr, Relation r, + Page pg, OffsetNumber o, int b, bool l) +{ + GISTENTRY *dep; + gistentryinit(*e, pr, r, pg, o, b, l); + if (giststate->haskeytype) { + dep = (GISTENTRY *)((giststate->decompressFn)(e)); + gistentryinit(*e, dep->pred, dep->rel, dep->page, dep->offset, dep->bytes, + dep->leafkey); + if (dep != e) pfree(dep); + } +} + + +/* +** initialize a GiST entry with a compressed version of pred +*/ +void +gistcentryinit(GISTSTATE *giststate, GISTENTRY *e, char *pr, Relation r, + Page pg, OffsetNumber o, int b, bool l) +{ + GISTENTRY *cep; + gistentryinit(*e, pr, r, pg, o, b, l); + if (giststate->haskeytype) { + cep = (GISTENTRY *)((giststate->compressFn)(e)); + gistentryinit(*e, cep->pred, cep->rel, cep->page, cep->offset, cep->bytes, + cep->leafkey); + if (cep != e) pfree(cep); + } +} + + + +#define GISTDEBUG +#ifdef GISTDEBUG + +extern char *text_range_out(); +extern char *int_range_out(); + +/* +** sloppy debugging support routine, requires recompilation with appropriate +** "out" method for the index keys. Could be fixed to find that info +** in the catalogs... +*/ +void +_gistdump(Relation r) +{ + Buffer buf; + Page page; + OffsetNumber offnum, maxoff; + BlockNumber blkno; + BlockNumber nblocks; + GISTPageOpaque po; + IndexTuple itup; + BlockNumber itblkno; + OffsetNumber itoffno; + char *datum; + char *itkey; + + nblocks = RelationGetNumberOfBlocks(r); + for (blkno = 0; blkno < nblocks; blkno++) { + buf = ReadBuffer(r, blkno); + page = BufferGetPage(buf); + po = (GISTPageOpaque) PageGetSpecialPointer(page); + maxoff = PageGetMaxOffsetNumber(page); + printf("Page %d maxoff %d <%s>\n", blkno, maxoff, + (po->flags & F_LEAF ? "LEAF" : "INTERNAL")); + + if (PageIsEmpty(page)) { + ReleaseBuffer(buf); + continue; + } + + for (offnum = FirstOffsetNumber; + offnum <= maxoff; + offnum = OffsetNumberNext(offnum)) { + itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, offnum)); + itblkno = ItemPointerGetBlockNumber(&(itup->t_tid)); + itoffno = ItemPointerGetOffsetNumber(&(itup->t_tid)); + datum = ((char *) itup); + datum += sizeof(IndexTupleData); + /* get out function for type of key, and out it! */ + itkey = (char *) int_range_out(datum); + /* itkey = " unable to print"; */ + printf("\t[%d] size %d heap <%d,%d> key:%s\n", + offnum, IndexTupleSize(itup), itblkno, itoffno, itkey); + pfree(itkey); + } + + ReleaseBuffer(buf); + } +} + +#define TRLOWER(tr) (((tr)->bytes)) +#define TRUPPER(tr) (&((tr)->bytes[MAXALIGN(VARSIZE(TRLOWER(tr)))])) +typedef struct txtrange { + /* flag: NINF means that lower is negative infinity; PINF means that + ** upper is positive infinity. 0 means that both are numbers. + */ + int32 vl_len; + int32 flag; + char bytes[2]; +} TXTRANGE; + +typedef struct intrange { + int lower; + int upper; + /* flag: NINF means that lower is negative infinity; PINF means that + ** upper is positive infinity. 0 means that both are numbers. + */ + int flag; +} INTRANGE; + +char *text_range_out(TXTRANGE *r) +{ + char *result; + char *lower, *upper; + + if (r == NULL) + return(NULL); + result = (char *)palloc(16 + VARSIZE(TRLOWER(r)) + VARSIZE(TRUPPER(r)) + - 2*VARHDRSZ); + + lower = (char *)palloc(VARSIZE(TRLOWER(r)) + 1 - VARHDRSZ); + memcpy(lower, VARDATA(TRLOWER(r)), VARSIZE(TRLOWER(r)) - VARHDRSZ); + lower[VARSIZE(TRLOWER(r)) - VARHDRSZ] = '\0'; + upper = (char *)palloc(VARSIZE(TRUPPER(r)) + 1 - VARHDRSZ); + memcpy(upper, VARDATA(TRUPPER(r)), VARSIZE(TRUPPER(r)) - VARHDRSZ); + upper[VARSIZE(TRUPPER(r)) - VARHDRSZ] = '\0'; + + (void) sprintf(result, "[%s,%s): %d", lower, upper, r->flag); + pfree(lower); + pfree(upper); + return(result); +} + +char * +int_range_out(INTRANGE *r) +{ + char *result; + + if (r == NULL) + return(NULL); + result = (char *)palloc(80); + (void) sprintf(result, "[%d,%d): %d",r->lower, r->upper, r->flag); + + return(result); +} + +#endif /* defined GISTDEBUG */ + diff --git a/src/backend/access/gist/gistget.c b/src/backend/access/gist/gistget.c new file mode 100644 index 0000000000..d60b0a1194 --- /dev/null +++ b/src/backend/access/gist/gistget.c @@ -0,0 +1,374 @@ +/*------------------------------------------------------------------------- + * + * gistget.c-- + * fetch tuples from a GiST scan. + * + * + * + * IDENTIFICATION + * /usr/local/devel/pglite/cvs/src/backend/access/gisr/gistget.c,v 1.9 1995/08/01 20:16:02 jolly Exp + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "storage/bufmgr.h" +#include "storage/bufpage.h" + +#include "utils/elog.h" +#include "utils/palloc.h" +#include "utils/rel.h" + +#include "access/heapam.h" +#include "access/genam.h" +#include "access/iqual.h" +#include "access/gist.h" +#include "access/sdir.h" + +#include "executor/execdebug.h" + +static OffsetNumber gistfindnext(IndexScanDesc s, Page p, OffsetNumber n, + ScanDirection dir); +static RetrieveIndexResult gistscancache(IndexScanDesc s, ScanDirection dir); +static RetrieveIndexResult gistfirst(IndexScanDesc s, ScanDirection dir); +static RetrieveIndexResult gistnext(IndexScanDesc s, ScanDirection dir); +static ItemPointer gistheapptr(Relation r, ItemPointer itemp); + + +RetrieveIndexResult +gistgettuple(IndexScanDesc s, ScanDirection dir) +{ + RetrieveIndexResult res; + + /* if we have it cached in the scan desc, just return the value */ + if ((res = gistscancache(s, dir)) != (RetrieveIndexResult) NULL) + return (res); + + /* not cached, so we'll have to do some work */ + if (ItemPointerIsValid(&(s->currentItemData))) { + res = gistnext(s, dir); + } else { + res = gistfirst(s, dir); + } + return (res); +} + +static RetrieveIndexResult +gistfirst(IndexScanDesc s, ScanDirection dir) +{ + Buffer b; + Page p; + OffsetNumber n; + OffsetNumber maxoff; + RetrieveIndexResult res; + GISTPageOpaque po; + GISTScanOpaque so; + GISTSTACK *stk; + BlockNumber blk; + IndexTuple it; + ItemPointer ip; + + b = ReadBuffer(s->relation, GISTP_ROOT); + p = BufferGetPage(b); + po = (GISTPageOpaque) PageGetSpecialPointer(p); + so = (GISTScanOpaque) s->opaque; + + for (;;) { + maxoff = PageGetMaxOffsetNumber(p); + if (ScanDirectionIsBackward(dir)) + n = gistfindnext(s, p, maxoff, dir); + else + n = gistfindnext(s, p, FirstOffsetNumber, dir); + + while (n < FirstOffsetNumber || n > maxoff) { + + ReleaseBuffer(b); + if (so->s_stack == (GISTSTACK *) NULL) + return ((RetrieveIndexResult) NULL); + + stk = so->s_stack; + b = ReadBuffer(s->relation, stk->gs_blk); + p = BufferGetPage(b); + po = (GISTPageOpaque) PageGetSpecialPointer(p); + maxoff = PageGetMaxOffsetNumber(p); + + if (ScanDirectionIsBackward(dir)) { + n = OffsetNumberPrev(stk->gs_child); + } else { + n = OffsetNumberNext(stk->gs_child); + } + so->s_stack = stk->gs_parent; + pfree(stk); + + n = gistfindnext(s, p, n, dir); + } + if (po->flags & F_LEAF) { + ItemPointerSet(&(s->currentItemData), BufferGetBlockNumber(b), n); + + it = (IndexTuple) PageGetItem(p, PageGetItemId(p, n)); + ip = (ItemPointer) palloc(sizeof(ItemPointerData)); + memmove((char *) ip, (char *) &(it->t_tid), + sizeof(ItemPointerData)); + ReleaseBuffer(b); + + res = FormRetrieveIndexResult(&(s->currentItemData), ip); + + return (res); + } else { + stk = (GISTSTACK *) palloc(sizeof(GISTSTACK)); + stk->gs_child = n; + stk->gs_blk = BufferGetBlockNumber(b); + stk->gs_parent = so->s_stack; + so->s_stack = stk; + + it = (IndexTuple) PageGetItem(p, PageGetItemId(p, n)); + blk = ItemPointerGetBlockNumber(&(it->t_tid)); + + ReleaseBuffer(b); + b = ReadBuffer(s->relation, blk); + p = BufferGetPage(b); + po = (GISTPageOpaque) PageGetSpecialPointer(p); + } + } +} + +static RetrieveIndexResult +gistnext(IndexScanDesc s, ScanDirection dir) +{ + Buffer b; + Page p; + OffsetNumber n; + OffsetNumber maxoff; + RetrieveIndexResult res; + GISTPageOpaque po; + GISTScanOpaque so; + GISTSTACK *stk; + BlockNumber blk; + IndexTuple it; + ItemPointer ip; + + blk = ItemPointerGetBlockNumber(&(s->currentItemData)); + n = ItemPointerGetOffsetNumber(&(s->currentItemData)); + + if (ScanDirectionIsForward(dir)) { + n = OffsetNumberNext(n); + } else { + n = OffsetNumberPrev(n); + } + + b = ReadBuffer(s->relation, blk); + p = BufferGetPage(b); + po = (GISTPageOpaque) PageGetSpecialPointer(p); + so = (GISTScanOpaque) s->opaque; + + for (;;) { + maxoff = PageGetMaxOffsetNumber(p); + n = gistfindnext(s, p, n, dir); + + while (n < FirstOffsetNumber || n > maxoff) { + + ReleaseBuffer(b); + if (so->s_stack == (GISTSTACK *) NULL) + return ((RetrieveIndexResult) NULL); + + stk = so->s_stack; + b = ReadBuffer(s->relation, stk->gs_blk); + p = BufferGetPage(b); + maxoff = PageGetMaxOffsetNumber(p); + po = (GISTPageOpaque) PageGetSpecialPointer(p); + + if (ScanDirectionIsBackward(dir)) { + n = OffsetNumberPrev(stk->gs_child); + } else { + n = OffsetNumberNext(stk->gs_child); + } + so->s_stack = stk->gs_parent; + pfree(stk); + + n = gistfindnext(s, p, n, dir); + } + if (po->flags & F_LEAF) { + ItemPointerSet(&(s->currentItemData), BufferGetBlockNumber(b), n); + + it = (IndexTuple) PageGetItem(p, PageGetItemId(p, n)); + ip = (ItemPointer) palloc(sizeof(ItemPointerData)); + memmove((char *) ip, (char *) &(it->t_tid), + sizeof(ItemPointerData)); + ReleaseBuffer(b); + + res = FormRetrieveIndexResult(&(s->currentItemData), ip); + + return (res); + } else { + stk = (GISTSTACK *) palloc(sizeof(GISTSTACK)); + stk->gs_child = n; + stk->gs_blk = BufferGetBlockNumber(b); + stk->gs_parent = so->s_stack; + so->s_stack = stk; + + it = (IndexTuple) PageGetItem(p, PageGetItemId(p, n)); + blk = ItemPointerGetBlockNumber(&(it->t_tid)); + + ReleaseBuffer(b); + b = ReadBuffer(s->relation, blk); + p = BufferGetPage(b); + po = (GISTPageOpaque) PageGetSpecialPointer(p); + + if (ScanDirectionIsBackward(dir)) { + n = PageGetMaxOffsetNumber(p); + } else { + n = FirstOffsetNumber; + } + } + } +} + +/* Similar to index_keytest, but decompresses the key in the IndexTuple */ +bool +gistindex_keytest(IndexTuple tuple, + TupleDesc tupdesc, + int scanKeySize, + ScanKey key, + GISTSTATE *giststate, + Relation r, + Page p, + OffsetNumber offset) +{ + bool isNull; + Datum datum; + int test; + GISTENTRY de; + + IncrIndexProcessed(); + + + while (scanKeySize > 0) { + datum = index_getattr(tuple, + 1, + tupdesc, + &isNull); + gistdentryinit(giststate, &de, (char *)datum, r, p, offset, + IndexTupleSize(tuple) - sizeof(IndexTupleData), + FALSE); + + if (isNull) { + /* XXX eventually should check if SK_ISNULL */ + return (false); + } + + if (key[0].sk_flags & SK_COMMUTE) { + test = (int) (*(key[0].sk_func)) + (DatumGetPointer(key[0].sk_argument), + &de, key[0].sk_procedure); + } else { + test = (int) (*(key[0].sk_func)) + (&de, + DatumGetPointer(key[0].sk_argument), + key[0].sk_procedure); + } + + if (!test == !(key[0].sk_flags & SK_NEGATE)) { + return (false); + } + + scanKeySize -= 1; + key++; + } + + return (true); +} + + +static OffsetNumber +gistfindnext(IndexScanDesc s, Page p, OffsetNumber n, ScanDirection dir) +{ + OffsetNumber maxoff; + char *it; + GISTPageOpaque po; + GISTScanOpaque so; + GISTENTRY de; + GISTSTATE *giststate; + + maxoff = PageGetMaxOffsetNumber(p); + po = (GISTPageOpaque) PageGetSpecialPointer(p); + so = (GISTScanOpaque) s->opaque; + giststate = so->giststate; + + /* + * If we modified the index during the scan, we may have a pointer to + * a ghost tuple, before the scan. If this is the case, back up one. + */ + + if (so->s_flags & GS_CURBEFORE) { + so->s_flags &= ~GS_CURBEFORE; + n = OffsetNumberPrev(n); + } + + while (n >= FirstOffsetNumber && n <= maxoff) { + it = (char *) PageGetItem(p, PageGetItemId(p, n)); + if (gistindex_keytest((IndexTuple) it, + RelationGetTupleDescriptor(s->relation), + s->numberOfKeys, s->keyData, giststate, + s->relation, p, n)) + break; + + if (ScanDirectionIsBackward(dir)) { + n = OffsetNumberPrev(n); + } else { + n = OffsetNumberNext(n); + } + } + + return (n); +} + +static RetrieveIndexResult +gistscancache(IndexScanDesc s, ScanDirection dir) +{ + RetrieveIndexResult res; + ItemPointer ip; + + if (!(ScanDirectionIsNoMovement(dir) + && ItemPointerIsValid(&(s->currentItemData)))) { + + return ((RetrieveIndexResult) NULL); + } + + ip = gistheapptr(s->relation, &(s->currentItemData)); + + if (ItemPointerIsValid(ip)) + res = FormRetrieveIndexResult(&(s->currentItemData), ip); + else + res = (RetrieveIndexResult) NULL; + + return (res); +} + +/* + * gistheapptr returns the item pointer to the tuple in the heap relation + * for which itemp is the index relation item pointer. + */ +static ItemPointer +gistheapptr(Relation r, ItemPointer itemp) +{ + Buffer b; + Page p; + IndexTuple it; + ItemPointer ip; + OffsetNumber n; + + ip = (ItemPointer) palloc(sizeof(ItemPointerData)); + if (ItemPointerIsValid(itemp)) { + b = ReadBuffer(r, ItemPointerGetBlockNumber(itemp)); + p = BufferGetPage(b); + n = ItemPointerGetOffsetNumber(itemp); + it = (IndexTuple) PageGetItem(p, PageGetItemId(p, n)); + memmove((char *) ip, (char *) &(it->t_tid), + sizeof(ItemPointerData)); + ReleaseBuffer(b); + } else { + ItemPointerSetInvalid(ip); + } + + return (ip); +} diff --git a/src/backend/access/gist/gistold.c b/src/backend/access/gist/gistold.c new file mode 100644 index 0000000000..6a43562c1d --- /dev/null +++ b/src/backend/access/gist/gistold.c @@ -0,0 +1,1159 @@ +/*------------------------------------------------------------------------- + * + * gist.c-- + * interface routines for the postgres GiST indexed access method. + * + * + * + * IDENTIFICATION + * /usr/local/devel/pglite/cvs/src/backend/access/rtree/rtree.c,v 1.16 1995/08/01 20:16:03 jolly Exp + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "storage/bufmgr.h" +#include "storage/bufpage.h" + +#include "utils/elog.h" +#include "utils/palloc.h" +#include "utils/rel.h" +#include "utils/excid.h" + +#include "access/heapam.h" +#include "access/genam.h" +#include "access/gist.h" +#include "access/gistscan.h" +#include "access/funcindex.h" +#include "access/tupdesc.h" + +#include "nodes/execnodes.h" +#include "nodes/plannodes.h" + +#include "executor/executor.h" +#include "executor/tuptable.h" + +#include "catalog/index.h" + +/* non-export function prototypes */ +static InsertIndexResult gistdoinsert(Relation r, IndexTuple itup, + GISTSTATE *GISTstate); +static InsertIndexResult gistentryinsert(Relation r, GISTSTACK *stk, + IndexTuple tup, + GISTSTATE *giststate); +static void gistentryinserttwo(Relation r, GISTSTACK *stk, IndexTuple ltup, + IndexTuple rtup, GISTSTATE *giststate); +static void gistAdjustKeys(Relation r, GISTSTACK *stk, BlockNumber blk, + char *datum, int att_size, GISTSTATE *giststate); +static void gistintinsert(Relation r, GISTSTACK *stk, IndexTuple ltup, + IndexTuple rtup, GISTSTATE *giststate); +static InsertIndexResult gistSplit(Relation r, Buffer buffer, + GISTSTACK *stack, IndexTuple itup, + GISTSTATE *giststate); +static void gistnewroot(Relation r, IndexTuple lt, IndexTuple rt); +static void GISTInitBuffer(Buffer b, uint32 f); +static BlockNumber gistChooseSubtree(Relation r, IndexTuple itup, int level, + GISTSTATE *giststate, + GISTSTACK **retstack, Buffer *leafbuf); +static OffsetNumber gistchoose(Relation r, Page p, IndexTuple it, + GISTSTATE *giststate); +static int gistnospace(Page p, IndexTuple it); +void gistdelete(Relation r, ItemPointer tid); +static IndexTuple gist_tuple_replacekey(Relation r, GISTENTRY entry, IndexTuple t); + +void +gistbuild(Relation heap, + Relation index, + int natts, + AttrNumber *attnum, + IndexStrategy istrat, + uint16 pint, + Datum *params, + FuncIndexInfo *finfo, + PredInfo *predInfo) +{ + HeapScanDesc scan; + Buffer buffer; + AttrNumber i; + HeapTuple htup; + IndexTuple itup; + TupleDesc hd, id; + InsertIndexResult res; + Datum *d; + bool *nulls; + int nb, nh, ni; + ExprContext *econtext; + TupleTable tupleTable; + TupleTableSlot *slot; + Oid hrelid, irelid; + Node *pred, *oldPred; + GISTSTATE giststate; + GISTENTRY tmpentry; + bool *decompvec; + + initGISTstate(&giststate, index); + + /* GiSTs only know how to do stupid locking now */ + RelationSetLockForWrite(index); + + pred = predInfo->pred; + oldPred = predInfo->oldPred; + + /* + * We expect to be called exactly once for any index relation. + * If that's not the case, big trouble's what we have. + */ + + if (oldPred == NULL && (nb = RelationGetNumberOfBlocks(index)) != 0) + elog(WARN, "%.16s already contains data", &(index->rd_rel->relname.data[0])); + + /* initialize the root page (if this is a new index) */ + if (oldPred == NULL) { + buffer = ReadBuffer(index, P_NEW); + GISTInitBuffer(buffer, F_LEAF); + WriteBuffer(buffer); + } + + /* init the tuple descriptors and get set for a heap scan */ + hd = RelationGetTupleDescriptor(heap); + id = RelationGetTupleDescriptor(index); + d = (Datum *)palloc(natts * sizeof (*d)); + nulls = (bool *)palloc(natts * sizeof (*nulls)); + + /* + * If this is a predicate (partial) index, we will need to evaluate the + * predicate using ExecQual, which requires the current tuple to be in a + * slot of a TupleTable. In addition, ExecQual must have an ExprContext + * referring to that slot. Here, we initialize dummy TupleTable and + * ExprContext objects for this purpose. --Nels, Feb '92 + */ +#ifndef OMIT_PARTIAL_INDEX + if (pred != NULL || oldPred != NULL) { + tupleTable = ExecCreateTupleTable(1); + slot = ExecAllocTableSlot(tupleTable); + econtext = makeNode(ExprContext); + FillDummyExprContext(econtext, slot, hd, buffer); + } +#endif /* OMIT_PARTIAL_INDEX */ + scan = heap_beginscan(heap, 0, NowTimeQual, 0, (ScanKey) NULL); + htup = heap_getnext(scan, 0, &buffer); + + /* int the tuples as we insert them */ + nh = ni = 0; + + for (; HeapTupleIsValid(htup); htup = heap_getnext(scan, 0, &buffer)) { + + nh++; + + /* + * If oldPred != NULL, this is an EXTEND INDEX command, so skip + * this tuple if it was already in the existing partial index + */ + if (oldPred != NULL) { +#ifndef OMIT_PARTIAL_INDEX + /*SetSlotContents(slot, htup); */ + slot->val = htup; + if (ExecQual((List*)oldPred, econtext) == true) { + ni++; + continue; + } +#endif /* OMIT_PARTIAL_INDEX */ + } + + /* Skip this tuple if it doesn't satisfy the partial-index predicate */ + if (pred != NULL) { +#ifndef OMIT_PARTIAL_INDEX + /*SetSlotContents(slot, htup); */ + slot->val = htup; + if (ExecQual((List*)pred, econtext) == false) + continue; +#endif /* OMIT_PARTIAL_INDEX */ + } + + ni++; + + /* + * For the current heap tuple, extract all the attributes + * we use in this index, and note which are null. + */ + + for (i = 1; i <= natts; i++) { + int attoff; + bool attnull; + + /* + * Offsets are from the start of the tuple, and are + * zero-based; indices are one-based. The next call + * returns i - 1. That's data hiding for you. + */ + + attoff = AttrNumberGetAttrOffset(i); + /* + d[attoff] = HeapTupleGetAttributeValue(htup, buffer, + */ + d[attoff] = GetIndexValue(htup, + hd, + attoff, + attnum, + finfo, + &attnull, + buffer); + nulls[attoff] = (attnull ? 'n' : ' '); + } + + /* immediately compress keys, and generate an index tuple */ + decompvec = (bool *)palloc(sizeof(bool) * natts); + for (i = 0; i < natts; i++) { + gistcentryinit(&giststate, &tmpentry, (char *)d[i], (Relation) NULL, + (Page) NULL, (OffsetNumber) 0, + -1 /* size is currently bogus */, TRUE); + if (d[i] != (Datum)tmpentry.pred && tmpentry.bytes > sizeof(int32)) + decompvec[i] = TRUE; + else decompvec[i] = FALSE; + d[i] = (Datum)tmpentry.pred; + } + + /* form an index tuple and point it at the heap tuple */ + itup = index_formtuple(id, &d[0], nulls); + itup->t_tid = htup->t_ctid; + + /* + * Since we already have the index relation locked, we + * call gistdoinsert directly. Normal access method calls + * dispatch through gistinsert, which locks the relation + * for write. This is the right thing to do if you're + * inserting single tups, but not when you're initializing + * the whole index at once. + */ + + res = gistdoinsert(index, itup, &giststate); + for (i = 0; i < natts; i++) + if (decompvec[i] == TRUE) pfree((char *)d[i]); + pfree(itup); + pfree(res); + } + + /* okay, all heap tuples are indexed */ + heap_endscan(scan); + RelationUnsetLockForWrite(index); + + if (pred != NULL || oldPred != NULL) { +#ifndef OMIT_PARTIAL_INDEX + ExecDestroyTupleTable(tupleTable, true); + pfree(econtext); +#endif /* OMIT_PARTIAL_INDEX */ + } + + /* + * Since we just inted the tuples in the heap, we update its + * stats in pg_relation to guarantee that the planner takes + * advantage of the index we just created. UpdateStats() does a + * CommandinterIncrement(), which flushes changed entries from + * the system relcache. The act of constructing an index changes + * these heap and index tuples in the system catalogs, so they + * need to be flushed. We close them to guarantee that they + * will be. + */ + + hrelid = heap->rd_id; + irelid = index->rd_id; + heap_close(heap); + index_close(index); + + UpdateStats(hrelid, nh, true); + UpdateStats(irelid, ni, false); + + if (oldPred != NULL) { + if (ni == nh) pred = NULL; + UpdateIndexPredicate(irelid, oldPred, pred); + } + + /* be tidy */ + pfree(nulls); + pfree(d); +} + +/* + * gistinsert -- wrapper for GiST tuple insertion. + * + * This is the public interface routine for tuple insertion in GiSTs. + * It doesn't do any work; just locks the relation and passes the buck. + */ +InsertIndexResult +gistinsert(Relation r, Datum *datum, char *nulls, ItemPointer ht_ctid) +{ + InsertIndexResult res; + IndexTuple itup; + GISTSTATE giststate; + GISTENTRY tmpentry; + int i; + bool *decompvec; + + initGISTstate(&giststate, r); + + /* immediately compress keys, and generate an index tuple */ + decompvec = (bool *)palloc(sizeof(bool) * r->rd_att->natts); + for (i = 0; i < r->rd_att->natts; i++) { + gistcentryinit(&giststate, &tmpentry, (char *)datum[i], (Relation)NULL, + (Page) NULL, (OffsetNumber) 0, + -1 /* size is currently bogus */, TRUE); + if (datum[i] != (Datum)tmpentry.pred && tmpentry.bytes > sizeof(int32)) + decompvec[i] = TRUE; + else decompvec[i] = FALSE; + datum[i] = (Datum)tmpentry.pred; + } + itup = index_formtuple(RelationGetTupleDescriptor(r), datum, nulls); + itup->t_tid = *ht_ctid; + + RelationSetLockForWrite(r); + res = gistdoinsert(r, itup, &giststate); + for (i = 0; i < r->rd_att->natts; i++) + if (decompvec[i] == TRUE) pfree((char *)datum[i]); + pfree(itup); + + /* XXX two-phase locking -- don't unlock the relation until EOT */ + return (res); +} + +/* itup contains original uncompressed entry */ +static InsertIndexResult +gistdoinsert(Relation r, IndexTuple itup, GISTSTATE *giststate) +{ + char *datum, *newdatum; + GISTENTRY entry, tmpentry; + InsertIndexResult res; + OffsetNumber l; + GISTSTACK *stack, *tmpstk; + Buffer buffer; + BlockNumber blk; + Page page; + + /* 3rd arg is ignored for now */ + blk = gistChooseSubtree(r, itup, 0, giststate, &stack, &buffer); + page = (Page) BufferGetPage(buffer); + + if (gistnospace(page, itup)) { + /* need to do a split */ + res = gistSplit(r, buffer, stack, itup, giststate); + gistfreestack(stack); + WriteBuffer(buffer); /* don't forget to release buffer! */ + return (res); + } + + /* add the item and write the buffer */ + if (PageIsEmpty(page)) { + l = PageAddItem(page, (Item) itup, IndexTupleSize(itup), + FirstOffsetNumber, + LP_USED); + } else { + l = PageAddItem(page, (Item) itup, IndexTupleSize(itup), + OffsetNumberNext(PageGetMaxOffsetNumber(page)), + LP_USED); + } + + WriteBuffer(buffer); + + /* now expand the page boundary in the parent to include the new child */ + datum = (((char *) itup) + sizeof(IndexTupleData)); + gistdentryinit(giststate, &tmpentry, datum, (Relation)0, (Page)0, + (OffsetNumber)0, + IndexTupleSize(itup) - sizeof(IndexTupleData), FALSE); + gistAdjustKeys(r, stack, blk, tmpentry.pred, tmpentry.bytes, giststate); + gistfreestack(stack); + if (tmpentry.pred != datum) + pfree(tmpentry.pred); + + /* build and return an InsertIndexResult for this insertion */ + res = (InsertIndexResult) palloc(sizeof(InsertIndexResultData)); + ItemPointerSet(&(res->pointerData), blk, l); + + return (res); +} + +/* itup contains compressed entry */ +static BlockNumber +gistChooseSubtree(Relation r, IndexTuple itup, int level, + GISTSTATE *giststate, + GISTSTACK **retstack /*out*/, + Buffer *leafbuf /*out*/) +{ + Buffer buffer; + BlockNumber blk; + GISTSTACK *stack; + Page page; + GISTPageOpaque opaque; + IndexTuple which; + + blk = GISTP_ROOT; + buffer = InvalidBuffer; + stack = (GISTSTACK *) NULL; + + do { + /* let go of current buffer before getting next */ + if (buffer != InvalidBuffer) + ReleaseBuffer(buffer); + + /* get next buffer */ + buffer = ReadBuffer(r, blk); + page = (Page) BufferGetPage(buffer); + + opaque = (GISTPageOpaque) PageGetSpecialPointer(page); + if (!(opaque->flags & F_LEAF)) { + GISTSTACK *n; + ItemId iid; + + n = (GISTSTACK *) palloc(sizeof(GISTSTACK)); + n->gs_parent = stack; + n->gs_blk = blk; + n->gs_child = gistchoose(r, page, itup, giststate); + stack = n; + + iid = PageGetItemId(page, n->gs_child); + which = (IndexTuple) PageGetItem(page, iid); + blk = ItemPointerGetBlockNumber(&(which->t_tid)); + } + } while (!(opaque->flags & F_LEAF)); + + *retstack = stack; + *leafbuf = buffer; + + return(blk); +} + +/* datum is uncompressed */ +static void +gistAdjustKeys(Relation r, + GISTSTACK *stk, + BlockNumber blk, + char *datum, + int att_size, + GISTSTATE *giststate) +{ + char *oldud; + Page p; + Buffer b; + bool result; + bytea *evec; + GISTENTRY centry, *ev0p, *ev1p, *dentryp; + int size, datumsize; + IndexTuple tid; + + if (stk == (GISTSTACK *) NULL) + return; + + b = ReadBuffer(r, stk->gs_blk); + p = BufferGetPage(b); + + oldud = (char *) PageGetItem(p, PageGetItemId(p, stk->gs_child)); + tid = (IndexTuple) oldud; + size = IndexTupleSize((IndexTuple)oldud) - sizeof(IndexTupleData); + oldud += sizeof(IndexTupleData); + + evec = (bytea *) palloc(2*sizeof(GISTENTRY) + VARHDRSZ); + VARSIZE(evec) = 2*sizeof(GISTENTRY) + VARHDRSZ; + + /* insert decompressed oldud into entry vector */ + gistdentryinit(giststate, &((GISTENTRY *)VARDATA(evec))[0], + oldud, r, p, stk->gs_child, + size, FALSE); + ev0p = &((GISTENTRY *)VARDATA(evec))[0]; + + /* insert datum entry into entry vector */ + gistentryinit(((GISTENTRY *)VARDATA(evec))[1], datum, + (Relation)NULL,(Page)NULL,(OffsetNumber)0, att_size, FALSE); + ev1p = &((GISTENTRY *)VARDATA(evec))[1]; + + /* form union of decompressed entries */ + datum = (char *) (giststate->unionFn)(evec, &datumsize); + + /* did union leave decompressed version of oldud unchanged? */ + (giststate->equalFn)(ev0p->pred, datum, &result); + if (!result) { + TupleDesc td = RelationGetTupleDescriptor(r); + + /* compress datum for storage on page */ + gistcentryinit(giststate, ¢ry, datum, ev0p->rel, ev0p->page, + ev0p->offset, datumsize, FALSE); + if (td->attrs[0]->attlen >= 0) { + memmove(oldud, centry.pred, att_size); + gistAdjustKeys(r, stk->gs_parent, stk->gs_blk, datum, att_size, + giststate); + } + else if (VARSIZE(centry.pred) == VARSIZE(oldud)) { + memmove(oldud, centry.pred, VARSIZE(centry.pred)); + gistAdjustKeys(r, stk->gs_parent, stk->gs_blk, datum, att_size, + giststate); + } + else { + /* new datum is not the same size as the old. + ** We have to delete the old entry and insert the new + ** one. Note that this may cause a split here! + */ + IndexTuple newtup; + ItemPointerData oldtid; + char *isnull; + TupleDesc tupDesc; + InsertIndexResult res; + + /* delete old tuple */ + ItemPointerSet(&oldtid, stk->gs_blk, stk->gs_child); + gistdelete(r, (ItemPointer)&oldtid); + + /* generate and insert new tuple */ + tupDesc = r->rd_att; + isnull = (char *) palloc(r->rd_rel->relnatts); + memset(isnull, ' ', r->rd_rel->relnatts); + newtup = (IndexTuple) index_formtuple(tupDesc, + (Datum *) ¢ry.pred, isnull); + pfree(isnull); + /* set pointer in new tuple to point to current child */ + ItemPointerSet(&oldtid, blk, 1); + newtup->t_tid = oldtid; + + /* inserting the new entry also adjust keys above */ + res = gistentryinsert(r, stk, newtup, giststate); + + /* in stack, set info to point to new tuple */ + stk->gs_blk = ItemPointerGetBlockNumber(&(res->pointerData)); + stk->gs_child = ItemPointerGetOffsetNumber(&(res->pointerData)); + + pfree(res); + } + WriteBuffer(b); + + if (centry.pred != datum) + pfree(datum); + } + else { + ReleaseBuffer(b); + } + pfree(evec); +} + +/* + * gistSplit -- split a page in the tree. + * + */ +static InsertIndexResult +gistSplit(Relation r, + Buffer buffer, + GISTSTACK *stack, + IndexTuple itup, /* contains compressed entry */ + GISTSTATE *giststate) +{ + Page p; + Buffer leftbuf, rightbuf; + Page left, right; + ItemId itemid; + IndexTuple item; + IndexTuple ltup, rtup; + OffsetNumber maxoff; + OffsetNumber i; + OffsetNumber leftoff, rightoff; + BlockNumber lbknum, rbknum; + BlockNumber bufblock; + GISTPageOpaque opaque; + int blank; + InsertIndexResult res; + char *isnull; + GIST_SPLITVEC v; + TupleDesc tupDesc; + bytea *entryvec; + bool *decompvec; + IndexTuple item_1; + GISTENTRY tmpcentry, *tmpdentryp, tmpentry; + char *datum; + + isnull = (char *) palloc(r->rd_rel->relnatts); + for (blank = 0; blank < r->rd_rel->relnatts; blank++) + isnull[blank] = ' '; + p = (Page) BufferGetPage(buffer); + opaque = (GISTPageOpaque) PageGetSpecialPointer(p); + + + /* + * The root of the tree is the first block in the relation. If + * we're about to split the root, we need to do some hocus-pocus + * to enforce this guarantee. + */ + + if (BufferGetBlockNumber(buffer) == GISTP_ROOT) { + leftbuf = ReadBuffer(r, P_NEW); + GISTInitBuffer(leftbuf, opaque->flags); + lbknum = BufferGetBlockNumber(leftbuf); + left = (Page) BufferGetPage(leftbuf); + } else { + leftbuf = buffer; + IncrBufferRefCount(buffer); + lbknum = BufferGetBlockNumber(buffer); + left = (Page) PageGetTempPage(p, sizeof(GISTPageOpaqueData)); + } + + rightbuf = ReadBuffer(r, P_NEW); + GISTInitBuffer(rightbuf, opaque->flags); + rbknum = BufferGetBlockNumber(rightbuf); + right = (Page) BufferGetPage(rightbuf); + + /* generate the item array */ + maxoff = PageGetMaxOffsetNumber(p); + entryvec = (bytea *)palloc(VARHDRSZ + (maxoff + 2) * sizeof(GISTENTRY)); + decompvec = (bool *)palloc(VARHDRSZ + (maxoff + 2) * sizeof(bool)); + for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i)) { + item_1 = (IndexTuple) PageGetItem(p, PageGetItemId(p, i)); + gistdentryinit(giststate, &((GISTENTRY *)VARDATA(entryvec))[i], + (((char *) item_1) + sizeof(IndexTupleData)), + r, p, i, + IndexTupleSize(item_1) - sizeof(IndexTupleData), FALSE); + if ((char *)(((GISTENTRY *)VARDATA(entryvec))[i].pred) + == (((char *) item_1) + sizeof(IndexTupleData))) + decompvec[i] = FALSE; + else decompvec[i] = TRUE; + } + + /* add the new datum as the last entry */ + gistdentryinit(giststate, &(((GISTENTRY *)VARDATA(entryvec))[maxoff+1]), + (((char *) itup) + sizeof(IndexTupleData)), + (Relation)NULL, (Page)NULL, + (OffsetNumber)0, tmpentry.bytes, FALSE); + if ((char *)(((GISTENTRY *)VARDATA(entryvec))[maxoff+1]).pred != + (((char *) itup) + sizeof(IndexTupleData))) + decompvec[maxoff+1] = TRUE; + else decompvec[maxoff+1] = FALSE; + + VARSIZE(entryvec) = (maxoff + 2) * sizeof(GISTENTRY) + VARHDRSZ; + + /* now let the user-defined picksplit function set up the split vector */ + (giststate->picksplitFn)(entryvec, &v); + + /* compress ldatum and rdatum */ + gistcentryinit(giststate, &tmpentry, v.spl_ldatum, (Relation)NULL, + (Page)NULL, (OffsetNumber)0, + ((GISTENTRY *)VARDATA(entryvec))[i].bytes, FALSE); + if (v.spl_ldatum != tmpentry.pred) + pfree(v.spl_ldatum); + v.spl_ldatum = tmpentry.pred; + + gistcentryinit(giststate, &tmpentry, v.spl_rdatum, (Relation)NULL, + (Page)NULL, (OffsetNumber)0, + ((GISTENTRY *)VARDATA(entryvec))[i].bytes, FALSE); + if (v.spl_rdatum != tmpentry.pred) + pfree(v.spl_rdatum); + v.spl_rdatum = tmpentry.pred; + + /* clean up the entry vector: its preds need to be deleted, too */ + for (i = FirstOffsetNumber; i <= maxoff+1; i = OffsetNumberNext(i)) + if (decompvec[i]) + pfree(((GISTENTRY *)VARDATA(entryvec))[i].pred); + pfree(entryvec); + pfree(decompvec); + + leftoff = rightoff = FirstOffsetNumber; + maxoff = PageGetMaxOffsetNumber(p); + for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i)) { + itemid = PageGetItemId(p, i); + item = (IndexTuple) PageGetItem(p, itemid); + + if (i == *(v.spl_left)) { + (void) PageAddItem(left, (Item) item, IndexTupleSize(item), + leftoff, LP_USED); + leftoff = OffsetNumberNext(leftoff); + v.spl_left++; /* advance in left split vector */ + } else { + (void) PageAddItem(right, (Item) item, IndexTupleSize(item), + rightoff, LP_USED); + rightoff = OffsetNumberNext(rightoff); + v.spl_right++; /* advance in right split vector */ + } + } + + /* build an InsertIndexResult for this insertion */ + res = (InsertIndexResult) palloc(sizeof(InsertIndexResultData)); + + /* now insert the new index tuple */ + if (*(v.spl_left) != FirstOffsetNumber) { + (void) PageAddItem(left, (Item) itup, IndexTupleSize(itup), + leftoff, LP_USED); + leftoff = OffsetNumberNext(leftoff); + ItemPointerSet(&(res->pointerData), lbknum, leftoff); + } else { + (void) PageAddItem(right, (Item) itup, IndexTupleSize(itup), + rightoff, LP_USED); + rightoff = OffsetNumberNext(rightoff); + ItemPointerSet(&(res->pointerData), rbknum, rightoff); + } + + if ((bufblock = BufferGetBlockNumber(buffer)) != GISTP_ROOT) { + PageRestoreTempPage(left, p); + } + WriteBuffer(leftbuf); + WriteBuffer(rightbuf); + + /* + * Okay, the page is split. We have three things left to do: + * + * 1) Adjust any active scans on this index to cope with changes + * we introduced in its structure by splitting this page. + * + * 2) "Tighten" the bounding box of the pointer to the left + * page in the parent node in the tree, if any. Since we + * moved a bunch of stuff off the left page, we expect it + * to get smaller. This happens in the internal insertion + * routine. + * + * 3) Insert a pointer to the right page in the parent. This + * may cause the parent to split. If it does, we need to + * repeat steps one and two for each split node in the tree. + */ + + /* adjust active scans */ + gistadjscans(r, GISTOP_SPLIT, bufblock, FirstOffsetNumber); + + tupDesc = r->rd_att; + + ltup = (IndexTuple) index_formtuple(tupDesc, + (Datum *) &(v.spl_ldatum), isnull); + rtup = (IndexTuple) index_formtuple(tupDesc, + (Datum *) &(v.spl_rdatum), isnull); + pfree(isnull); + + /* set pointers to new child pages in the internal index tuples */ + ItemPointerSet(&(ltup->t_tid), lbknum, 1); + ItemPointerSet(&(rtup->t_tid), rbknum, 1); + + gistintinsert(r, stack, ltup, rtup, giststate); + + pfree(ltup); + pfree(rtup); + + return (res); +} + +static void +gistintinsert(Relation r, + GISTSTACK *stk, + IndexTuple ltup, + IndexTuple rtup, + GISTSTATE *giststate) +{ + IndexTuple old; + Buffer b; + Page p; + ItemPointerData ltid; + + if (stk == (GISTSTACK *) NULL) { + gistnewroot(r, ltup, rtup); + return; + } + + /* remove old left pointer, insert the 2 new entries */ + ItemPointerSet(<id, stk->gs_blk, stk->gs_child); + gistdelete(r, (ItemPointer)<id); + gistentryinserttwo(r, stk, ltup, rtup, giststate); +} + +static void +gistentryinserttwo(Relation r, GISTSTACK *stk, IndexTuple ltup, + IndexTuple rtup, GISTSTATE *giststate) +{ + Buffer b; + Page p; + InsertIndexResult res; + OffsetNumber off; + bytea *evec; + char *datum; + int size; + GISTENTRY tmpentry; + + b = ReadBuffer(r, stk->gs_blk); + p = BufferGetPage(b); + + if (gistnospace(p, ltup)) { + res = gistSplit(r, b, stk->gs_parent, ltup, giststate); + WriteBuffer(b); /* don't forget to release buffer! - 01/31/94 */ + pfree(res); + gistdoinsert(r, rtup, giststate); + } else { + (void) PageAddItem(p, (Item)ltup, IndexTupleSize(ltup), + InvalidOffsetNumber, LP_USED); + WriteBuffer(b); + datum = (((char *) ltup) + sizeof(IndexTupleData)); + size = IndexTupleSize(ltup) - sizeof(IndexTupleData); + gistdentryinit(giststate, &tmpentry, datum, (Relation)0, (Page)0, + (OffsetNumber)0, size, 0); + gistAdjustKeys(r, stk->gs_parent, stk->gs_blk, tmpentry.pred, + tmpentry.bytes, giststate); + if (tmpentry.pred != datum) + pfree(tmpentry.pred); + (void)gistentryinsert(r, stk, rtup, giststate); + } +} + + +static InsertIndexResult +gistentryinsert(Relation r, GISTSTACK *stk, IndexTuple tup, + GISTSTATE *giststate) +{ + Buffer b; + Page p; + InsertIndexResult res; + bytea *evec; + char *datum; + int size; + OffsetNumber off; + GISTENTRY tmpentry; + + b = ReadBuffer(r, stk->gs_blk); + p = BufferGetPage(b); + + if (gistnospace(p, tup)) { + res = gistSplit(r, b, stk->gs_parent, tup, giststate); + WriteBuffer(b); /* don't forget to release buffer! - 01/31/94 */ + return(res); + } + else { + res = (InsertIndexResult) palloc(sizeof(InsertIndexResultData)); + off = PageAddItem(p, (Item) tup, IndexTupleSize(tup), + InvalidOffsetNumber, LP_USED); + WriteBuffer(b); + ItemPointerSet(&(res->pointerData), stk->gs_blk, off); + datum = (((char *) tup) + sizeof(IndexTupleData)); + size = IndexTupleSize(tup) - sizeof(IndexTupleData); + gistdentryinit(giststate, &tmpentry, datum, (Relation)0, (Page)0, + (OffsetNumber)0, size, 0); + gistAdjustKeys(r, stk->gs_parent, stk->gs_blk, tmpentry.pred, + tmpentry.bytes, giststate); + if (tmpentry.pred != datum) + pfree(tmpentry.pred); + return(res); + } +} + + +static void +gistnewroot(Relation r, IndexTuple lt, IndexTuple rt) +{ + Buffer b; + Page p; + + b = ReadBuffer(r, GISTP_ROOT); + GISTInitBuffer(b, 0); + p = BufferGetPage(b); + (void) PageAddItem(p, (Item) lt, IndexTupleSize(lt), + FirstOffsetNumber, LP_USED); + (void) PageAddItem(p, (Item) rt, IndexTupleSize(rt), + OffsetNumberNext(FirstOffsetNumber), LP_USED); + WriteBuffer(b); +} + +static void +GISTInitBuffer(Buffer b, uint32 f) +{ + GISTPageOpaque opaque; + Page page; + Size pageSize; + + pageSize = BufferGetPageSize(b); + + page = BufferGetPage(b); + memset(page, 0, (int) pageSize); + PageInit(page, pageSize, sizeof(GISTPageOpaqueData)); + + opaque = (GISTPageOpaque) PageGetSpecialPointer(page); + opaque->flags = f; +} + +/* it contains compressed entry */ +static OffsetNumber +gistchoose(Relation r, Page p, IndexTuple it, GISTSTATE *giststate) +{ + OffsetNumber maxoff; + OffsetNumber i; + char *ud, *id; + char *datum; + float usize, dsize; + OffsetNumber which; + float which_grow; + GISTENTRY entry, identry; + int size, idsize; + + idsize = IndexTupleSize(it) - sizeof(IndexTupleData); + id = ((char *) it) + sizeof(IndexTupleData); + maxoff = PageGetMaxOffsetNumber(p); + which_grow = -1.0; + which = -1; + + gistdentryinit(giststate,&identry,id,(Relation)NULL,(Page)NULL, + (OffsetNumber)0, idsize, FALSE); + + for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i)) { + datum = (char *) PageGetItem(p, PageGetItemId(p, i)); + size = IndexTupleSize(datum) - sizeof(IndexTupleData); + datum += sizeof(IndexTupleData); + gistdentryinit(giststate,&entry,datum,r,p,i,size,FALSE); + (giststate->penaltyFn)(entry, identry, &usize); + if (which_grow < 0 || usize < which_grow) { + which = i; + which_grow = usize; + if (which_grow == 0) + break; + } + if (entry.pred != datum) + pfree(entry.pred); + } + if (identry.pred != id) + pfree(identry.pred); + + return (which); +} + +static int +gistnospace(Page p, IndexTuple it) +{ + return (PageGetFreeSpace(p) < IndexTupleSize(it)); +} + +void +gistfreestack(GISTSTACK *s) +{ + GISTSTACK *p; + + while (s != (GISTSTACK *) NULL) { + p = s->gs_parent; + pfree(s); + s = p; + } +} + +void +gistdelete(Relation r, ItemPointer tid) +{ + BlockNumber blkno; + OffsetNumber offnum; + Buffer buf; + Page page; + + /* must write-lock on delete */ + RelationSetLockForWrite(r); + + blkno = ItemPointerGetBlockNumber(tid); + offnum = ItemPointerGetOffsetNumber(tid); + + /* adjust any scans that will be affected by this deletion */ + gistadjscans(r, GISTOP_DEL, blkno, offnum); + + /* delete the index tuple */ + buf = ReadBuffer(r, blkno); + page = BufferGetPage(buf); + + PageIndexTupleDelete(page, offnum); + + WriteBuffer(buf); + + /* XXX -- two-phase locking, don't release the write lock */ +} + +void +initGISTstate(GISTSTATE *giststate, Relation index) +{ + RegProcedure consistent_proc, union_proc, compress_proc, decompress_proc; + RegProcedure penalty_proc, picksplit_proc, equal_proc; + func_ptr user_fn; + int pronargs; + + consistent_proc = index_getprocid(index, 1, GIST_CONSISTENT_PROC); + union_proc = index_getprocid(index, 1, GIST_UNION_PROC); + compress_proc = index_getprocid(index, 1, GIST_COMPRESS_PROC); + decompress_proc = index_getprocid(index, 1, GIST_DECOMPRESS_PROC); + penalty_proc = index_getprocid(index, 1, GIST_PENALTY_PROC); + picksplit_proc = index_getprocid(index, 1, GIST_PICKSPLIT_PROC); + equal_proc = index_getprocid(index, 1, GIST_EQUAL_PROC); + fmgr_info(consistent_proc, &user_fn, &pronargs); + giststate->consistentFn = user_fn; + fmgr_info(union_proc, &user_fn, &pronargs); + giststate->unionFn = user_fn; + fmgr_info(compress_proc, &user_fn, &pronargs); + giststate->compressFn = user_fn; + fmgr_info(decompress_proc, &user_fn, &pronargs); + giststate->decompressFn = user_fn; + fmgr_info(penalty_proc, &user_fn, &pronargs); + giststate->penaltyFn = user_fn; + fmgr_info(picksplit_proc, &user_fn, &pronargs); + giststate->picksplitFn = user_fn; + fmgr_info(equal_proc, &user_fn, &pronargs); + giststate->equalFn = user_fn; + return; +} + +static IndexTuple +gist_tuple_replacekey(Relation r, GISTENTRY entry, IndexTuple t) +{ + char * datum = (((char *) t) + sizeof(IndexTupleData)); + + /* if new entry fits in index tuple, copy it in */ + if (entry.bytes < IndexTupleSize(t) - sizeof(IndexTupleData)) { + memcpy(datum, entry.pred, entry.bytes); + /* clear out old size */ + t->t_info &= 0xe000; + /* or in new size */ + t->t_info |= MAXALIGN(entry.bytes + sizeof(IndexTupleData)); + + return(t); + } + else { + /* generate a new index tuple for the compressed entry */ + TupleDesc tupDesc = r->rd_att; + IndexTuple newtup; + char *isnull; + int blank; + + isnull = (char *) palloc(r->rd_rel->relnatts); + for (blank = 0; blank < r->rd_rel->relnatts; blank++) + isnull[blank] = ' '; + newtup = (IndexTuple) index_formtuple(tupDesc, + (Datum *)&(entry.pred), + isnull); + newtup->t_tid = t->t_tid; + pfree(isnull); + return(newtup); + } +} + + +void +gistdentryinit(GISTSTATE *giststate, GISTENTRY *e, char *pr, Relation r, + Page pg, OffsetNumber o, int b, bool l) +{ + GISTENTRY *dep; + gistentryinit(*e, pr, r, pg, o, b, l); + dep = (GISTENTRY *)((giststate->decompressFn)(e)); + gistentryinit(*e, dep->pred, dep->rel, dep->page, dep->offset, dep->bytes, + dep->leafkey); + if (dep != e) pfree(dep); +} + +void +gistcentryinit(GISTSTATE *giststate, GISTENTRY *e, char *pr, Relation r, + Page pg, OffsetNumber o, int b, bool l) +{ + GISTENTRY *cep; + gistentryinit(*e, pr, r, pg, o, b, l); + cep = (GISTENTRY *)((giststate->compressFn)(e)); + gistentryinit(*e, cep->pred, cep->rel, cep->page, cep->offset, cep->bytes, + cep->leafkey); + if (cep != e) pfree(cep); +} + + + +#define GISTDEBUG +#ifdef GISTDEBUG + +extern char *text_range_out(); +extern char *int_range_out(); +void +_gistdump(Relation r) +{ + Buffer buf; + Page page; + OffsetNumber offnum, maxoff; + BlockNumber blkno; + BlockNumber nblocks; + GISTPageOpaque po; + IndexTuple itup; + BlockNumber itblkno; + OffsetNumber itoffno; + char *datum; + char *itkey; + + nblocks = RelationGetNumberOfBlocks(r); + for (blkno = 0; blkno < nblocks; blkno++) { + buf = ReadBuffer(r, blkno); + page = BufferGetPage(buf); + po = (GISTPageOpaque) PageGetSpecialPointer(page); + maxoff = PageGetMaxOffsetNumber(page); + printf("Page %d maxoff %d <%s>\n", blkno, maxoff, + (po->flags & F_LEAF ? "LEAF" : "INTERNAL")); + + if (PageIsEmpty(page)) { + ReleaseBuffer(buf); + continue; + } + + for (offnum = FirstOffsetNumber; + offnum <= maxoff; + offnum = OffsetNumberNext(offnum)) { + itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, offnum)); + itblkno = ItemPointerGetBlockNumber(&(itup->t_tid)); + itoffno = ItemPointerGetOffsetNumber(&(itup->t_tid)); + datum = ((char *) itup); + datum += sizeof(IndexTupleData); + /* get out function for type of key, and out it! */ + itkey = (char *) int_range_out(datum); + /* itkey = " unable to print"; */ + printf("\t[%d] size %d heap <%d,%d> key:%s\n", + offnum, IndexTupleSize(itup), itblkno, itoffno, itkey); + pfree(itkey); + } + + ReleaseBuffer(buf); + } +} + +#define TRLOWER(tr) (((tr)->bytes)) +#define TRUPPER(tr) (&((tr)->bytes[MAXALIGN(VARSIZE(TRLOWER(tr)))])) +typedef struct txtrange { + /* flag: NINF means that lower is negative infinity; PINF means that + ** upper is positive infinity. 0 means that both are numbers. + */ + int32 vl_len; + int32 flag; + char bytes[2]; +} TXTRANGE; + +typedef struct intrange { + int lower; + int upper; + /* flag: NINF means that lower is negative infinity; PINF means that + ** upper is positive infinity. 0 means that both are numbers. + */ + int flag; +} INTRANGE; + +char *text_range_out(TXTRANGE *r) +{ + char *result; + char *lower, *upper; + + if (r == NULL) + return(NULL); + result = (char *)palloc(16 + VARSIZE(TRLOWER(r)) + VARSIZE(TRUPPER(r)) + - 2*VARHDRSZ); + + lower = (char *)palloc(VARSIZE(TRLOWER(r)) + 1 - VARHDRSZ); + memcpy(lower, VARDATA(TRLOWER(r)), VARSIZE(TRLOWER(r)) - VARHDRSZ); + lower[VARSIZE(TRLOWER(r)) - VARHDRSZ] = '\0'; + upper = (char *)palloc(VARSIZE(TRUPPER(r)) + 1 - VARHDRSZ); + memcpy(upper, VARDATA(TRUPPER(r)), VARSIZE(TRUPPER(r)) - VARHDRSZ); + upper[VARSIZE(TRUPPER(r)) - VARHDRSZ] = '\0'; + + (void) sprintf(result, "[%s,%s): %d", lower, upper, r->flag); + pfree(lower); + pfree(upper); + return(result); +} + +char * +int_range_out(INTRANGE *r) +{ + char *result; + + if (r == NULL) + return(NULL); + result = (char *)palloc(80); + (void) sprintf(result, "[%d,%d): %d",r->lower, r->upper, r->flag); + + return(result); +} + +#endif /* defined GISTDEBUG */ + diff --git a/src/backend/access/gist/gistscan.c b/src/backend/access/gist/gistscan.c new file mode 100644 index 0000000000..f09c52019b --- /dev/null +++ b/src/backend/access/gist/gistscan.c @@ -0,0 +1,382 @@ +/*------------------------------------------------------------------------- + * + * gistscan.c-- + * routines to manage scans on index relations + * + * + * IDENTIFICATION + * /usr/local/devel/pglite/cvs/src/backend/access/gist/gistscan.c,v 1.7 1995/06/14 00:10:05 jolly Exp + * + *------------------------------------------------------------------------- + */ +#include "c.h" +#include "postgres.h" + +#include "storage/bufmgr.h" +#include "storage/bufpage.h" + +#include "utils/elog.h" +#include "utils/palloc.h" +#include "utils/rel.h" + +#include "access/heapam.h" +#include "access/genam.h" +#include "access/gist.h" +#include "access/giststrat.h" + +/* routines defined and used here */ +static void gistregscan(IndexScanDesc s); +static void gistdropscan(IndexScanDesc s); +static void gistadjone(IndexScanDesc s, int op, BlockNumber blkno, + OffsetNumber offnum); +static void adjuststack(GISTSTACK *stk, BlockNumber blkno, + OffsetNumber offnum); +static void adjustiptr(IndexScanDesc s, ItemPointer iptr, + int op, BlockNumber blkno, OffsetNumber offnum); + +/* + * Whenever we start a GiST scan in a backend, we register it in private + * space. Then if the GiST index gets updated, we check all registered + * scans and adjust them if the tuple they point at got moved by the + * update. We only need to do this in private space, because when we update + * an GiST we have a write lock on the tree, so no other process can have + * any locks at all on it. A single transaction can have write and read + * locks on the same object, so that's why we need to handle this case. + */ + +typedef struct GISTScanListData { + IndexScanDesc gsl_scan; + struct GISTScanListData *gsl_next; +} GISTScanListData; + +typedef GISTScanListData *GISTScanList; + +/* pointer to list of local scans on GiSTs */ +static GISTScanList GISTScans = (GISTScanList) NULL; + +IndexScanDesc +gistbeginscan(Relation r, + bool fromEnd, + uint16 nkeys, + ScanKey key) +{ + IndexScanDesc s; + + RelationSetLockForRead(r); + s = RelationGetIndexScan(r, fromEnd, nkeys, key); + gistregscan(s); + + return (s); +} + +void +gistrescan(IndexScanDesc s, bool fromEnd, ScanKey key) +{ + GISTScanOpaque p; + int i; + + if (!IndexScanIsValid(s)) { + elog(WARN, "gistrescan: invalid scan."); + return; + } + + /* + * Clear all the pointers. + */ + + ItemPointerSetInvalid(&s->previousItemData); + ItemPointerSetInvalid(&s->currentItemData); + ItemPointerSetInvalid(&s->nextItemData); + ItemPointerSetInvalid(&s->previousMarkData); + ItemPointerSetInvalid(&s->currentMarkData); + ItemPointerSetInvalid(&s->nextMarkData); + + /* + * Set flags. + */ + if (RelationGetNumberOfBlocks(s->relation) == 0) { + s->flags = ScanUnmarked; + } else if (fromEnd) { + s->flags = ScanUnmarked | ScanUncheckedPrevious; + } else { + s->flags = ScanUnmarked | ScanUncheckedNext; + } + + s->scanFromEnd = fromEnd; + + if (s->numberOfKeys > 0) { + memmove(s->keyData, + key, + s->numberOfKeys * sizeof(ScanKeyData)); + } + + p = (GISTScanOpaque) s->opaque; + if (p != (GISTScanOpaque) NULL) { + freestack(p->s_stack); + freestack(p->s_markstk); + p->s_stack = p->s_markstk = (GISTSTACK *) NULL; + p->s_flags = 0x0; + } else { + /* initialize opaque data */ + p = (GISTScanOpaque) palloc(sizeof(GISTScanOpaqueData)); + p->s_stack = p->s_markstk = (GISTSTACK *) NULL; + p->s_flags = 0x0; + s->opaque = p; + p->giststate = (GISTSTATE *)palloc(sizeof(GISTSTATE)); + initGISTstate(p->giststate, s->relation); + if (s->numberOfKeys > 0) + /* + ** Play games here with the scan key to use the Consistent + ** function for all comparisons: + ** 1) the sk_procedure field will now be used to hold the + ** strategy number + ** 2) the sk_func field will point to the Consistent function + */ + for (i = 0; i < s->numberOfKeys; i++) { + /* s->keyData[i].sk_procedure + = index_getprocid(s->relation, 1, GIST_CONSISTENT_PROC); */ + s->keyData[i].sk_procedure + = RelationGetGISTStrategy(s->relation, s->keyData[i].sk_attno, + s->keyData[i].sk_procedure); + s->keyData[i].sk_func = p->giststate->consistentFn; + } + } +} + +void +gistmarkpos(IndexScanDesc s) +{ + GISTScanOpaque p; + GISTSTACK *o, *n, *tmp; + + s->currentMarkData = s->currentItemData; + p = (GISTScanOpaque) s->opaque; + if (p->s_flags & GS_CURBEFORE) + p->s_flags |= GS_MRKBEFORE; + else + p->s_flags &= ~GS_MRKBEFORE; + + o = (GISTSTACK *) NULL; + n = p->s_stack; + + /* copy the parent stack from the current item data */ + while (n != (GISTSTACK *) NULL) { + tmp = (GISTSTACK *) palloc(sizeof(GISTSTACK)); + tmp->gs_child = n->gs_child; + tmp->gs_blk = n->gs_blk; + tmp->gs_parent = o; + o = tmp; + n = n->gs_parent; + } + + freestack(p->s_markstk); + p->s_markstk = o; +} + +void +gistrestrpos(IndexScanDesc s) +{ + GISTScanOpaque p; + GISTSTACK *o, *n, *tmp; + + s->currentItemData = s->currentMarkData; + p = (GISTScanOpaque) s->opaque; + if (p->s_flags & GS_MRKBEFORE) + p->s_flags |= GS_CURBEFORE; + else + p->s_flags &= ~GS_CURBEFORE; + + o = (GISTSTACK *) NULL; + n = p->s_markstk; + + /* copy the parent stack from the current item data */ + while (n != (GISTSTACK *) NULL) { + tmp = (GISTSTACK *) palloc(sizeof(GISTSTACK)); + tmp->gs_child = n->gs_child; + tmp->gs_blk = n->gs_blk; + tmp->gs_parent = o; + o = tmp; + n = n->gs_parent; + } + + freestack(p->s_stack); + p->s_stack = o; +} + +void +gistendscan(IndexScanDesc s) +{ + GISTScanOpaque p; + + p = (GISTScanOpaque) s->opaque; + + if (p != (GISTScanOpaque) NULL) { + freestack(p->s_stack); + freestack(p->s_markstk); + } + + gistdropscan(s); + /* XXX don't unset read lock -- two-phase locking */ +} + +static void +gistregscan(IndexScanDesc s) +{ + GISTScanList l; + + l = (GISTScanList) palloc(sizeof(GISTScanListData)); + l->gsl_scan = s; + l->gsl_next = GISTScans; + GISTScans = l; +} + +static void +gistdropscan(IndexScanDesc s) +{ + GISTScanList l; + GISTScanList prev; + + prev = (GISTScanList) NULL; + + for (l = GISTScans; + l != (GISTScanList) NULL && l->gsl_scan != s; + l = l->gsl_next) { + prev = l; + } + + if (l == (GISTScanList) NULL) + elog(WARN, "GiST scan list corrupted -- cannot find 0x%lx", s); + + if (prev == (GISTScanList) NULL) + GISTScans = l->gsl_next; + else + prev->gsl_next = l->gsl_next; + + pfree(l); +} + +void +gistadjscans(Relation r, int op, BlockNumber blkno, OffsetNumber offnum) +{ + GISTScanList l; + Oid relid; + + relid = r->rd_id; + for (l = GISTScans; l != (GISTScanList) NULL; l = l->gsl_next) { + if (l->gsl_scan->relation->rd_id == relid) + gistadjone(l->gsl_scan, op, blkno, offnum); + } +} + +/* + * gistadjone() -- adjust one scan for update. + * + * By here, the scan passed in is on a modified relation. Op tells + * us what the modification is, and blkno and offind tell us what + * block and offset index were affected. This routine checks the + * current and marked positions, and the current and marked stacks, + * to see if any stored location needs to be changed because of the + * update. If so, we make the change here. + */ +static void +gistadjone(IndexScanDesc s, + int op, + BlockNumber blkno, + OffsetNumber offnum) +{ + GISTScanOpaque so; + + adjustiptr(s, &(s->currentItemData), op, blkno, offnum); + adjustiptr(s, &(s->currentMarkData), op, blkno, offnum); + + so = (GISTScanOpaque) s->opaque; + + if (op == GISTOP_SPLIT) { + adjuststack(so->s_stack, blkno, offnum); + adjuststack(so->s_markstk, blkno, offnum); + } +} + +/* + * adjustiptr() -- adjust current and marked item pointers in the scan + * + * Depending on the type of update and the place it happened, we + * need to do nothing, to back up one record, or to start over on + * the same page. + */ +static void +adjustiptr(IndexScanDesc s, + ItemPointer iptr, + int op, + BlockNumber blkno, + OffsetNumber offnum) +{ + OffsetNumber curoff; + GISTScanOpaque so; + + if (ItemPointerIsValid(iptr)) { + if (ItemPointerGetBlockNumber(iptr) == blkno) { + curoff = ItemPointerGetOffsetNumber(iptr); + so = (GISTScanOpaque) s->opaque; + + switch (op) { + case GISTOP_DEL: + /* back up one if we need to */ + if (curoff >= offnum) { + + if (curoff > FirstOffsetNumber) { + /* just adjust the item pointer */ + ItemPointerSet(iptr, blkno, OffsetNumberPrev(curoff)); + } else { + /* remember that we're before the current tuple */ + ItemPointerSet(iptr, blkno, FirstOffsetNumber); + if (iptr == &(s->currentItemData)) + so->s_flags |= GS_CURBEFORE; + else + so->s_flags |= GS_MRKBEFORE; + } + } + break; + + case GISTOP_SPLIT: + /* back to start of page on split */ + ItemPointerSet(iptr, blkno, FirstOffsetNumber); + if (iptr == &(s->currentItemData)) + so->s_flags &= ~GS_CURBEFORE; + else + so->s_flags &= ~GS_MRKBEFORE; + break; + + default: + elog(WARN, "Bad operation in GiST scan adjust: %d", op); + } + } + } +} + +/* + * adjuststack() -- adjust the supplied stack for a split on a page in + * the index we're scanning. + * + * If a page on our parent stack has split, we need to back up to the + * beginning of the page and rescan it. The reason for this is that + * the split algorithm for GiSTs doesn't order tuples in any useful + * way on a single page. This means on that a split, we may wind up + * looking at some heap tuples more than once. This is handled in the + * access method update code for heaps; if we've modified the tuple we + * are looking at already in this transaction, we ignore the update + * request. + */ +/*ARGSUSED*/ +static void +adjuststack(GISTSTACK *stk, + BlockNumber blkno, + OffsetNumber offnum) +{ + while (stk != (GISTSTACK *) NULL) { + if (stk->gs_blk == blkno) + stk->gs_child = FirstOffsetNumber; + + stk = stk->gs_parent; + } +} diff --git a/src/backend/access/gist/giststrat.c b/src/backend/access/gist/giststrat.c new file mode 100644 index 0000000000..1dea1946d9 --- /dev/null +++ b/src/backend/access/gist/giststrat.c @@ -0,0 +1,119 @@ +/*------------------------------------------------------------------------- + * + * giststrat.c-- + * strategy map data for GiSTs. + * + * Copyright (c) 1994, Regents of the University of California + * + * + * IDENTIFICATION + * /usr/local/devel/pglite/cvs/src/backend/access/gist/giststrat.c,v 1.4 1995/06/14 00:10:05 jolly Exp + * + *------------------------------------------------------------------------- + */ +#include "c.h" + +#include "utils/rel.h" + +#include "storage/bufmgr.h" +#include "storage/bufpage.h" + +#include "access/istrat.h" +#include "access/gist.h" + +/* + * Note: negate, commute, and negatecommute all assume that operators are + * ordered as follows in the strategy map: + * + * contains, contained-by + * + * The negate, commute, and negatecommute arrays are used by the planner + * to plan indexed scans over data that appears in the qualificiation in + * a boolean negation, or whose operands appear in the wrong order. For + * example, if the operator "<%" means "contains", and the user says + * + * where not rel.box <% "(10,10,20,20)"::box + * + * the planner can plan an index scan by noting that GiST indices have + * an operator in their operator class for negating <%. + * + * Similarly, if the user says something like + * + * where "(10,10,20,20)"::box <% rel.box + * + * the planner can see that the GiST index on rel.box has an operator in + * its opclass for commuting <%, and plan the scan using that operator. + * This added complexity in the access methods makes the planner a lot easier + * to write. + */ + +/* if a op b, what operator tells us if (not a op b)? */ +static StrategyNumber GISTNegate[GISTNStrategies] = { + InvalidStrategy, + InvalidStrategy, + InvalidStrategy + }; + +/* if a op_1 b, what is the operator op_2 such that b op_2 a? */ +static StrategyNumber GISTCommute[GISTNStrategies] = { + InvalidStrategy, + InvalidStrategy, + InvalidStrategy + }; + +/* if a op_1 b, what is the operator op_2 such that (b !op_2 a)? */ +static StrategyNumber GISTNegateCommute[GISTNStrategies] = { + InvalidStrategy, + InvalidStrategy, + InvalidStrategy + }; + +/* + * GiSTs do not currently support TermData (see rtree/rtstrat.c for + * discussion of + * TermData) -- such logic must be encoded in the user's Consistent function. + */ + +/* + * If you were sufficiently attentive to detail, you would go through + * the ExpressionData pain above for every one of the strategies + * we defined. I am not. Now we declare the StrategyEvaluationData + * structure that gets shipped around to help the planner and the access + * method decide what sort of scan it should do, based on (a) what the + * user asked for, (b) what operators are defined for a particular opclass, + * and (c) the reams of information we supplied above. + * + * The idea of all of this initialized data is to make life easier on the + * user when he defines a new operator class to use this access method. + * By filling in all the data, we let him get away with leaving holes in his + * operator class, and still let him use the index. The added complexity + * in the access methods just isn't worth the trouble, though. + */ + +static StrategyEvaluationData GISTEvaluationData = { + GISTNStrategies, /* # of strategies */ + (StrategyTransformMap) GISTNegate, /* how to do (not qual) */ + (StrategyTransformMap) GISTCommute, /* how to swap operands */ + (StrategyTransformMap) GISTNegateCommute, /* how to do both */ + NULL +}; + +StrategyNumber +RelationGetGISTStrategy(Relation r, + AttrNumber attnum, + RegProcedure proc) +{ + return (RelationGetStrategy(r, attnum, &GISTEvaluationData, proc)); +} + +bool +RelationInvokeGISTStrategy(Relation r, + AttrNumber attnum, + StrategyNumber s, + Datum left, + Datum right) +{ + return (RelationInvokeStrategy(r, &GISTEvaluationData, attnum, s, + left, right)); +} + -- 2.40.0