From: Neil Conway Date: Tue, 17 May 2005 00:59:30 +0000 (+0000) Subject: GiST improvements: X-Git-Tag: REL8_1_0BETA1~782 X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=eda6dd32d15fe85bfddf6caf32d1a861c4fa5957;p=postgresql GiST improvements: - make sure we always invoke user-supplied GiST methods in a short-lived memory context. This means the backend isn't exposed to any memory leaks that may be in those methods (in fact, it is probably a net loss for most GiST methods to bother manually freeing memory now). This also means we can do away with a lot of ugly manual memory management in the GiST code itself. - keep the current page of a GiST index scan pinned, rather than doing a ReadBuffer() for each tuple produced by the scan. Since ReadBuffer() is expensive, this is a perf. win - implement dead tuple killing for GiST indexes (which is easy to do, now that we keep a pin on the current scan page). Now all the builtin indexes implement dead tuple killing. - clean up a lot of ugly code in GiST --- diff --git a/doc/src/sgml/gist.sgml b/doc/src/sgml/gist.sgml index e5c96d7e54..9577a0768a 100644 --- a/doc/src/sgml/gist.sgml +++ b/doc/src/sgml/gist.sgml @@ -1,5 +1,5 @@ @@ -202,7 +202,7 @@ $PostgreSQL: pgsql/doc/src/sgml/gist.sgml,v 1.17 2005/04/09 03:52:43 momjian Exp The lack of write-ahead logging is just a small matter of programming, but since it isn't done yet, a crash could render a GiST - index inconsistent, forcing a REINDEX. + index inconsistent, forcing a REINDEX. 
diff --git a/src/backend/access/gist/gist.c b/src/backend/access/gist/gist.c index 9445d1086f..66d4347fbb 100644 --- a/src/backend/access/gist/gist.c +++ b/src/backend/access/gist/gist.c @@ -8,7 +8,7 @@ * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/access/gist/gist.c,v 1.115 2005/05/15 04:08:29 neilc Exp $ + * $PostgreSQL: pgsql/src/backend/access/gist/gist.c,v 1.116 2005/05/17 00:59:30 neilc Exp $ * *------------------------------------------------------------------------- */ @@ -21,6 +21,7 @@ #include "catalog/index.h" #include "commands/vacuum.h" #include "miscadmin.h" +#include "utils/memutils.h" #undef GIST_PAGEADDITEM @@ -45,13 +46,13 @@ * and gistadjsubkey only */ #define FILLITEM(evp, isnullkey, okey, okeyb, rkey, rkeyb) do { \ - if (isnullkey) { \ - gistentryinit((evp), rkey, r, NULL, \ - (OffsetNumber) 0, rkeyb, FALSE); \ - } else { \ - gistentryinit((evp), okey, r, NULL, \ - (OffsetNumber) 0, okeyb, FALSE); \ - } \ + if (isnullkey) { \ + gistentryinit((evp), rkey, r, NULL, \ + (OffsetNumber) 0, rkeyb, FALSE); \ + } else { \ + gistentryinit((evp), okey, r, NULL, \ + (OffsetNumber) 0, okeyb, FALSE); \ + } \ } while(0) #define FILLEV(isnull1, key1, key1b, isnull2, key2, key2b) do { \ @@ -65,6 +66,7 @@ typedef struct GISTSTATE giststate; int numindexattrs; double indtuples; + MemoryContext tmpCxt; } GISTBuildState; @@ -128,9 +130,8 @@ static void gistcentryinit(GISTSTATE *giststate, int nkey, Relation r, Page pg, OffsetNumber o, int b, bool l, bool isNull); static void gistDeCompressAtt(GISTSTATE *giststate, Relation r, - IndexTuple tuple, Page p, OffsetNumber o, - GISTENTRY *attdata, bool *decompvec, bool *isnull); -static void gistFreeAtt(Relation r, GISTENTRY *attdata, bool *decompvec); + IndexTuple tuple, Page p, OffsetNumber o, + GISTENTRY *attdata, bool *isnull); static void gistpenalty(GISTSTATE *giststate, int attno, GISTENTRY *key1, bool isNull1, GISTENTRY *key2, bool 
isNull2, @@ -143,7 +144,28 @@ static void gist_dumptree(Relation r, int level, BlockNumber blk, OffsetNumber c #endif /* - * routine to build an index. Basically calls insert over and over + * Create and return a temporary memory context for use by GiST. We + * _always_ invoke user-provided methods in a temporary memory + * context, so that memory leaks in those functions cannot cause + * problems. Also, we use some additional temporary contexts in the + * GiST code itself, to avoid the need to do some awkward manual + * memory management. + */ +MemoryContext +createTempGistContext(void) +{ + return AllocSetContextCreate(CurrentMemoryContext, + "GiST temporary context", + ALLOCSET_DEFAULT_MINSIZE, + ALLOCSET_DEFAULT_INITSIZE, + ALLOCSET_DEFAULT_MAXSIZE); +} + +/* + * Routine to build an index. Basically calls insert over and over. + * + * XXX: it would be nice to implement some sort of bulk-loading + * algorithm, but it is not clear how to do that. */ Datum gistbuild(PG_FUNCTION_ARGS) @@ -155,10 +177,6 @@ gistbuild(PG_FUNCTION_ARGS) GISTBuildState buildstate; Buffer buffer; - /* no locking is needed */ - - initGISTstate(&buildstate.giststate, index); - /* * We expect to be called exactly once for any index relation. If * that's not the case, big trouble's what we have. 
@@ -167,6 +185,9 @@ gistbuild(PG_FUNCTION_ARGS) elog(ERROR, "index \"%s\" already contains data", RelationGetRelationName(index)); + /* no locking is needed */ + initGISTstate(&buildstate.giststate, index); + /* initialize the root page */ buffer = ReadBuffer(index, P_NEW); GISTInitBuffer(buffer, F_LEAF); @@ -175,21 +196,27 @@ gistbuild(PG_FUNCTION_ARGS) /* build the index */ buildstate.numindexattrs = indexInfo->ii_NumIndexAttrs; buildstate.indtuples = 0; + /* + * create a temporary memory context that is reset once for each + * tuple inserted into the index + */ + buildstate.tmpCxt = createTempGistContext(); /* do the heap scan */ reltuples = IndexBuildHeapScan(heap, index, indexInfo, - gistbuildCallback, (void *) &buildstate); + gistbuildCallback, (void *) &buildstate); /* okay, all heap tuples are indexed */ + MemoryContextDelete(buildstate.tmpCxt); /* since we just counted the # of tuples, may as well update stats */ IndexCloseAndUpdateStats(heap, reltuples, index, buildstate.indtuples); freeGISTstate(&buildstate.giststate); + #ifdef GISTDEBUG - gist_dumptree(index, 0, GISTP_ROOT, 0); + gist_dumptree(index, 0, GIST_ROOT_BLKNO, 0); #endif - PG_RETURN_VOID(); } @@ -206,32 +233,26 @@ gistbuildCallback(Relation index, { GISTBuildState *buildstate = (GISTBuildState *) state; IndexTuple itup; - bool compvec[INDEX_MAX_KEYS]; GISTENTRY tmpcentry; int i; + MemoryContext oldCxt; /* GiST cannot index tuples with leading NULLs */ if (isnull[0]) return; + oldCxt = MemoryContextSwitchTo(buildstate->tmpCxt); + /* immediately compress keys to normalize */ for (i = 0; i < buildstate->numindexattrs; i++) { if (isnull[i]) - { values[i] = (Datum) 0; - compvec[i] = FALSE; - } else { gistcentryinit(&buildstate->giststate, i, &tmpcentry, values[i], NULL, NULL, (OffsetNumber) 0, - -1 /* size is currently bogus */ , TRUE, FALSE); - if (values[i] != tmpcentry.key && - !(isAttByVal(&buildstate->giststate, i))) - compvec[i] = TRUE; - else - compvec[i] = FALSE; + -1 /* size is currently 
bogus */, TRUE, FALSE); values[i] = tmpcentry.key; } } @@ -250,12 +271,8 @@ gistbuildCallback(Relation index, gistdoinsert(index, itup, &buildstate->giststate); buildstate->indtuples += 1; - - for (i = 0; i < buildstate->numindexattrs; i++) - if (compvec[i]) - pfree(DatumGetPointer(values[i])); - - pfree(itup); + MemoryContextSwitchTo(oldCxt); + MemoryContextReset(buildstate->tmpCxt); } /* @@ -271,7 +288,6 @@ gistinsert(PG_FUNCTION_ARGS) Datum *values = (Datum *) PG_GETARG_POINTER(1); bool *isnull = (bool *) PG_GETARG_POINTER(2); ItemPointer ht_ctid = (ItemPointer) PG_GETARG_POINTER(3); - #ifdef NOT_USED Relation heapRel = (Relation) PG_GETARG_POINTER(4); bool checkUnique = PG_GETARG_BOOL(5); @@ -280,7 +296,8 @@ gistinsert(PG_FUNCTION_ARGS) GISTSTATE giststate; GISTENTRY tmpentry; int i; - bool compvec[INDEX_MAX_KEYS]; + MemoryContext oldCxt; + MemoryContext insertCxt; /* * Since GIST is not marked "amconcurrent" in pg_am, caller should @@ -292,25 +309,21 @@ gistinsert(PG_FUNCTION_ARGS) if (isnull[0]) PG_RETURN_BOOL(false); + insertCxt = createTempGistContext(); + oldCxt = MemoryContextSwitchTo(insertCxt); + initGISTstate(&giststate, r); /* immediately compress keys to normalize */ for (i = 0; i < r->rd_att->natts; i++) { if (isnull[i]) - { values[i] = (Datum) 0; - compvec[i] = FALSE; - } else { gistcentryinit(&giststate, i, &tmpentry, values[i], NULL, NULL, (OffsetNumber) 0, - -1 /* size is currently bogus */ , TRUE, FALSE); - if (values[i] != tmpentry.key && !(isAttByVal(&giststate, i))) - compvec[i] = TRUE; - else - compvec[i] = FALSE; + -1 /* size is currently bogus */, TRUE, FALSE); values[i] = tmpentry.key; } } @@ -319,11 +332,10 @@ gistinsert(PG_FUNCTION_ARGS) gistdoinsert(r, itup, &giststate); - for (i = 0; i < r->rd_att->natts; i++) - if (compvec[i] == TRUE) - pfree(DatumGetPointer(values[i])); - pfree(itup); + /* cleanup */ freeGISTstate(&giststate); + MemoryContextSwitchTo(oldCxt); + MemoryContextDelete(insertCxt); PG_RETURN_BOOL(true); } @@ -370,36 
+382,29 @@ gistPageAddItem(GISTSTATE *giststate, if (retval == InvalidOffsetNumber) elog(ERROR, "failed to add index item to \"%s\"", RelationGetRelationName(r)); - /* be tidy */ - if (DatumGetPointer(tmpcentry.key) != NULL && - tmpcentry.key != dentry->key && - tmpcentry.key != datum) - pfree(DatumGetPointer(tmpcentry.key)); - return (retval); + return retval; } #endif +/* + * Workhouse routine for doing insertion into a GiST index. Note that + * this routine assumes it is invoked in a short-lived memory context, + * so it does not bother releasing palloc'd allocations. + */ static void -gistdoinsert(Relation r, - IndexTuple itup, - GISTSTATE *giststate) +gistdoinsert(Relation r, IndexTuple itup, GISTSTATE *giststate) { IndexTuple *instup; - int i, - ret, + int ret, len = 1; instup = (IndexTuple *) palloc(sizeof(IndexTuple)); instup[0] = (IndexTuple) palloc(IndexTupleSize(itup)); memcpy(instup[0], itup, IndexTupleSize(itup)); - ret = gistlayerinsert(r, GISTP_ROOT, &instup, &len, giststate); + ret = gistlayerinsert(r, GIST_ROOT_BLKNO, &instup, &len, giststate); if (ret & SPLITED) gistnewroot(r, instup, len); - - for (i = 0; i < len; i++) - pfree(instup[i]); - pfree(instup); } static int @@ -410,7 +415,6 @@ gistlayerinsert(Relation r, BlockNumber blkno, { Buffer buffer; Page page; - OffsetNumber child; int ret; GISTPageOpaque opaque; @@ -420,12 +424,20 @@ gistlayerinsert(Relation r, BlockNumber blkno, if (!(opaque->flags & F_LEAF)) { - /* internal page, so we must walk on tree */ - /* len IS equal 1 */ + /* + * This is an internal page, so continue to walk down the + * tree. We find the child node that has the minimum insertion + * penalty and recursively invoke ourselves to modify that + * node. Once the recursive call returns, we may need to + * adjust the parent node for two reasons: the child node + * split, or the key in this node needs to be adjusted for the + * newly inserted key below us. 
+ */ ItemId iid; BlockNumber nblkno; ItemPointerData oldtid; IndexTuple oldtup; + OffsetNumber child; child = gistchoose(r, page, *(*itup), giststate); iid = PageGetItemId(page, child); @@ -446,7 +458,7 @@ gistlayerinsert(Relation r, BlockNumber blkno, return 0x00; } - /* child does not splited */ + /* child did not split */ if (!(ret & SPLITED)) { IndexTuple newtup = gistgetadjusted(r, oldtup, (*itup)[0], giststate); @@ -458,11 +470,15 @@ gistlayerinsert(Relation r, BlockNumber blkno, return 0x00; } - pfree((*itup)[0]); /* !!! */ (*itup)[0] = newtup; } - /* key is modified, so old version must be deleted */ + /* + * This node's key has been modified, either because a child + * split occurred or because we needed to adjust our key for + * an insert in a child node. Therefore, remove the old + * version of this node's key. + */ ItemPointerSet(&oldtid, blkno, child); gistdelete(r, &oldtid); @@ -491,11 +507,6 @@ gistlayerinsert(Relation r, BlockNumber blkno, oldlen = *len; newitup = gistSplit(r, buffer, itvec, &tlen, giststate); ReleaseBuffer(buffer); - do - pfree((*itup)[oldlen - 1]); - while ((--oldlen) > 0); - pfree((*itup)); - pfree(itvec); *itup = newitup; *len = tlen; /* now tlen >= 2 */ } @@ -509,23 +520,17 @@ gistlayerinsert(Relation r, BlockNumber blkno, FirstOffsetNumber : OffsetNumberNext(PageGetMaxOffsetNumber(page)); - l = gistwritebuffer(r, page, (*itup), *len, off); + l = gistwritebuffer(r, page, *itup, *len, off); WriteBuffer(buffer); if (*len > 1) { /* previous insert ret & SPLITED != 0 */ - int i; - /* * child was splited, so we must form union for insertion in * parent */ IndexTuple newtup = gistunion(r, (*itup), *len, giststate); - ItemPointerSet(&(newtup->t_tid), blkno, 1); - - for (i = 0; i < *len; i++) - pfree((*itup)[i]); (*itup)[0] = newtup; *len = 1; } @@ -544,23 +549,17 @@ gistwritebuffer(Relation r, Page page, IndexTuple *itup, OffsetNumber l = InvalidOffsetNumber; int i; -#ifdef GIST_PAGEADDITEM - GISTENTRY tmpdentry; - IndexTuple newtup; 
- bool IsNull; -#endif for (i = 0; i < len; i++) { #ifdef GIST_PAGEADDITEM + GISTENTRY tmpdentry; + IndexTuple newtup; + bool IsNull; + l = gistPageAddItem(giststate, r, page, (Item) itup[i], IndexTupleSize(itup[i]), off, LP_USED, &tmpdentry, &newtup); off = OffsetNumberNext(off); - if (DatumGetPointer(tmpdentry.key) != NULL && - tmpdentry.key != index_getattr(itup[i], 1, r->rd_att, &IsNull)) - pfree(DatumGetPointer(tmpdentry.key)); - if (itup[i] != newtup) - pfree(newtup); #else l = PageAddItem(page, (Item) itup[i], IndexTupleSize(itup[i]), off, LP_USED); @@ -620,101 +619,77 @@ gistjoinvector(IndexTuple *itvec, int *len, IndexTuple *additvec, int addlen) } /* - * return union of itup vector + * Return an IndexTuple containing the result of applying the "union" + * method to the specified IndexTuple vector. */ static IndexTuple gistunion(Relation r, IndexTuple *itvec, int len, GISTSTATE *giststate) { Datum attr[INDEX_MAX_KEYS]; - bool whatfree[INDEX_MAX_KEYS]; bool isnull[INDEX_MAX_KEYS]; GistEntryVector *evec; - Datum datum; - int datumsize, - i, - j; + int i; GISTENTRY centry[INDEX_MAX_KEYS]; - bool *needfree; - IndexTuple newtup; - bool IsNull; - int reallen; - needfree = (bool *) palloc(((len == 1) ? 2 : len) * sizeof(bool)); evec = (GistEntryVector *) palloc(((len == 1) ? 
2 : len) * sizeof(GISTENTRY) + GEVHDRSZ); - for (j = 0; j < r->rd_att->natts; j++) + for (i = 0; i < r->rd_att->natts; i++) { - reallen = 0; - for (i = 0; i < len; i++) + Datum datum; + int j; + int real_len; + + real_len = 0; + for (j = 0; j < len; j++) { - datum = index_getattr(itvec[i], j + 1, giststate->tupdesc, &IsNull); + bool IsNull; + datum = index_getattr(itvec[j], i + 1, giststate->tupdesc, &IsNull); if (IsNull) continue; - gistdentryinit(giststate, j, - &(evec->vector[reallen]), + gistdentryinit(giststate, i, + &(evec->vector[real_len]), datum, NULL, NULL, (OffsetNumber) 0, - ATTSIZE(datum, giststate->tupdesc, j + 1, IsNull), FALSE, IsNull); - if ((!isAttByVal(giststate, j)) && - evec->vector[reallen].key != datum) - needfree[reallen] = TRUE; - else - needfree[reallen] = FALSE; - reallen++; + ATTSIZE(datum, giststate->tupdesc, i + 1, IsNull), + FALSE, IsNull); + real_len++; } - if (reallen == 0) + /* If this tuple vector was all NULLs, the union is NULL */ + if (real_len == 0) { - attr[j] = (Datum) 0; - isnull[j] = TRUE; - whatfree[j] = FALSE; + attr[i] = (Datum) 0; + isnull[i] = TRUE; } else { - if (reallen == 1) + int datumsize; + + if (real_len == 1) { evec->n = 2; gistentryinit(evec->vector[1], evec->vector[0].key, r, NULL, - (OffsetNumber) 0, evec->vector[0].bytes, FALSE); - + (OffsetNumber) 0, evec->vector[0].bytes, FALSE); } else - evec->n = reallen; - datum = FunctionCall2(&giststate->unionFn[j], + evec->n = real_len; + + /* Compress the result of the union and store in attr array */ + datum = FunctionCall2(&giststate->unionFn[i], PointerGetDatum(evec), PointerGetDatum(&datumsize)); - for (i = 0; i < reallen; i++) - if (needfree[i]) - pfree(DatumGetPointer(evec->vector[i].key)); - - gistcentryinit(giststate, j, ¢ry[j], datum, + gistcentryinit(giststate, i, ¢ry[i], datum, NULL, NULL, (OffsetNumber) 0, datumsize, FALSE, FALSE); - isnull[j] = FALSE; - attr[j] = centry[j].key; - if (!isAttByVal(giststate, j)) - { - whatfree[j] = TRUE; - if 
(centry[j].key != datum) - pfree(DatumGetPointer(datum)); - } - else - whatfree[j] = FALSE; + isnull[i] = FALSE; + attr[i] = centry[i].key; } } - pfree(evec); - pfree(needfree); - - newtup = index_form_tuple(giststate->tupdesc, attr, isnull); - for (j = 0; j < r->rd_att->natts; j++) - if (whatfree[j]) - pfree(DatumGetPointer(attr[j])); - - return newtup; + return index_form_tuple(giststate->tupdesc, attr, isnull); } @@ -725,24 +700,18 @@ static IndexTuple gistgetadjusted(Relation r, IndexTuple oldtup, IndexTuple addtup, GISTSTATE *giststate) { GistEntryVector *evec; - Datum datum; - int datumsize; - bool result, - neednew = false; - bool isnull[INDEX_MAX_KEYS], - whatfree[INDEX_MAX_KEYS]; + bool neednew = false; + bool isnull[INDEX_MAX_KEYS]; Datum attr[INDEX_MAX_KEYS]; GISTENTRY centry[INDEX_MAX_KEYS], oldatt[INDEX_MAX_KEYS], addatt[INDEX_MAX_KEYS], *ev0p, *ev1p; - bool olddec[INDEX_MAX_KEYS], - adddec[INDEX_MAX_KEYS]; bool oldisnull[INDEX_MAX_KEYS], addisnull[INDEX_MAX_KEYS]; IndexTuple newtup = NULL; - int j; + int i; evec = palloc(2 * sizeof(GISTENTRY) + GEVHDRSZ); evec->n = 2; @@ -750,39 +719,40 @@ gistgetadjusted(Relation r, IndexTuple oldtup, IndexTuple addtup, GISTSTATE *gis ev1p = &(evec->vector[1]); gistDeCompressAtt(giststate, r, oldtup, NULL, - (OffsetNumber) 0, oldatt, olddec, oldisnull); + (OffsetNumber) 0, oldatt, oldisnull); gistDeCompressAtt(giststate, r, addtup, NULL, - (OffsetNumber) 0, addatt, adddec, addisnull); - + (OffsetNumber) 0, addatt, addisnull); - for (j = 0; j < r->rd_att->natts; j++) + for (i = 0; i < r->rd_att->natts; i++) { - if (oldisnull[j] && addisnull[j]) + if (oldisnull[i] && addisnull[i]) { - attr[j] = (Datum) 0; - isnull[j] = TRUE; - whatfree[j] = FALSE; + attr[i] = (Datum) 0; + isnull[i] = TRUE; } else { - FILLEV( - oldisnull[j], oldatt[j].key, oldatt[j].bytes, - addisnull[j], addatt[j].key, addatt[j].bytes - ); + Datum datum; + int datumsize; - datum = FunctionCall2(&giststate->unionFn[j], + FILLEV(oldisnull[i], 
oldatt[i].key, oldatt[i].bytes, + addisnull[i], addatt[i].key, addatt[i].bytes); + + datum = FunctionCall2(&giststate->unionFn[i], PointerGetDatum(evec), PointerGetDatum(&datumsize)); - if (oldisnull[j] || addisnull[j]) + if (oldisnull[i] || addisnull[i]) { - if (oldisnull[j]) + if (oldisnull[i]) neednew = true; } else { - FunctionCall3(&giststate->equalFn[j], + bool result; + + FunctionCall3(&giststate->equalFn[i], ev0p->key, datum, PointerGetDatum(&result)); @@ -791,28 +761,14 @@ gistgetadjusted(Relation r, IndexTuple oldtup, IndexTuple addtup, GISTSTATE *gis neednew = true; } - if (olddec[j]) - pfree(DatumGetPointer(oldatt[j].key)); - if (adddec[j]) - pfree(DatumGetPointer(addatt[j].key)); - - gistcentryinit(giststate, j, ¢ry[j], datum, + gistcentryinit(giststate, i, ¢ry[i], datum, NULL, NULL, (OffsetNumber) 0, datumsize, FALSE, FALSE); - attr[j] = centry[j].key; - isnull[j] = FALSE; - if ((!isAttByVal(giststate, j))) - { - whatfree[j] = TRUE; - if (centry[j].key != datum) - pfree(DatumGetPointer(datum)); - } - else - whatfree[j] = FALSE; + attr[i] = centry[i].key; + isnull[i] = FALSE; } } - pfree(evec); if (neednew) { @@ -821,33 +777,24 @@ gistgetadjusted(Relation r, IndexTuple oldtup, IndexTuple addtup, GISTSTATE *gis newtup->t_tid = oldtup->t_tid; } - for (j = 0; j < r->rd_att->natts; j++) - if (whatfree[j]) - pfree(DatumGetPointer(attr[j])); - return newtup; } static void gistunionsubkey(Relation r, GISTSTATE *giststate, IndexTuple *itvec, GIST_SPLITVEC *spl) { - int i, - j, - lr; - Datum *attr; - bool *needfree, - IsNull; - int len, - *attrsize; - OffsetNumber *entries; - GistEntryVector *evec; - Datum datum; - int datumsize; - int reallen; - bool *isnull; + int lr; - for (lr = 0; lr <= 1; lr++) + for (lr = 0; lr < 2; lr++) { + OffsetNumber *entries; + int i; + Datum *attr; + int len, + *attrsize; + bool *isnull; + GistEntryVector *evec; + if (lr) { attrsize = spl->spl_lattrsize; @@ -865,38 +812,41 @@ gistunionsubkey(Relation r, GISTSTATE *giststate, 
IndexTuple *itvec, GIST_SPLITV isnull = spl->spl_risnull; } - needfree = (bool *) palloc(((len == 1) ? 2 : len) * sizeof(bool)); evec = palloc(((len == 1) ? 2 : len) * sizeof(GISTENTRY) + GEVHDRSZ); - for (j = 1; j < r->rd_att->natts; j++) + for (i = 1; i < r->rd_att->natts; i++) { - reallen = 0; - for (i = 0; i < len; i++) + int j; + Datum datum; + int datumsize; + int real_len; + + real_len = 0; + for (j = 0; j < len; j++) { - if (spl->spl_idgrp[entries[i]]) + bool IsNull; + + if (spl->spl_idgrp[entries[j]]) continue; - datum = index_getattr(itvec[entries[i] - 1], j + 1, + datum = index_getattr(itvec[entries[j] - 1], i + 1, giststate->tupdesc, &IsNull); if (IsNull) continue; - gistdentryinit(giststate, j, - &(evec->vector[reallen]), + gistdentryinit(giststate, i, + &(evec->vector[real_len]), datum, NULL, NULL, (OffsetNumber) 0, - ATTSIZE(datum, giststate->tupdesc, j + 1, IsNull), FALSE, IsNull); - if ((!isAttByVal(giststate, j)) && - evec->vector[reallen].key != datum) - needfree[reallen] = TRUE; - else - needfree[reallen] = FALSE; - reallen++; + ATTSIZE(datum, giststate->tupdesc, i + 1, IsNull), + FALSE, IsNull); + real_len++; } - if (reallen == 0) + + if (real_len == 0) { datum = (Datum) 0; datumsize = 0; - isnull[j] = true; + isnull[i] = true; } else { @@ -904,30 +854,23 @@ gistunionsubkey(Relation r, GISTSTATE *giststate, IndexTuple *itvec, GIST_SPLITV * evec->vector[0].bytes may be not defined, so form union * with itself */ - if (reallen == 1) + if (real_len == 1) { evec->n = 2; - memcpy((void *) &(evec->vector[1]), - (void *) &(evec->vector[0]), + memcpy(&(evec->vector[1]), &(evec->vector[0]), sizeof(GISTENTRY)); } else - evec->n = reallen; - datum = FunctionCall2(&giststate->unionFn[j], + evec->n = real_len; + datum = FunctionCall2(&giststate->unionFn[i], PointerGetDatum(evec), PointerGetDatum(&datumsize)); - isnull[j] = false; + isnull[i] = false; } - for (i = 0; i < reallen; i++) - if (needfree[i]) - pfree(DatumGetPointer(evec->vector[i].key)); - - 
attr[j] = datum; - attrsize[j] = datumsize; + attr[i] = datum; + attrsize[i] = datumsize; } - pfree(evec); - pfree(needfree); } } @@ -937,11 +880,8 @@ gistunionsubkey(Relation r, GISTSTATE *giststate, IndexTuple *itvec, GIST_SPLITV static int gistfindgroup(GISTSTATE *giststate, GISTENTRY *valvec, GIST_SPLITVEC *spl) { - int i, - j, - len; + int i; int curid = 1; - bool result; /* * first key is always not null (see gistinsert), so we may not check @@ -949,6 +889,10 @@ gistfindgroup(GISTSTATE *giststate, GISTENTRY *valvec, GIST_SPLITVEC *spl) */ for (i = 0; i < spl->spl_nleft; i++) { + int j; + int len; + bool result; + if (spl->spl_idgrp[spl->spl_left[i]]) continue; len = 0; @@ -996,8 +940,8 @@ gistfindgroup(GISTSTATE *giststate, GISTENTRY *valvec, GIST_SPLITVEC *spl) } /* - * Insert equivalent tuples to left or right page - * with minimize penalty + * Insert equivalent tuples to left or right page with minimum + * penalty */ static void gistadjsubkey(Relation r, @@ -1008,7 +952,6 @@ gistadjsubkey(Relation r, { int curlen; OffsetNumber *curwpos; - bool decfree[INDEX_MAX_KEYS]; GISTENTRY entry, identry[INDEX_MAX_KEYS], *ev0p, @@ -1020,12 +963,12 @@ gistadjsubkey(Relation r, bool isnull[INDEX_MAX_KEYS]; int i, j; - Datum datum; /* clear vectors */ curlen = v->spl_nleft; curwpos = v->spl_left; for (i = 0; i < v->spl_nleft; i++) + { if (v->spl_idgrp[v->spl_left[i]] == 0) { *curwpos = v->spl_left[i]; @@ -1033,11 +976,13 @@ gistadjsubkey(Relation r, } else curlen--; + } v->spl_nleft = curlen; curlen = v->spl_nright; curwpos = v->spl_right; for (i = 0; i < v->spl_nright; i++) + { if (v->spl_idgrp[v->spl_right[i]] == 0) { *curwpos = v->spl_right[i]; @@ -1045,6 +990,7 @@ gistadjsubkey(Relation r, } else curlen--; + } v->spl_nright = curlen; evec = palloc(2 * sizeof(GISTENTRY) + GEVHDRSZ); @@ -1055,16 +1001,17 @@ gistadjsubkey(Relation r, /* add equivalent tuple */ for (i = 0; i < *len; i++) { + Datum datum; + if (v->spl_idgrp[i + 1] == 0) /* already inserted */ continue; 
gistDeCompressAtt(giststate, r, itup[i], NULL, (OffsetNumber) 0, - identry, decfree, isnull); + identry, isnull); v->spl_ngrp[v->spl_idgrp[i + 1]]--; if (v->spl_ngrp[v->spl_idgrp[i + 1]] == 0 && - (v->spl_grpflag[v->spl_idgrp[i + 1]] & BOTH_ADDED) != BOTH_ADDED) + (v->spl_grpflag[v->spl_idgrp[i + 1]] & BOTH_ADDED) != BOTH_ADDED) { - /* force last in group */ rpenalty = 1.0; lpenalty = (v->spl_grpflag[v->spl_idgrp[i + 1]] & LEFT_ADDED) ? 2.0 : 0.0; @@ -1088,7 +1035,11 @@ gistadjsubkey(Relation r, break; } } - /* add */ + + /* + * add + * XXX: refactor this to avoid duplicating code + */ if (lpenalty < rpenalty) { v->spl_grpflag[v->spl_idgrp[i + 1]] |= LEFT_ADDED; @@ -1103,17 +1054,13 @@ gistadjsubkey(Relation r, } else { - FILLEV( - v->spl_lisnull[j], v->spl_lattr[j], v->spl_lattrsize[j], - isnull[j], identry[j].key, identry[j].bytes - ); + FILLEV(v->spl_lisnull[j], v->spl_lattr[j], v->spl_lattrsize[j], + isnull[j], identry[j].key, identry[j].bytes); datum = FunctionCall2(&giststate->unionFn[j], PointerGetDatum(evec), PointerGetDatum(&datumsize)); - if ((!isAttByVal(giststate, j)) && !v->spl_lisnull[j]) - pfree(DatumGetPointer(v->spl_lattr[j])); v->spl_lattr[j] = datum; v->spl_lattrsize[j] = datumsize; v->spl_lisnull[j] = false; @@ -1134,28 +1081,20 @@ gistadjsubkey(Relation r, } else { - FILLEV( - v->spl_risnull[j], v->spl_rattr[j], v->spl_rattrsize[j], - isnull[j], identry[j].key, identry[j].bytes - ); + FILLEV(v->spl_risnull[j], v->spl_rattr[j], v->spl_rattrsize[j], + isnull[j], identry[j].key, identry[j].bytes); datum = FunctionCall2(&giststate->unionFn[j], PointerGetDatum(evec), PointerGetDatum(&datumsize)); - if ((!isAttByVal(giststate, j)) && !v->spl_risnull[j]) - pfree(DatumGetPointer(v->spl_rattr[j])); - v->spl_rattr[j] = datum; v->spl_rattrsize[j] = datumsize; v->spl_risnull[j] = false; } } - } - gistFreeAtt(r, identry, decfree); } - pfree(evec); } /* @@ -1181,13 +1120,8 @@ gistSplit(Relation r, GISTPageOpaque opaque; GIST_SPLITVEC v; GistEntryVector 
*entryvec; - bool *decompvec; int i, - j, nlen; - int MaxGrpId = 1; - Datum datum; - bool IsNull; p = (Page) BufferGetPage(buffer); opaque = (GISTPageOpaque) PageGetSpecialPointer(p); @@ -1197,8 +1131,7 @@ gistSplit(Relation r, * about to split the root, we need to do some hocus-pocus to enforce * this guarantee. */ - - if (BufferGetBlockNumber(buffer) == GISTP_ROOT) + if (BufferGetBlockNumber(buffer) == GIST_ROOT_BLKNO) { leftbuf = ReadBuffer(r, P_NEW); GISTInitBuffer(leftbuf, opaque->flags); @@ -1221,17 +1154,17 @@ gistSplit(Relation r, /* generate the item array */ entryvec = palloc(GEVHDRSZ + (*len + 1) * sizeof(GISTENTRY)); entryvec->n = *len + 1; - decompvec = (bool *) palloc((*len + 1) * sizeof(bool)); + for (i = 1; i <= *len; i++) { + Datum datum; + bool IsNull; + datum = index_getattr(itup[i - 1], 1, giststate->tupdesc, &IsNull); gistdentryinit(giststate, 0, &(entryvec->vector[i]), datum, r, p, i, - ATTSIZE(datum, giststate->tupdesc, 1, IsNull), FALSE, IsNull); - if ((!isAttByVal(giststate, 0)) && entryvec->vector[i].key != datum) - decompvec[i] = TRUE; - else - decompvec[i] = FALSE; + ATTSIZE(datum, giststate->tupdesc, 1, IsNull), + FALSE, IsNull); } /* @@ -1259,6 +1192,8 @@ gistSplit(Relation r, */ if (r->rd_att->natts > 1) { + int MaxGrpId; + v.spl_idgrp = (int *) palloc0(sizeof(int) * (*len + 1)); v.spl_grpflag = (char *) palloc0(sizeof(char) * (*len + 1)); v.spl_ngrp = (int *) palloc(sizeof(int) * (*len + 1)); @@ -1274,19 +1209,8 @@ gistSplit(Relation r, */ if (MaxGrpId > 1) gistadjsubkey(r, itup, len, &v, giststate); - - pfree(v.spl_idgrp); - pfree(v.spl_grpflag); - pfree(v.spl_ngrp); } - /* clean up the entry vector: its keys need to be deleted, too */ - for (i = 1; i <= *len; i++) - if (decompvec[i]) - pfree(DatumGetPointer(entryvec->vector[i].key)); - pfree(entryvec); - pfree(decompvec); - /* form left and right vector */ lvectup = (IndexTuple *) palloc(sizeof(IndexTuple) * v.spl_nleft); rvectup = (IndexTuple *) palloc(sizeof(IndexTuple) * 
v.spl_nright); @@ -1298,15 +1222,12 @@ gistSplit(Relation r, rvectup[i] = itup[v.spl_right[i] - 1]; - /* write on disk (may be need another split) */ + /* write on disk (may need another split) */ if (gistnospace(right, rvectup, v.spl_nright)) { nlen = v.spl_nright; newtup = gistSplit(r, rightbuf, rvectup, &nlen, giststate); ReleaseBuffer(rightbuf); - for (j = 1; j < r->rd_att->natts; j++) - if ((!isAttByVal(giststate, j)) && !v.spl_risnull[j]) - pfree(DatumGetPointer(v.spl_rattr[j])); } else { @@ -1321,7 +1242,6 @@ gistSplit(Relation r, ItemPointerSet(&(newtup[0]->t_tid), rbknum, 1); } - if (gistnospace(left, lvectup, v.spl_nleft)) { int llen = v.spl_nleft; @@ -1330,35 +1250,24 @@ gistSplit(Relation r, lntup = gistSplit(r, leftbuf, lvectup, &llen, giststate); ReleaseBuffer(leftbuf); - for (j = 1; j < r->rd_att->natts; j++) - if ((!isAttByVal(giststate, j)) && !v.spl_lisnull[j]) - pfree(DatumGetPointer(v.spl_lattr[j])); - newtup = gistjoinvector(newtup, &nlen, lntup, llen); - pfree(lntup); } else { OffsetNumber l; l = gistwritebuffer(r, left, lvectup, v.spl_nleft, FirstOffsetNumber); - if (BufferGetBlockNumber(buffer) != GISTP_ROOT) + if (BufferGetBlockNumber(buffer) != GIST_ROOT_BLKNO) PageRestoreTempPage(left, p); WriteBuffer(leftbuf); nlen += 1; - newtup = (IndexTuple *) repalloc((void *) newtup, sizeof(IndexTuple) * nlen); + newtup = (IndexTuple *) repalloc(newtup, sizeof(IndexTuple) * nlen); newtup[nlen - 1] = gistFormTuple(giststate, r, v.spl_lattr, v.spl_lattrsize, v.spl_lisnull); ItemPointerSet(&(newtup[nlen - 1]->t_tid), lbknum, 1); } - /* !!! 
pfree */ - pfree(rvectup); - pfree(lvectup); - pfree(v.spl_left); - pfree(v.spl_right); - *len = nlen; return newtup; } @@ -1369,7 +1278,7 @@ gistnewroot(Relation r, IndexTuple *itup, int len) Buffer b; Page p; - b = ReadBuffer(r, GISTP_ROOT); + b = ReadBuffer(r, GIST_ROOT_BLKNO); GISTInitBuffer(b, 0); p = BufferGetPage(b); @@ -1385,16 +1294,13 @@ GISTInitBuffer(Buffer b, uint32 f) Size pageSize; pageSize = BufferGetPageSize(b); - page = BufferGetPage(b); - PageInit(page, pageSize, sizeof(GISTPageOpaqueData)); opaque = (GISTPageOpaque) PageGetSpecialPointer(page); opaque->flags = f; } - /* * find entry with lowest penalty */ @@ -1404,17 +1310,12 @@ gistchoose(Relation r, Page p, IndexTuple it, /* it has compressed entry */ { OffsetNumber maxoff; OffsetNumber i; - Datum datum; - float usize; OffsetNumber which; float sum_grow, which_grow[INDEX_MAX_KEYS]; GISTENTRY entry, identry[INDEX_MAX_KEYS]; - bool IsNull, - decompvec[INDEX_MAX_KEYS], - isnull[INDEX_MAX_KEYS]; - int j; + bool isnull[INDEX_MAX_KEYS]; maxoff = PageGetMaxOffsetNumber(p); *which_grow = -1.0; @@ -1422,21 +1323,26 @@ gistchoose(Relation r, Page p, IndexTuple it, /* it has compressed entry */ sum_grow = 1; gistDeCompressAtt(giststate, r, it, NULL, (OffsetNumber) 0, - identry, decompvec, isnull); + identry, isnull); for (i = FirstOffsetNumber; i <= maxoff && sum_grow; i = OffsetNumberNext(i)) { + int j; IndexTuple itup = (IndexTuple) PageGetItem(p, PageGetItemId(p, i)); sum_grow = 0; for (j = 0; j < r->rd_att->natts; j++) { - datum = index_getattr(itup, j + 1, giststate->tupdesc, &IsNull); - gistdentryinit(giststate, j, &entry, datum, r, p, i, ATTSIZE(datum, giststate->tupdesc, j + 1, IsNull), FALSE, IsNull); - gistpenalty(giststate, j, &entry, IsNull, &identry[j], isnull[j], &usize); + Datum datum; + float usize; + bool IsNull; - if ((!isAttByVal(giststate, j)) && entry.key != datum) - pfree(DatumGetPointer(entry.key)); + datum = index_getattr(itup, j + 1, giststate->tupdesc, &IsNull); + 
gistdentryinit(giststate, j, &entry, datum, r, p, i, + ATTSIZE(datum, giststate->tupdesc, j + 1, IsNull), + FALSE, IsNull); + gistpenalty(giststate, j, &entry, IsNull, + &identry[j], isnull[j], &usize); if (which_grow[j] < 0 || usize < which_grow[j]) { @@ -1456,24 +1362,9 @@ gistchoose(Relation r, Page p, IndexTuple it, /* it has compressed entry */ } } - gistFreeAtt(r, identry, decompvec); return which; } -void -gistfreestack(GISTSTACK *s) -{ - GISTSTACK *p; - - while (s != NULL) - { - p = s->gs_parent; - pfree(s); - s = p; - } -} - - /* * Retail deletion of a single tuple. * @@ -1593,7 +1484,6 @@ gistbulkdelete(PG_FUNCTION_ARGS) PG_RETURN_POINTER(result); } - void initGISTstate(GISTSTATE *giststate, Relation index) { @@ -1608,22 +1498,22 @@ initGISTstate(GISTSTATE *giststate, Relation index) for (i = 0; i < index->rd_att->natts; i++) { fmgr_info_copy(&(giststate->consistentFn[i]), - index_getprocinfo(index, i + 1, GIST_CONSISTENT_PROC), + index_getprocinfo(index, i + 1, GIST_CONSISTENT_PROC), CurrentMemoryContext); fmgr_info_copy(&(giststate->unionFn[i]), index_getprocinfo(index, i + 1, GIST_UNION_PROC), CurrentMemoryContext); fmgr_info_copy(&(giststate->compressFn[i]), - index_getprocinfo(index, i + 1, GIST_COMPRESS_PROC), + index_getprocinfo(index, i + 1, GIST_COMPRESS_PROC), CurrentMemoryContext); fmgr_info_copy(&(giststate->decompressFn[i]), - index_getprocinfo(index, i + 1, GIST_DECOMPRESS_PROC), + index_getprocinfo(index, i + 1, GIST_DECOMPRESS_PROC), CurrentMemoryContext); fmgr_info_copy(&(giststate->penaltyFn[i]), index_getprocinfo(index, i + 1, GIST_PENALTY_PROC), CurrentMemoryContext); fmgr_info_copy(&(giststate->picksplitFn[i]), - index_getprocinfo(index, i + 1, GIST_PICKSPLIT_PROC), + index_getprocinfo(index, i + 1, GIST_PICKSPLIT_PROC), CurrentMemoryContext); fmgr_info_copy(&(giststate->equalFn[i]), index_getprocinfo(index, i + 1, GIST_EQUAL_PROC), @@ -1703,11 +1593,8 @@ gistdentryinit(GISTSTATE *giststate, int nkey, GISTENTRY *e, 
PointerGetDatum(e))); /* decompressFn may just return the given pointer */ if (dep != e) - { gistentryinit(*e, dep->key, dep->rel, dep->page, dep->offset, dep->bytes, dep->leafkey); - pfree(dep); - } } else gistentryinit(*e, (Datum) 0, r, pg, o, 0, l); @@ -1732,11 +1619,8 @@ gistcentryinit(GISTSTATE *giststate, int nkey, PointerGetDatum(e))); /* compressFn may just return the given pointer */ if (cep != e) - { gistentryinit(*e, cep->key, cep->rel, cep->page, cep->offset, cep->bytes, cep->leafkey); - pfree(cep); - } } else gistentryinit(*e, (Datum) 0, r, pg, o, 0, l); @@ -1746,79 +1630,42 @@ static IndexTuple gistFormTuple(GISTSTATE *giststate, Relation r, Datum attdata[], int datumsize[], bool isnull[]) { - IndexTuple tup; - bool whatfree[INDEX_MAX_KEYS]; GISTENTRY centry[INDEX_MAX_KEYS]; Datum compatt[INDEX_MAX_KEYS]; - int j; + int i; - for (j = 0; j < r->rd_att->natts; j++) + for (i = 0; i < r->rd_att->natts; i++) { - if (isnull[j]) - { - compatt[j] = (Datum) 0; - whatfree[j] = FALSE; - } + if (isnull[i]) + compatt[i] = (Datum) 0; else { - gistcentryinit(giststate, j, &centry[j], attdata[j], + gistcentryinit(giststate, i, &centry[i], attdata[i], NULL, NULL, (OffsetNumber) 0, - datumsize[j], FALSE, FALSE); - compatt[j] = centry[j].key; - if (!isAttByVal(giststate, j)) - { - whatfree[j] = TRUE; - if (centry[j].key != attdata[j]) - pfree(DatumGetPointer(attdata[j])); - } - else - whatfree[j] = FALSE; + datumsize[i], FALSE, FALSE); + compatt[i] = centry[i].key; } } - tup = index_form_tuple(giststate->tupdesc, compatt, isnull); - for (j = 0; j < r->rd_att->natts; j++) - if (whatfree[j]) - pfree(DatumGetPointer(compatt[j])); - - return tup; + return index_form_tuple(giststate->tupdesc, compatt, isnull); } static void gistDeCompressAtt(GISTSTATE *giststate, Relation r, IndexTuple tuple, Page p, - OffsetNumber o, GISTENTRY *attdata, bool *decompvec, bool *isnull) + OffsetNumber o, GISTENTRY *attdata, bool *isnull) { int i; - Datum datum; for (i = 0; i < r->rd_att->natts; i++) { 
- datum = index_getattr(tuple, i + 1, giststate->tupdesc, &isnull[i]); + Datum datum = index_getattr(tuple, i + 1, giststate->tupdesc, &isnull[i]); gistdentryinit(giststate, i, &attdata[i], datum, r, p, o, - ATTSIZE(datum, giststate->tupdesc, i + 1, isnull[i]), FALSE, isnull[i]); - if (isAttByVal(giststate, i)) - decompvec[i] = FALSE; - else - { - if (attdata[i].key == datum || isnull[i]) - decompvec[i] = FALSE; - else - decompvec[i] = TRUE; - } + ATTSIZE(datum, giststate->tupdesc, i + 1, isnull[i]), + FALSE, isnull[i]); } } -static void -gistFreeAtt(Relation r, GISTENTRY *attdata, bool *decompvec) -{ - int i; - - for (i = 0; i < r->rd_att->natts; i++) - if (decompvec[i]) - pfree(DatumGetPointer(attdata[i].key)); -} - static void gistpenalty(GISTSTATE *giststate, int attno, GISTENTRY *key1, bool isNull1, diff --git a/src/backend/access/gist/gistget.c b/src/backend/access/gist/gistget.c index 8f7a6c7ed4..d2b0f75fc1 100644 --- a/src/backend/access/gist/gistget.c +++ b/src/backend/access/gist/gistget.c @@ -8,7 +8,7 @@ * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/access/gist/gistget.c,v 1.45 2005/03/27 23:52:55 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/access/gist/gistget.c,v 1.46 2005/05/17 00:59:30 neilc Exp $ * *------------------------------------------------------------------------- */ @@ -16,41 +16,71 @@ #include "access/gist.h" #include "executor/execdebug.h" +#include "utils/memutils.h" - -static OffsetNumber gistfindnext(IndexScanDesc s, Page p, OffsetNumber n, - ScanDirection dir); -static bool gistscancache(IndexScanDesc s, ScanDirection dir); -static bool gistfirst(IndexScanDesc s, ScanDirection dir); -static bool gistnext(IndexScanDesc s, ScanDirection dir); -static bool gistindex_keytest(IndexTuple tuple, - int scanKeySize, ScanKey key, GISTSTATE *giststate, - Relation r, Page p, OffsetNumber offset); +static OffsetNumber gistfindnext(IndexScanDesc scan, OffsetNumber n, + 
ScanDirection dir); +static bool gistnext(IndexScanDesc scan, ScanDirection dir); +static bool gistindex_keytest(IndexTuple tuple, IndexScanDesc scan, + OffsetNumber offset); +/* + * gistgettuple() -- Get the next tuple in the scan + */ Datum gistgettuple(PG_FUNCTION_ARGS) { - IndexScanDesc s = (IndexScanDesc) PG_GETARG_POINTER(0); - ScanDirection dir = (ScanDirection) PG_GETARG_INT32(1); - bool res; + IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0); + ScanDirection dir = (ScanDirection) PG_GETARG_INT32(1); + Page page; + OffsetNumber offnum; + GISTScanOpaque so; - /* if we have it cached in the scan desc, just return the value */ - if (gistscancache(s, dir)) - PG_RETURN_BOOL(true); + so = (GISTScanOpaque) scan->opaque; - /* not cached, so we'll have to do some work */ - if (ItemPointerIsValid(&(s->currentItemData))) - res = gistnext(s, dir); - else - res = gistfirst(s, dir); - PG_RETURN_BOOL(res); + /* + * If we have produced an index tuple in the past and the executor + * has informed us we need to mark it as "killed", do so now. + * + * XXX: right now there is no concurrent access. In the + * future, we should (a) get a read lock on the page (b) check + * that the location of the previously-fetched tuple hasn't + * changed due to concurrent insertions. + */ + if (scan->kill_prior_tuple && ItemPointerIsValid(&(scan->currentItemData))) + { + offnum = ItemPointerGetOffsetNumber(&(scan->currentItemData)); + page = BufferGetPage(so->curbuf); + PageGetItemId(page, offnum)->lp_flags |= LP_DELETE; + SetBufferCommitInfoNeedsSave(so->curbuf); + } + + /* + * Get the next tuple that matches the search key. If asked to + * skip killed tuples, continue looping until we find a non-killed + * tuple that matches the search key. 
+ */ + for (;;) + { + bool res = gistnext(scan, dir); + + if (res == true && scan->ignore_killed_tuples) + { + offnum = ItemPointerGetOffsetNumber(&(scan->currentItemData)); + page = BufferGetPage(so->curbuf); + if (ItemIdDeleted(PageGetItemId(page, offnum))) + continue; + } + + PG_RETURN_BOOL(res); + } } Datum gistgetmulti(PG_FUNCTION_ARGS) { - IndexScanDesc s = (IndexScanDesc) PG_GETARG_POINTER(0); + IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0); ItemPointer tids = (ItemPointer) PG_GETARG_POINTER(1); int32 max_tids = PG_GETARG_INT32(2); int32 *returned_tids = (int32 *) PG_GETARG_POINTER(3); @@ -60,13 +90,10 @@ gistgetmulti(PG_FUNCTION_ARGS) /* XXX generic implementation: loop around guts of gistgettuple */ while (ntids < max_tids) { - if (ItemPointerIsValid(&(s->currentItemData))) - res = gistnext(s, ForwardScanDirection); - else - res = gistfirst(s, ForwardScanDirection); + res = gistnext(scan, ForwardScanDirection); if (!res) break; - tids[ntids] = s->xs_ctup.t_self; + tids[ntids] = scan->xs_ctup.t_self; ntids++; } @@ -74,166 +101,123 @@ gistgetmulti(PG_FUNCTION_ARGS) PG_RETURN_BOOL(res); } +/* + * Fetch a tuple that matches the search key; this can be invoked + * either to fetch the first such tuple or subsequent matching + * tuples. Returns true iff a matching tuple was found. 
+ */ static bool -gistfirst(IndexScanDesc s, ScanDirection dir) +gistnext(IndexScanDesc scan, ScanDirection dir) { - Buffer b; Page p; OffsetNumber n; - OffsetNumber maxoff; GISTPageOpaque po; GISTScanOpaque so; GISTSTACK *stk; - BlockNumber blk; IndexTuple it; - so = (GISTScanOpaque) s->opaque; + so = (GISTScanOpaque) scan->opaque; + + if (ItemPointerIsValid(&scan->currentItemData) == false) + { + /* Being asked to fetch the first entry, so start at the root */ + Assert(so->curbuf == InvalidBuffer); + so->curbuf = ReadBuffer(scan->indexRelation, GIST_ROOT_BLKNO); + } - b = ReadBuffer(s->indexRelation, GISTP_ROOT); - p = BufferGetPage(b); + p = BufferGetPage(so->curbuf); po = (GISTPageOpaque) PageGetSpecialPointer(p); - for (;;) + if (ItemPointerIsValid(&scan->currentItemData) == false) { - maxoff = PageGetMaxOffsetNumber(p); if (ScanDirectionIsBackward(dir)) - n = gistfindnext(s, p, maxoff, dir); + n = PageGetMaxOffsetNumber(p); else - n = gistfindnext(s, p, FirstOffsetNumber, dir); - - while (n < FirstOffsetNumber || n > maxoff) - { - stk = so->s_stack; - if (stk == NULL) - { - ReleaseBuffer(b); - return false; - } - - b = ReleaseAndReadBuffer(b, s->indexRelation, stk->gs_blk); - p = BufferGetPage(b); - po = (GISTPageOpaque) PageGetSpecialPointer(p); - maxoff = PageGetMaxOffsetNumber(p); - - if (ScanDirectionIsBackward(dir)) - n = OffsetNumberPrev(stk->gs_child); - else - n = OffsetNumberNext(stk->gs_child); - - so->s_stack = stk->gs_parent; - pfree(stk); - - n = gistfindnext(s, p, n, dir); - } - if (po->flags & F_LEAF) - { - ItemPointerSet(&(s->currentItemData), BufferGetBlockNumber(b), n); - - it = (IndexTuple) PageGetItem(p, PageGetItemId(p, n)); - - s->xs_ctup.t_self = it->t_tid; - - ReleaseBuffer(b); - return true; - } - else - { - stk = (GISTSTACK *) palloc(sizeof(GISTSTACK)); - stk->gs_child = n; - stk->gs_blk = BufferGetBlockNumber(b); - stk->gs_parent = so->s_stack; - so->s_stack = stk; - - it = (IndexTuple) PageGetItem(p, PageGetItemId(p, n)); - blk = 
ItemPointerGetBlockNumber(&(it->t_tid)); - - b = ReleaseAndReadBuffer(b, s->indexRelation, blk); - p = BufferGetPage(b); - po = (GISTPageOpaque) PageGetSpecialPointer(p); - } + n = FirstOffsetNumber; } -} - -static bool -gistnext(IndexScanDesc s, ScanDirection dir) -{ - Buffer b; - Page p; - OffsetNumber n; - OffsetNumber maxoff; - GISTPageOpaque po; - GISTScanOpaque so; - GISTSTACK *stk; - BlockNumber blk; - IndexTuple it; - - so = (GISTScanOpaque) s->opaque; - - blk = ItemPointerGetBlockNumber(&(s->currentItemData)); - n = ItemPointerGetOffsetNumber(&(s->currentItemData)); - - if (ScanDirectionIsForward(dir)) - n = OffsetNumberNext(n); else - n = OffsetNumberPrev(n); + { + n = ItemPointerGetOffsetNumber(&(scan->currentItemData)); - b = ReadBuffer(s->indexRelation, blk); - p = BufferGetPage(b); - po = (GISTPageOpaque) PageGetSpecialPointer(p); + if (ScanDirectionIsBackward(dir)) + n = OffsetNumberPrev(n); + else + n = OffsetNumberNext(n); + } for (;;) { - maxoff = PageGetMaxOffsetNumber(p); - n = gistfindnext(s, p, n, dir); + n = gistfindnext(scan, n, dir); - while (n < FirstOffsetNumber || n > maxoff) + if (!OffsetNumberIsValid(n)) { - stk = so->s_stack; - if (stk == NULL) + /* + * We ran out of matching index entries on the current + * page, so pop the top stack entry and use it to continue + * the search. 
+ */ + /* If we're out of stack entries, we're done */ + if (so->stack == NULL) { - ReleaseBuffer(b); + ReleaseBuffer(so->curbuf); + so->curbuf = InvalidBuffer; return false; } - b = ReleaseAndReadBuffer(b, s->indexRelation, stk->gs_blk); - p = BufferGetPage(b); + stk = so->stack; + so->curbuf = ReleaseAndReadBuffer(so->curbuf, scan->indexRelation, + stk->block); + p = BufferGetPage(so->curbuf); po = (GISTPageOpaque) PageGetSpecialPointer(p); - maxoff = PageGetMaxOffsetNumber(p); if (ScanDirectionIsBackward(dir)) - n = OffsetNumberPrev(stk->gs_child); + n = OffsetNumberPrev(stk->offset); else - n = OffsetNumberNext(stk->gs_child); + n = OffsetNumberNext(stk->offset); - so->s_stack = stk->gs_parent; + so->stack = stk->parent; pfree(stk); - n = gistfindnext(s, p, n, dir); + continue; } + if (po->flags & F_LEAF) { - ItemPointerSet(&(s->currentItemData), BufferGetBlockNumber(b), n); + /* + * We've found a matching index entry in a leaf page, so + * return success. Note that we keep "curbuf" pinned so + * that we can efficiently resume the index scan later. + */ + ItemPointerSet(&(scan->currentItemData), + BufferGetBlockNumber(so->curbuf), n); it = (IndexTuple) PageGetItem(p, PageGetItemId(p, n)); - - s->xs_ctup.t_self = it->t_tid; - - ReleaseBuffer(b); + scan->xs_ctup.t_self = it->t_tid; return true; } else { + /* + * We've found an entry in an internal node whose key is + * consistent with the search key, so continue the search + * in the pointed-to child node (i.e. we search depth + * first). Push the current node onto the stack so we + * resume searching from this node later. 
+ */ + BlockNumber child_block; + stk = (GISTSTACK *) palloc(sizeof(GISTSTACK)); - stk->gs_child = n; - stk->gs_blk = BufferGetBlockNumber(b); - stk->gs_parent = so->s_stack; - so->s_stack = stk; + stk->offset = n; + stk->block = BufferGetBlockNumber(so->curbuf); + stk->parent = so->stack; + so->stack = stk; it = (IndexTuple) PageGetItem(p, PageGetItemId(p, n)); - blk = ItemPointerGetBlockNumber(&(it->t_tid)); + child_block = ItemPointerGetBlockNumber(&(it->t_tid)); - b = ReleaseAndReadBuffer(b, s->indexRelation, blk); - p = BufferGetPage(b); + so->curbuf = ReleaseAndReadBuffer(so->curbuf, scan->indexRelation, + child_block); + p = BufferGetPage(so->curbuf); po = (GISTPageOpaque) PageGetSpecialPointer(p); if (ScanDirectionIsBackward(dir)) @@ -244,19 +228,34 @@ gistnext(IndexScanDesc s, ScanDirection dir) } } -/* Similar to index_keytest, but decompresses the key in the IndexTuple */ +/* + * Similar to index_keytest, but first decompress the key in the + * IndexTuple before passing it to the sk_func (and we have previously + * overwritten the sk_func to use the user-defined Consistent method, + * so we actually invoke that). Note that this function is always + * invoked in a short-lived memory context, so we don't need to worry + * about cleaning up allocated memory (either here or in the + * implementation of any Consistent methods). 
+ */ static bool gistindex_keytest(IndexTuple tuple, - int scanKeySize, - ScanKey key, - GISTSTATE *giststate, - Relation r, - Page p, + IndexScanDesc scan, OffsetNumber offset) { + int keySize = scan->numberOfKeys; + ScanKey key = scan->keyData; + Relation r = scan->indexRelation; + GISTScanOpaque so; + Page p; + GISTSTATE *giststate; + + so = (GISTScanOpaque) scan->opaque; + giststate = so->giststate; + p = BufferGetPage(so->curbuf); + IncrIndexProcessed(); - while (scanKeySize > 0) + while (keySize > 0) { Datum datum; bool isNull; @@ -297,53 +296,57 @@ gistindex_keytest(IndexTuple tuple, Int32GetDatum(key->sk_strategy), ObjectIdGetDatum(key->sk_subtype)); - /* if index datum had to be decompressed, free it */ - if (de.key != datum && !isAttByVal(giststate, key->sk_attno - 1)) - if (DatumGetPointer(de.key) != NULL) - pfree(DatumGetPointer(de.key)); - if (!DatumGetBool(test)) return false; - scanKeySize--; + keySize--; key++; } return true; } - +/* + * Return the offset of the first index entry that is consistent with + * the search key after offset 'n' in the current page. If there are + * no more consistent entries, return InvalidOffsetNumber. 
+ */ static OffsetNumber -gistfindnext(IndexScanDesc s, Page p, OffsetNumber n, ScanDirection dir) +gistfindnext(IndexScanDesc scan, OffsetNumber n, ScanDirection dir) { - OffsetNumber maxoff; - IndexTuple it; - GISTPageOpaque po; - GISTScanOpaque so; - GISTSTATE *giststate; - + OffsetNumber maxoff; + IndexTuple it; + GISTPageOpaque po; + GISTScanOpaque so; + MemoryContext oldcxt; + Page p; + + so = (GISTScanOpaque) scan->opaque; + p = BufferGetPage(so->curbuf); maxoff = PageGetMaxOffsetNumber(p); po = (GISTPageOpaque) PageGetSpecialPointer(p); - so = (GISTScanOpaque) s->opaque; - giststate = so->giststate; + + /* + * Make sure we're in a short-lived memory context when we invoke + * a user-supplied GiST method in gistindex_keytest(), so we don't + * leak memory + */ + oldcxt = MemoryContextSwitchTo(so->tempCxt); /* * If we modified the index during the scan, we may have a pointer to * a ghost tuple, before the scan. If this is the case, back up one. */ - - if (so->s_flags & GS_CURBEFORE) + if (so->flags & GS_CURBEFORE) { - so->s_flags &= ~GS_CURBEFORE; + so->flags &= ~GS_CURBEFORE; n = OffsetNumberPrev(n); } while (n >= FirstOffsetNumber && n <= maxoff) { it = (IndexTuple) PageGetItem(p, PageGetItemId(p, n)); - if (gistindex_keytest(it, - s->numberOfKeys, s->keyData, giststate, - s->indexRelation, p, n)) + if (gistindex_keytest(it, scan, n)) break; if (ScanDirectionIsBackward(dir)) @@ -352,28 +355,16 @@ gistfindnext(IndexScanDesc s, Page p, OffsetNumber n, ScanDirection dir) n = OffsetNumberNext(n); } - return n; -} - -static bool -gistscancache(IndexScanDesc s, ScanDirection dir) -{ - Buffer b; - Page p; - OffsetNumber n; - IndexTuple it; + MemoryContextSwitchTo(oldcxt); + MemoryContextReset(so->tempCxt); - if (!(ScanDirectionIsNoMovement(dir) - && ItemPointerIsValid(&(s->currentItemData)))) - return false; - - b = ReadBuffer(s->indexRelation, - ItemPointerGetBlockNumber(&(s->currentItemData))); - p = BufferGetPage(b); - n = 
ItemPointerGetOffsetNumber(&(s->currentItemData)); - it = (IndexTuple) PageGetItem(p, PageGetItemId(p, n)); - s->xs_ctup.t_self = it->t_tid; - ReleaseBuffer(b); - - return true; + /* + * If we found a matching entry, return its offset; otherwise + * return InvalidOffsetNumber to inform the caller to go to the + * next page. + */ + if (n >= FirstOffsetNumber && n <= maxoff) + return n; + else + return InvalidOffsetNumber; } diff --git a/src/backend/access/gist/gistscan.c b/src/backend/access/gist/gistscan.c index 0746340df4..7b449892f1 100644 --- a/src/backend/access/gist/gistscan.c +++ b/src/backend/access/gist/gistscan.c @@ -8,7 +8,7 @@ * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/access/gist/gistscan.c,v 1.56 2004/12/31 21:59:10 pgsql Exp $ + * $PostgreSQL: pgsql/src/backend/access/gist/gistscan.c,v 1.57 2005/05/17 00:59:30 neilc Exp $ * *------------------------------------------------------------------------- */ @@ -17,28 +17,29 @@ #include "access/genam.h" #include "access/gist.h" #include "access/gistscan.h" +#include "utils/memutils.h" #include "utils/resowner.h" - /* routines defined and used here */ -static void gistregscan(IndexScanDesc s); -static void gistdropscan(IndexScanDesc s); -static void gistadjone(IndexScanDesc s, int op, BlockNumber blkno, +static void gistregscan(IndexScanDesc scan); +static void gistdropscan(IndexScanDesc scan); +static void gistadjone(IndexScanDesc scan, int op, BlockNumber blkno, OffsetNumber offnum); static void adjuststack(GISTSTACK *stk, BlockNumber blkno); -static void adjustiptr(IndexScanDesc s, ItemPointer iptr, +static void adjustiptr(IndexScanDesc scan, ItemPointer iptr, int op, BlockNumber blkno, OffsetNumber offnum); +static void gistfreestack(GISTSTACK *s); /* - * Whenever we start a GiST scan in a backend, we register it in private - * space. 
Then if the GiST index gets updated, we check all registered - * scans and adjust them if the tuple they point at got moved by the - * update. We only need to do this in private space, because when we update - * an GiST we have a write lock on the tree, so no other process can have - * any locks at all on it. A single transaction can have write and read - * locks on the same object, so that's why we need to handle this case. + * Whenever we start a GiST scan in a backend, we register it in + * private space. Then if the GiST index gets updated, we check all + * registered scans and adjust them if the tuple they point at got + * moved by the update. We only need to do this in private space, + * because when we update an GiST we have a write lock on the tree, so + * no other process can have any locks at all on it. A single + * transaction can have write and read locks on the same object, so + * that's why we need to handle this case. */ - typedef struct GISTScanListData { IndexScanDesc gsl_scan; @@ -57,65 +58,77 @@ gistbeginscan(PG_FUNCTION_ARGS) Relation r = (Relation) PG_GETARG_POINTER(0); int nkeys = PG_GETARG_INT32(1); ScanKey key = (ScanKey) PG_GETARG_POINTER(2); - IndexScanDesc s; + IndexScanDesc scan; - s = RelationGetIndexScan(r, nkeys, key); + scan = RelationGetIndexScan(r, nkeys, key); + gistregscan(scan); - gistregscan(s); - - PG_RETURN_POINTER(s); + PG_RETURN_POINTER(scan); } Datum gistrescan(PG_FUNCTION_ARGS) { - IndexScanDesc s = (IndexScanDesc) PG_GETARG_POINTER(0); + IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0); ScanKey key = (ScanKey) PG_GETARG_POINTER(1); - GISTScanOpaque p; + GISTScanOpaque so; int i; /* * Clear all the pointers. 
*/ - ItemPointerSetInvalid(&s->currentItemData); - ItemPointerSetInvalid(&s->currentMarkData); + ItemPointerSetInvalid(&scan->currentItemData); + ItemPointerSetInvalid(&scan->currentMarkData); - p = (GISTScanOpaque) s->opaque; - if (p != NULL) + so = (GISTScanOpaque) scan->opaque; + if (so != NULL) { /* rescan an existing indexscan --- reset state */ - gistfreestack(p->s_stack); - gistfreestack(p->s_markstk); - p->s_stack = p->s_markstk = NULL; - p->s_flags = 0x0; + gistfreestack(so->stack); + gistfreestack(so->markstk); + so->stack = so->markstk = NULL; + so->flags = 0x0; + /* drop pins on buffers -- no locks held */ + if (BufferIsValid(so->curbuf)) + { + ReleaseBuffer(so->curbuf); + so->curbuf = InvalidBuffer; + } + if (BufferIsValid(so->markbuf)) + { + ReleaseBuffer(so->markbuf); + so->markbuf = InvalidBuffer; + } } else { /* initialize opaque data */ - p = (GISTScanOpaque) palloc(sizeof(GISTScanOpaqueData)); - p->s_stack = p->s_markstk = NULL; - p->s_flags = 0x0; - s->opaque = p; - p->giststate = (GISTSTATE *) palloc(sizeof(GISTSTATE)); - initGISTstate(p->giststate, s->indexRelation); + so = (GISTScanOpaque) palloc(sizeof(GISTScanOpaqueData)); + so->stack = so->markstk = NULL; + so->flags = 0x0; + so->tempCxt = createTempGistContext(); + so->curbuf = so->markbuf = InvalidBuffer; + so->giststate = (GISTSTATE *) palloc(sizeof(GISTSTATE)); + initGISTstate(so->giststate, scan->indexRelation); + + scan->opaque = so; } /* Update scan key, if a new one is given */ - if (key && s->numberOfKeys > 0) + if (key && scan->numberOfKeys > 0) { - memmove(s->keyData, - key, - s->numberOfKeys * sizeof(ScanKeyData)); + memmove(scan->keyData, key, + scan->numberOfKeys * sizeof(ScanKeyData)); /* - * Modify the scan key so that the Consistent function is called - * for all comparisons. The original operator is passed to the - * Consistent function in the form of its strategy number, which - * is available from the sk_strategy field, and its subtype from - * the sk_subtype field. 
+ * Modify the scan key so that the Consistent method is + * called for all comparisons. The original operator is passed + * to the Consistent function in the form of its strategy + * number, which is available from the sk_strategy field, and + * its subtype from the sk_subtype field. */ - for (i = 0; i < s->numberOfKeys; i++) - s->keyData[i].sk_func = p->giststate->consistentFn[s->keyData[i].sk_attno - 1]; + for (i = 0; i < scan->numberOfKeys; i++) + scan->keyData[i].sk_func = so->giststate->consistentFn[scan->keyData[i].sk_attno - 1]; } PG_RETURN_VOID(); @@ -124,35 +137,47 @@ gistrescan(PG_FUNCTION_ARGS) Datum gistmarkpos(PG_FUNCTION_ARGS) { - IndexScanDesc s = (IndexScanDesc) PG_GETARG_POINTER(0); - GISTScanOpaque p; + IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0); + GISTScanOpaque so; GISTSTACK *o, *n, *tmp; - s->currentMarkData = s->currentItemData; - p = (GISTScanOpaque) s->opaque; - if (p->s_flags & GS_CURBEFORE) - p->s_flags |= GS_MRKBEFORE; + scan->currentMarkData = scan->currentItemData; + so = (GISTScanOpaque) scan->opaque; + if (so->flags & GS_CURBEFORE) + so->flags |= GS_MRKBEFORE; else - p->s_flags &= ~GS_MRKBEFORE; + so->flags &= ~GS_MRKBEFORE; o = NULL; - n = p->s_stack; + n = so->stack; /* copy the parent stack from the current item data */ while (n != NULL) { tmp = (GISTSTACK *) palloc(sizeof(GISTSTACK)); - tmp->gs_child = n->gs_child; - tmp->gs_blk = n->gs_blk; - tmp->gs_parent = o; + tmp->offset = n->offset; + tmp->block = n->block; + tmp->parent = o; o = tmp; - n = n->gs_parent; + n = n->parent; } - gistfreestack(p->s_markstk); - p->s_markstk = o; + gistfreestack(so->markstk); + so->markstk = o; + + /* Update markbuf: make sure to bump ref count on curbuf */ + if (BufferIsValid(so->markbuf)) + { + ReleaseBuffer(so->markbuf); + so->markbuf = InvalidBuffer; + } + if (BufferIsValid(so->curbuf)) + { + IncrBufferRefCount(so->curbuf); + so->markbuf = so->curbuf; + } PG_RETURN_VOID(); } @@ -160,35 +185,47 @@ 
gistmarkpos(PG_FUNCTION_ARGS) Datum gistrestrpos(PG_FUNCTION_ARGS) { - IndexScanDesc s = (IndexScanDesc) PG_GETARG_POINTER(0); - GISTScanOpaque p; + IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0); + GISTScanOpaque so; GISTSTACK *o, *n, *tmp; - s->currentItemData = s->currentMarkData; - p = (GISTScanOpaque) s->opaque; - if (p->s_flags & GS_MRKBEFORE) - p->s_flags |= GS_CURBEFORE; + scan->currentItemData = scan->currentMarkData; + so = (GISTScanOpaque) scan->opaque; + if (so->flags & GS_MRKBEFORE) + so->flags |= GS_CURBEFORE; else - p->s_flags &= ~GS_CURBEFORE; + so->flags &= ~GS_CURBEFORE; o = NULL; - n = p->s_markstk; + n = so->markstk; /* copy the parent stack from the current item data */ while (n != NULL) { tmp = (GISTSTACK *) palloc(sizeof(GISTSTACK)); - tmp->gs_child = n->gs_child; - tmp->gs_blk = n->gs_blk; - tmp->gs_parent = o; + tmp->offset = n->offset; + tmp->block = n->block; + tmp->parent = o; o = tmp; - n = n->gs_parent; + n = n->parent; } - gistfreestack(p->s_stack); - p->s_stack = o; + gistfreestack(so->stack); + so->stack = o; + + /* Update curbuf: be sure to bump ref count on markbuf */ + if (BufferIsValid(so->curbuf)) + { + ReleaseBuffer(so->curbuf); + so->curbuf = InvalidBuffer; + } + if (BufferIsValid(so->markbuf)) + { + IncrBufferRefCount(so->markbuf); + so->curbuf = so->markbuf; + } PG_RETURN_VOID(); } @@ -196,52 +233,57 @@ gistrestrpos(PG_FUNCTION_ARGS) Datum gistendscan(PG_FUNCTION_ARGS) { - IndexScanDesc s = (IndexScanDesc) PG_GETARG_POINTER(0); - GISTScanOpaque p; + IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0); + GISTScanOpaque so; - p = (GISTScanOpaque) s->opaque; + so = (GISTScanOpaque) scan->opaque; - if (p != NULL) + if (so != NULL) { - gistfreestack(p->s_stack); - gistfreestack(p->s_markstk); - if (p->giststate != NULL) - freeGISTstate(p->giststate); - pfree(s->opaque); + gistfreestack(so->stack); + gistfreestack(so->markstk); + if (so->giststate != NULL) + freeGISTstate(so->giststate); + /* drop pins on buffers 
-- we aren't holding any locks */ + if (BufferIsValid(so->curbuf)) + ReleaseBuffer(so->curbuf); + if (BufferIsValid(so->markbuf)) + ReleaseBuffer(so->markbuf); + MemoryContextDelete(so->tempCxt); + pfree(scan->opaque); } - gistdropscan(s); - /* XXX don't unset read lock -- two-phase locking */ + gistdropscan(scan); PG_RETURN_VOID(); } static void -gistregscan(IndexScanDesc s) +gistregscan(IndexScanDesc scan) { GISTScanList l; l = (GISTScanList) palloc(sizeof(GISTScanListData)); - l->gsl_scan = s; + l->gsl_scan = scan; l->gsl_owner = CurrentResourceOwner; l->gsl_next = GISTScans; GISTScans = l; } static void -gistdropscan(IndexScanDesc s) +gistdropscan(IndexScanDesc scan) { GISTScanList l; GISTScanList prev; prev = NULL; - for (l = GISTScans; l != NULL && l->gsl_scan != s; l = l->gsl_next) + for (l = GISTScans; l != NULL && l->gsl_scan != scan; l = l->gsl_next) prev = l; if (l == NULL) elog(ERROR, "GiST scan list corrupted -- could not find 0x%p", - (void *) s); + (void *) scan); if (prev == NULL) GISTScans = l->gsl_next; @@ -313,22 +355,22 @@ gistadjscans(Relation rel, int op, BlockNumber blkno, OffsetNumber offnum) * update. If so, we make the change here. */ static void -gistadjone(IndexScanDesc s, +gistadjone(IndexScanDesc scan, int op, BlockNumber blkno, OffsetNumber offnum) { GISTScanOpaque so; - adjustiptr(s, &(s->currentItemData), op, blkno, offnum); - adjustiptr(s, &(s->currentMarkData), op, blkno, offnum); + adjustiptr(scan, &(scan->currentItemData), op, blkno, offnum); + adjustiptr(scan, &(scan->currentMarkData), op, blkno, offnum); - so = (GISTScanOpaque) s->opaque; + so = (GISTScanOpaque) scan->opaque; if (op == GISTOP_SPLIT) { - adjuststack(so->s_stack, blkno); - adjuststack(so->s_markstk, blkno); + adjuststack(so->stack, blkno); + adjuststack(so->markstk, blkno); } } @@ -340,7 +382,7 @@ gistadjone(IndexScanDesc s, * the same page. 
*/ static void -adjustiptr(IndexScanDesc s, +adjustiptr(IndexScanDesc scan, ItemPointer iptr, int op, BlockNumber blkno, @@ -354,7 +396,7 @@ adjustiptr(IndexScanDesc s, if (ItemPointerGetBlockNumber(iptr) == blkno) { curoff = ItemPointerGetOffsetNumber(iptr); - so = (GISTScanOpaque) s->opaque; + so = (GISTScanOpaque) scan->opaque; switch (op) { @@ -362,7 +404,6 @@ adjustiptr(IndexScanDesc s, /* back up one if we need to */ if (curoff >= offnum) { - if (curoff > FirstOffsetNumber) { /* just adjust the item pointer */ @@ -375,10 +416,10 @@ adjustiptr(IndexScanDesc s, * tuple */ ItemPointerSet(iptr, blkno, FirstOffsetNumber); - if (iptr == &(s->currentItemData)) - so->s_flags |= GS_CURBEFORE; + if (iptr == &(scan->currentItemData)) + so->flags |= GS_CURBEFORE; else - so->s_flags |= GS_MRKBEFORE; + so->flags |= GS_MRKBEFORE; } } break; @@ -386,10 +427,10 @@ adjustiptr(IndexScanDesc s, case GISTOP_SPLIT: /* back to start of page on split */ ItemPointerSet(iptr, blkno, FirstOffsetNumber); - if (iptr == &(s->currentItemData)) - so->s_flags &= ~GS_CURBEFORE; + if (iptr == &(scan->currentItemData)) + so->flags &= ~GS_CURBEFORE; else - so->s_flags &= ~GS_MRKBEFORE; + so->flags &= ~GS_MRKBEFORE; break; default: @@ -417,9 +458,20 @@ adjuststack(GISTSTACK *stk, BlockNumber blkno) { while (stk != NULL) { - if (stk->gs_blk == blkno) - stk->gs_child = FirstOffsetNumber; + if (stk->block == blkno) + stk->offset = FirstOffsetNumber; + + stk = stk->parent; + } +} - stk = stk->gs_parent; +static void +gistfreestack(GISTSTACK *s) +{ + while (s != NULL) + { + GISTSTACK *p = s->parent; + pfree(s); + s = p; } } diff --git a/src/include/access/gist.h b/src/include/access/gist.h index ee2df86b40..3cca22954c 100644 --- a/src/include/access/gist.h +++ b/src/include/access/gist.h @@ -7,7 +7,7 @@ * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * - * $PostgreSQL: pgsql/src/include/access/gist.h,v 1.44 
2005/03/27 23:53:04 tgl Exp $ + * $PostgreSQL: pgsql/src/include/access/gist.h,v 1.45 2005/05/17 00:59:30 neilc Exp $ * *------------------------------------------------------------------------- */ @@ -54,13 +54,21 @@ typedef GISTPageOpaqueData *GISTPageOpaque; #define GIST_LEAF(entry) (((GISTPageOpaque) PageGetSpecialPointer((entry)->page))->flags & F_LEAF) /* - * When we descend a tree, we keep a stack of parent pointers. + * When we descend a tree, we keep a stack of parent pointers. This + * allows us to follow a chain of internal node pointers until we reach + * a leaf node, and then back up the stack to re-examine the internal + * nodes. + * + * 'parent' is the previous stack entry -- i.e. the node we arrived + * from. 'block' is the node's block number. 'offset' is the offset in + * the node's page that we stopped at (i.e. we followed the child + * pointer located at the specified offset). */ typedef struct GISTSTACK { - struct GISTSTACK *gs_parent; - OffsetNumber gs_child; - BlockNumber gs_blk; + struct GISTSTACK *parent; + OffsetNumber offset; + BlockNumber block; } GISTSTACK; typedef struct GISTSTATE @@ -84,10 +92,13 @@ typedef struct GISTSTATE */ typedef struct GISTScanOpaqueData { - struct GISTSTACK *s_stack; - struct GISTSTACK *s_markstk; - uint16 s_flags; - struct GISTSTATE *giststate; + GISTSTACK *stack; + GISTSTACK *markstk; + uint16 flags; + GISTSTATE *giststate; + MemoryContext tempCxt; + Buffer curbuf; + Buffer markbuf; } GISTScanOpaqueData; typedef GISTScanOpaqueData *GISTScanOpaque; @@ -101,8 +112,8 @@ typedef GISTScanOpaqueData *GISTScanOpaque; #define GS_CURBEFORE ((uint16) (1 << 0)) #define GS_MRKBEFORE ((uint16) (1 << 1)) -/* root page of a gist */ -#define GISTP_ROOT 0 +/* root page of a gist index */ +#define GIST_ROOT_BLKNO 0 /* * When we update a relation on which we're doing a scan, we need to @@ -183,7 +194,6 @@ extern Datum gistbuild(PG_FUNCTION_ARGS); extern Datum gistinsert(PG_FUNCTION_ARGS); extern Datum 
gistbulkdelete(PG_FUNCTION_ARGS); extern void _gistdump(Relation r); -extern void gistfreestack(GISTSTACK *s); extern void initGISTstate(GISTSTATE *giststate, Relation index); extern void freeGISTstate(GISTSTATE *giststate); extern void gistdentryinit(GISTSTATE *giststate, int nkey, GISTENTRY *e, @@ -193,6 +203,7 @@ extern void gistdentryinit(GISTSTATE *giststate, int nkey, GISTENTRY *e, extern void gist_redo(XLogRecPtr lsn, XLogRecord *record); extern void gist_undo(XLogRecPtr lsn, XLogRecord *record); extern void gist_desc(char *buf, uint8 xl_info, char *rec); +extern MemoryContext createTempGistContext(void); /* gistget.c */ extern Datum gistgettuple(PG_FUNCTION_ARGS);