* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/access/gist/gist.c,v 1.135 2006/05/17 16:34:59 teodor Exp $
+ * $PostgreSQL: pgsql/src/backend/access/gist/gist.c,v 1.136 2006/05/19 16:15:17 teodor Exp $
*
*-------------------------------------------------------------------------
*/
gistxlogInsertCompletion(state->r->rd_node, &(state->key), 1);
}
-static void
-gistToRealOffset(OffsetNumber *arr, int len, OffsetNumber *reasloffset)
-{
- int i;
-
- for (i = 0; i < len; i++)
- arr[i] = reasloffset[arr[i]];
-}
-
-static IndexTupleData *
-gistfillitupvec(IndexTuple *vec, int veclen, int *memlen) {
- char *ptr, *ret = palloc(BLCKSZ);
- int i;
-
- ptr = ret;
- for (i = 0; i < veclen; i++) {
- memcpy(ptr, vec[i], IndexTupleSize(vec[i]));
- ptr += IndexTupleSize(vec[i]);
- }
-
- *memlen = ptr - ret;
- Assert( *memlen < BLCKSZ );
- return (IndexTupleData*)ret;
-}
-
/*
* gistSplit -- split a page in the tree.
*/
*rvectup;
GIST_SPLITVEC v;
GistEntryVector *entryvec;
- int i,
- fakeoffset;
- OffsetNumber *realoffset;
- IndexTuple *cleaneditup = itup;
- int lencleaneditup = len;
+ int i;
+ OffsetNumber offInvTuples[ MaxOffsetNumber ];
+ int nOffInvTuples = 0;
SplitedPageLayout *res = NULL;
/* generate the item array */
- realoffset = palloc((len + 1) * sizeof(OffsetNumber));
entryvec = palloc(GEVHDRSZ + (len + 1) * sizeof(GISTENTRY));
entryvec->n = len + 1;
- fakeoffset = FirstOffsetNumber;
for (i = 1; i <= len; i++)
{
Datum datum;
bool IsNull;
if (!GistPageIsLeaf(page) && GistTupleIsInvalid(itup[i - 1]))
- {
- entryvec->n--;
/* remember position of invalid tuple */
- realoffset[entryvec->n] = i;
- continue;
- }
+ offInvTuples[ nOffInvTuples++ ] = i;
+
+ if ( nOffInvTuples > 0 )
+ /* we can safely do not decompress other keys, because
+ we will do splecial processing, but
+ it's needed to find another invalid tuples */
+ continue;
datum = index_getattr(itup[i - 1], 1, giststate->tupdesc, &IsNull);
- gistdentryinit(giststate, 0, &(entryvec->vector[fakeoffset]),
+ gistdentryinit(giststate, 0, &(entryvec->vector[i]),
datum, r, page, i,
ATTSIZE(datum, giststate->tupdesc, 1, IsNull),
FALSE, IsNull);
- realoffset[fakeoffset] = i;
- fakeoffset++;
}
/*
- * if it was invalid tuple then we need special processing. If it's
- * possible, we move all invalid tuples on right page. We should remember,
- * that union with invalid tuples is a invalid tuple.
+ * if it was invalid tuple then we need special processing.
+ * We move all invalid tuples on right page.
+ *
+ * if there is no place on left page, gistSplit will be called one more
+ * time for left page.
+ *
+ * Normally, we never exec this code, but after crash replay it's possible
+ * to get 'invalid' tuples (probability is low enough)
*/
- if (entryvec->n != len + 1)
+ if (nOffInvTuples > 0)
{
- lencleaneditup = entryvec->n - 1;
- cleaneditup = (IndexTuple *) palloc(lencleaneditup * sizeof(IndexTuple));
- for (i = 1; i < entryvec->n; i++)
- cleaneditup[i - 1] = itup[realoffset[i] - 1];
-
- if (!gistfitpage(cleaneditup, lencleaneditup))
- {
- /* no space on left to put all good tuples, so picksplit */
- gistUserPicksplit(r, entryvec, &v, cleaneditup, lencleaneditup, giststate);
- v.spl_leftvalid = true;
- v.spl_rightvalid = false;
- gistToRealOffset(v.spl_left, v.spl_nleft, realoffset);
- gistToRealOffset(v.spl_right, v.spl_nright, realoffset);
- }
- else
- {
- /* we can try to store all valid tuples on one page */
- v.spl_right = (OffsetNumber *) palloc(entryvec->n * sizeof(OffsetNumber));
- v.spl_left = (OffsetNumber *) palloc(entryvec->n * sizeof(OffsetNumber));
-
- if (lencleaneditup == 0)
- {
- /* all tuples are invalid, so moves half of its to right */
- v.spl_leftvalid = v.spl_rightvalid = false;
- v.spl_nright = 0;
- v.spl_nleft = 0;
- for (i = 1; i <= len; i++)
- if (i - 1 < len / 2)
- v.spl_left[v.spl_nleft++] = i;
- else
- v.spl_right[v.spl_nright++] = i;
- }
- else
- {
- /*
- * we will not call gistUserPicksplit, just put good tuples on
- * left and invalid on right
- */
- v.spl_nleft = lencleaneditup;
- v.spl_nright = 0;
- for (i = 1; i < entryvec->n; i++)
- v.spl_left[i - 1] = i;
- gistToRealOffset(v.spl_left, v.spl_nleft, realoffset);
- v.spl_lattr[0] = v.spl_ldatum = (Datum) 0;
- v.spl_rattr[0] = v.spl_rdatum = (Datum) 0;
- v.spl_lisnull[0] = true;
- v.spl_risnull[0] = true;
- gistunionsubkey(r, giststate, itup, &v, true);
- v.spl_leftvalid = true;
- v.spl_rightvalid = false;
- }
- }
+ GistSplitVec gsvp;
+
+ v.spl_right = offInvTuples;
+ v.spl_nright = nOffInvTuples;
+ v.spl_rightvalid = false;
+
+ v.spl_left = (OffsetNumber *) palloc(entryvec->n * sizeof(OffsetNumber));
+ v.spl_nleft = 0;
+ for(i = 1; i <= len; i++)
+ if ( !GistTupleIsInvalid(itup[i - 1]) )
+ v.spl_left[ v.spl_nleft++ ] = i;
+ v.spl_leftvalid = true;
+
+ gsvp.idgrp = NULL;
+ gsvp.attrsize = v.spl_lattrsize;
+ gsvp.attr = v.spl_lattr;
+ gsvp.len = v.spl_nleft;
+ gsvp.entries = v.spl_left;
+ gsvp.isnull = v.spl_lisnull;
+
+ gistunionsubkeyvec(giststate, itup, &gsvp, true);
}
else
{
for (i = 0; i < v.spl_nright; i++)
rvectup[i] = itup[v.spl_right[i] - 1];
- /* place invalid tuples on right page if itsn't done yet */
- for (fakeoffset = entryvec->n; fakeoffset < len + 1 && lencleaneditup; fakeoffset++)
- {
- rvectup[v.spl_nright++] = itup[realoffset[fakeoffset] - 1];
- }
-
/* finalyze splitting (may need another split) */
if (!gistfitpage(rvectup, v.spl_nright))
{
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/access/gist/gistutil.c,v 1.12 2006/05/17 16:34:59 teodor Exp $
+ * $PostgreSQL: pgsql/src/backend/access/gist/gistutil.c,v 1.13 2006/05/19 16:15:17 teodor Exp $
*-------------------------------------------------------------------------
*/
#include "postgres.h"
return itvec;
}
+/*
+ * make plain IndexTupleVector
+ */
+
+IndexTupleData *
+gistfillitupvec(IndexTuple *vec, int veclen, int *memlen) {
+ char *ptr, *ret;
+ int i;
+
+ *memlen=0;
+
+ for (i = 0; i < veclen; i++)
+ *memlen += IndexTupleSize(vec[i]);
+
+ ptr = ret = palloc(*memlen);
+
+ for (i = 0; i < veclen; i++) {
+ memcpy(ptr, vec[i], IndexTupleSize(vec[i]));
+ ptr += IndexTupleSize(vec[i]);
+ }
+
+ return (IndexTupleData*)ret;
+}
+
/*
* Return an IndexTuple containing the result of applying the "union"
* method to the specified IndexTuple vector.
return newtup;
}
-void
-gistunionsubkey(Relation r, GISTSTATE *giststate, IndexTuple *itvec, GIST_SPLITVEC *spl, bool isall)
-{
- int lr;
+void
+gistunionsubkeyvec(GISTSTATE *giststate, IndexTuple *itvec,
+ GistSplitVec *gsvp, bool isall) {
+ int i;
+ GistEntryVector *evec;
- for (lr = 0; lr < 2; lr++)
- {
- OffsetNumber *entries;
- int i;
- Datum *attr;
- int len,
- *attrsize;
- bool *isnull;
- GistEntryVector *evec;
-
- if (lr)
- {
- attrsize = spl->spl_lattrsize;
- attr = spl->spl_lattr;
- len = spl->spl_nleft;
- entries = spl->spl_left;
- isnull = spl->spl_lisnull;
- }
- else
- {
- attrsize = spl->spl_rattrsize;
- attr = spl->spl_rattr;
- len = spl->spl_nright;
- entries = spl->spl_right;
- isnull = spl->spl_risnull;
- }
+ evec = palloc(((gsvp->len < 2) ? 2 : gsvp->len) * sizeof(GISTENTRY) + GEVHDRSZ);
- evec = palloc(((len < 2) ? 2 : len) * sizeof(GISTENTRY) + GEVHDRSZ);
+ for (i = (isall) ? 0 : 1; i < giststate->tupdesc->natts; i++)
+ {
+ int j;
+ Datum datum;
+ int datumsize;
+ int real_len;
- for (i = (isall) ? 0 : 1; i < r->rd_att->natts; i++)
+ real_len = 0;
+ for (j = 0; j < gsvp->len; j++)
{
- int j;
- Datum datum;
- int datumsize;
- int real_len;
+ bool IsNull;
- real_len = 0;
- for (j = 0; j < len; j++)
- {
- bool IsNull;
+ if ( gsvp->idgrp && gsvp->idgrp[gsvp->entries[j]])
+ continue;
- if (spl->spl_idgrp[entries[j]])
- continue;
- datum = index_getattr(itvec[entries[j] - 1], i + 1,
+ datum = index_getattr(itvec[gsvp->entries[j] - 1], i + 1,
giststate->tupdesc, &IsNull);
- if (IsNull)
- continue;
- gistdentryinit(giststate, i,
- &(evec->vector[real_len]),
- datum,
- NULL, NULL, (OffsetNumber) 0,
+ if (IsNull)
+ continue;
+ gistdentryinit(giststate, i,
+ &(evec->vector[real_len]),
+ datum,
+ NULL, NULL, (OffsetNumber) 0,
ATTSIZE(datum, giststate->tupdesc, i + 1, IsNull),
- FALSE, IsNull);
- real_len++;
+ FALSE, IsNull);
+ real_len++;
- }
+ }
- if (real_len == 0)
+ if (real_len == 0)
+ {
+ datum = (Datum) 0;
+ datumsize = 0;
+ gsvp->isnull[i] = true;
+ }
+ else
+ {
+ /*
+ * evec->vector[0].bytes may be not defined, so form union
+ * with itself
+ */
+ if (real_len == 1)
{
- datum = (Datum) 0;
- datumsize = 0;
- isnull[i] = true;
+ evec->n = 2;
+ memcpy(&(evec->vector[1]), &(evec->vector[0]),
+ sizeof(GISTENTRY));
}
else
- {
- /*
- * evec->vector[0].bytes may be not defined, so form union
- * with itself
- */
- if (real_len == 1)
- {
- evec->n = 2;
- memcpy(&(evec->vector[1]), &(evec->vector[0]),
- sizeof(GISTENTRY));
- }
- else
- evec->n = real_len;
- datum = FunctionCall2(&giststate->unionFn[i],
- PointerGetDatum(evec),
- PointerGetDatum(&datumsize));
- isnull[i] = false;
- }
-
- attr[i] = datum;
- attrsize[i] = datumsize;
+ evec->n = real_len;
+ datum = FunctionCall2(&giststate->unionFn[i],
+ PointerGetDatum(evec),
+ PointerGetDatum(&datumsize));
+ gsvp->isnull[i] = false;
}
+
+ gsvp->attr[i] = datum;
+ gsvp->attrsize[i] = datumsize;
}
}
+/*
+ * unions subkey for after user picksplit over first column
+ */
+static void
+gistunionsubkey(GISTSTATE *giststate, IndexTuple *itvec, GIST_SPLITVEC *spl)
+{
+ GistSplitVec gsvp;
+
+ gsvp.idgrp = spl->spl_idgrp;
+
+ gsvp.attrsize = spl->spl_lattrsize;
+ gsvp.attr = spl->spl_lattr;
+ gsvp.len = spl->spl_nleft;
+ gsvp.entries = spl->spl_left;
+ gsvp.isnull = spl->spl_lisnull;
+
+ gistunionsubkeyvec(giststate, itvec, &gsvp, false);
+
+ gsvp.attrsize = spl->spl_rattrsize;
+ gsvp.attr = spl->spl_rattr;
+ gsvp.len = spl->spl_nright;
+ gsvp.entries = spl->spl_right;
+ gsvp.isnull = spl->spl_risnull;
+
+ gistunionsubkeyvec(giststate, itvec, &gsvp, false);
+}
+
/*
* find group in vector with equal value
*/
* if index is multikey, then we must to try get smaller bounding box for
* subkey(s)
*/
- if (r->rd_att->natts > 1)
+ if (giststate->tupdesc->natts > 1)
{
int MaxGrpId;
MaxGrpId = gistfindgroup(giststate, entryvec->vector, v);
/* form union of sub keys for each page (l,p) */
- gistunionsubkey(r, giststate, itup, v, false);
+ gistunionsubkey(giststate, itup, v);
/*
* if possible, we insert equivalent tuples with control by penalty
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/access/gist/gistxlog.c,v 1.18 2006/05/19 11:10:25 teodor Exp $
+ * $PostgreSQL: pgsql/src/backend/access/gist/gistxlog.c,v 1.19 2006/05/19 16:15:17 teodor Exp $
*-------------------------------------------------------------------------
*/
#include "postgres.h"
while( nbuffers-- > 0 ) {
Page page = BufferGetPage( buffers[ nbuffers ] );
- IndexTuple idxtup;
- OffsetNumber i;
- char *ptr;
+ IndexTuple* vec;
+ int veclen;
resptr = (SplitedPageLayout*)palloc0( sizeof(SplitedPageLayout) );
resptr->block.blkno = BufferGetBlockNumber( buffers[ nbuffers ] );
resptr->block.num = PageGetMaxOffsetNumber( page );
- for(i=FirstOffsetNumber; i<= PageGetMaxOffsetNumber( page ); i++) {
- idxtup = (IndexTuple) PageGetItem(page, PageGetItemId(page, i));
- resptr->lenlist += IndexTupleSize(idxtup);
- }
-
- resptr->list = (IndexTupleData*)palloc( resptr->lenlist );
- ptr = (char*)(resptr->list);
-
- for(i=FirstOffsetNumber; i<= PageGetMaxOffsetNumber( page ); i++) {
- idxtup = (IndexTuple) PageGetItem(page, PageGetItemId(page, i));
- memcpy( ptr, idxtup, IndexTupleSize(idxtup) );
- ptr += IndexTupleSize(idxtup);
- }
+ vec = gistextractpage( page, &veclen );
+ resptr->list = gistfillitupvec( vec, veclen, &(resptr->lenlist) );
resptr->next = res;
res = resptr;
* Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
- * $PostgreSQL: pgsql/src/include/access/gist_private.h,v 1.14 2006/05/17 16:34:59 teodor Exp $
+ * $PostgreSQL: pgsql/src/include/access/gist_private.h,v 1.15 2006/05/19 16:15:17 teodor Exp $
*
*-------------------------------------------------------------------------
*/
extern IndexTuple *gistjoinvector(
IndexTuple *itvec, int *len,
IndexTuple *additvec, int addlen);
+extern IndexTupleData* gistfillitupvec(IndexTuple *vec, int veclen, int *memlen);
+
extern IndexTuple gistunion(Relation r, IndexTuple *itvec,
int len, GISTSTATE *giststate);
extern IndexTuple gistgetadjusted(Relation r,
extern void gistDeCompressAtt(GISTSTATE *giststate, Relation r,
IndexTuple tuple, Page p, OffsetNumber o,
GISTENTRY *attdata, bool *isnull);
-extern void gistunionsubkey(Relation r, GISTSTATE *giststate,
- IndexTuple *itvec, GIST_SPLITVEC *spl, bool isall);
+
+typedef struct {
+ int *attrsize;
+ Datum *attr;
+ int len;
+ OffsetNumber *entries;
+ bool *isnull;
+ int *idgrp;
+} GistSplitVec;
+
+extern void gistunionsubkeyvec(GISTSTATE *giststate,
+ IndexTuple *itvec, GistSplitVec *gsvp, bool isall);
+
extern void GISTInitBuffer(Buffer b, uint32 f);
extern void gistdentryinit(GISTSTATE *giststate, int nkey, GISTENTRY *e,
Datum k, Relation r, Page pg, OffsetNumber o,