]> granicus.if.org Git - postgresql/commitdiff
Forget to add new file :((
authorTeodor Sigaev <teodor@sigaev.ru>
Wed, 28 Jun 2006 12:08:35 +0000 (12:08 +0000)
committerTeodor Sigaev <teodor@sigaev.ru>
Wed, 28 Jun 2006 12:08:35 +0000 (12:08 +0000)
src/backend/access/gist/gistsplit.c [new file with mode: 0644]

diff --git a/src/backend/access/gist/gistsplit.c b/src/backend/access/gist/gistsplit.c
new file mode 100644 (file)
index 0000000..ce5a69c
--- /dev/null
@@ -0,0 +1,538 @@
+/*-------------------------------------------------------------------------
+ *
+ * gistsplit.c
+ *       Split page algorithm
+ *
+ *
+ * Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *       $PostgreSQL: pgsql/src/backend/access/gist/gistsplit.c,v 1.1 2006/06/28 12:08:35 teodor Exp $
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/genam.h"
+#include "access/gist_private.h"
+#include "access/gistscan.h"
+#include "access/heapam.h"
+#include "catalog/index.h"
+#include "commands/vacuum.h"
+#include "miscadmin.h"
+#include "utils/memutils.h"
+
+typedef struct {
+    Datum   *attr;
+       int     len;
+       OffsetNumber *entries;
+       bool    *isnull;
+       bool        *equiv;
+} GistSplitUnion;
+
+
+/*
+ * Forms unions of subkeys after page split, but
+ * uses only tuples aren't in groups of equalent tuples
+ */
+static void 
+gistunionsubkeyvec(GISTSTATE *giststate,  IndexTuple *itvec, 
+                                                       GistSplitUnion *gsvp, int startkey) {
+       IndexTuple      *cleanedItVec;
+       int                     i, cleanedLen=0;
+
+       cleanedItVec = (IndexTuple*)palloc(sizeof(IndexTuple) * gsvp->len);
+
+       for(i=0;i<gsvp->len;i++) {
+               if ( gsvp->equiv && gsvp->equiv[gsvp->entries[i]])
+                       continue;
+
+               cleanedItVec[cleanedLen++] = itvec[gsvp->entries[i] - 1];
+       }
+
+    gistMakeUnionItVec(giststate, cleanedItVec, cleanedLen, startkey, 
+               gsvp->attr, gsvp->isnull);
+
+       pfree( cleanedItVec );
+}
+
+/*
+ * unions subkeys for after user picksplit over attno-1 column
+ */
+static void
+gistunionsubkey(GISTSTATE *giststate, IndexTuple *itvec, GistSplitVector *spl, int attno)
+{
+       GistSplitUnion  gsvp;
+
+       gsvp.equiv = spl->spl_equiv;
+
+       gsvp.attr = spl->spl_lattr;
+       gsvp.len = spl->splitVector.spl_nleft;
+       gsvp.entries = spl->splitVector.spl_left;
+       gsvp.isnull = spl->spl_lisnull;
+
+       gistunionsubkeyvec(giststate, itvec, &gsvp, attno);
+
+       gsvp.attr = spl->spl_rattr;
+       gsvp.len = spl->splitVector.spl_nright;
+       gsvp.entries = spl->splitVector.spl_right;
+       gsvp.isnull = spl->spl_risnull;
+
+       gistunionsubkeyvec(giststate, itvec, &gsvp, attno);
+}
+
+/*
+ * find group in vector with equivalent value 
+ */
+static int
+gistfindgroup(Relation r, GISTSTATE *giststate, GISTENTRY *valvec, GistSplitVector *spl, int attno)
+{
+       int                     i;
+       GISTENTRY       entry;
+       int len=0;
+
+       /*
+        * attno key is always not null (see gistSplitByKey), so we may not check for
+        * nulls
+        */
+       gistentryinit(entry, spl->splitVector.spl_rdatum, r, NULL, (OffsetNumber) 0, FALSE);
+       for (i = 0; i < spl->splitVector.spl_nleft; i++) {
+               float penalty = gistpenalty(giststate, attno, &entry, false,
+                                                                       &valvec[spl->splitVector.spl_left[i]], false);
+               if ( penalty == 0.0 ) {
+                       spl->spl_equiv[spl->splitVector.spl_left[i]] = true;
+                       len++;
+               }
+       }
+
+       gistentryinit(entry, spl->splitVector.spl_ldatum, r, NULL, (OffsetNumber) 0, FALSE);
+       for (i = 0; i < spl->splitVector.spl_nright; i++) {
+               float penalty = gistpenalty(giststate, attno, &entry, false,
+                                                                       &valvec[spl->splitVector.spl_right[i]], false);
+               if ( penalty == 0.0 ) {
+                       spl->spl_equiv[spl->splitVector.spl_right[i]] = true;
+                       len++;
+               }
+       }
+
+       return len;
+}
+
+static void
+cleanupOffsets( OffsetNumber *a, int *len, bool *equiv, int *LenEquiv ) {
+       int     curlen,i;
+       OffsetNumber    *curwpos;
+
+       curlen = *len;
+       curwpos = a;
+       for (i = 0; i < *len; i++) {
+               if ( equiv[ a[i] ] == FALSE ) {
+                       *curwpos = a[i];
+                       curwpos++;
+               } else {
+                       /* corner case: we shouldn't make void array */
+                       if ( curlen==1 ) {
+                               equiv[ a[i] ] = FALSE; /* mark item as non-equivalent */
+                               i--; /* redo the same */
+                               *LenEquiv -= 1;
+                               continue;
+                       } else
+                               curlen--;
+               }
+       }
+
+       *len = curlen;
+}
+
+static void
+placeOne( Relation r, GISTSTATE *giststate, GistSplitVector *v, IndexTuple itup, OffsetNumber off, int attno ) {
+       GISTENTRY       identry[INDEX_MAX_KEYS];
+       bool            isnull[INDEX_MAX_KEYS];
+       bool    toLeft = true;
+
+       gistDeCompressAtt(giststate, r, itup, NULL, (OffsetNumber) 0, identry, isnull);
+
+       for(;attno<giststate->tupdesc->natts;attno++) {
+               float lpenalty, rpenalty;
+               GISTENTRY       entry;
+
+               gistentryinit(entry, v->spl_lattr[attno], r, NULL, 0,  FALSE);
+               lpenalty = gistpenalty(giststate, attno, &entry, v->spl_lisnull[attno], identry+attno, isnull[ attno ]); 
+               gistentryinit(entry, v->spl_rattr[attno], r, NULL, 0,  FALSE);
+               rpenalty = gistpenalty(giststate, attno, &entry, v->spl_risnull[attno], identry+attno, isnull[ attno ]);
+
+               if ( lpenalty != rpenalty ) {
+                       if ( lpenalty > rpenalty )
+                               toLeft = false;
+                       break;
+               }
+       }
+
+       if ( toLeft ) 
+               v->splitVector.spl_left[ v->splitVector.spl_nleft++ ] = off;
+       else
+               v->splitVector.spl_right[ v->splitVector.spl_nright++ ] = off;
+}
+
+#define SWAPVAR( s, d, t ) \
+do {   \
+       (t) = (s); \
+       (s) = (d); \
+       (d) = (t); \
+} while(0)
+
+/*
+ * adjust left and right unions according to splits by previous 
+ * split by firsts columns. This function is called only in case 
+ * when pickSplit doesn't support subspplit.
+ */
+
+static void
+supportSecondarySplit( Relation r, GISTSTATE *giststate, int attno, GIST_SPLITVEC *sv, Datum oldL, Datum oldR ) {
+       bool    leaveOnLeft = true, tmpBool;
+       GISTENTRY entryL, entryR, entrySL, entrySR;     
+       
+       gistentryinit(entryL, oldL, r, NULL, 0,  FALSE);
+       gistentryinit(entryR, oldR, r, NULL, 0,  FALSE);
+       gistentryinit(entrySL, sv->spl_ldatum , r, NULL, 0,  FALSE);
+       gistentryinit(entrySR, sv->spl_rdatum , r, NULL, 0,  FALSE);
+
+       if ( sv->spl_ldatum_exists && sv->spl_rdatum_exists ) {
+               float penalty1, penalty2;
+
+               penalty1 = gistpenalty(giststate, attno, &entryL, false, &entrySL, false) +
+                                  gistpenalty(giststate, attno, &entryR, false, &entrySR, false);
+               penalty2 = gistpenalty(giststate, attno, &entryL, false, &entrySR, false) +
+                                  gistpenalty(giststate, attno, &entryR, false, &entrySL, false);
+
+               if ( penalty1 > penalty2 )
+                       leaveOnLeft = false;
+
+       } else {
+               GISTENTRY       *entry1 = (sv->spl_ldatum_exists) ? &entryL : &entryR;
+               float penalty1, penalty2;
+
+               /*
+                * there is only one previously defined union,
+                * so we just choose swap or not by lowest penalty
+                */
+
+               penalty1 = gistpenalty(giststate, attno, entry1, false, &entrySL, false);
+               penalty2 = gistpenalty(giststate, attno, entry1, false, &entrySR, false);
+
+               if ( penalty1 < penalty2 ) 
+                       leaveOnLeft = ( sv->spl_ldatum_exists ) ? true : false;
+               else
+                       leaveOnLeft = ( sv->spl_rdatum_exists ) ? true : false;
+       }
+
+       if ( leaveOnLeft == false ) {
+               /*
+                * swap left and right 
+                */
+               OffsetNumber    *off, noff;
+               Datum                   datum;
+
+               SWAPVAR( sv->spl_left, sv->spl_right, off );
+               SWAPVAR( sv->spl_nleft, sv->spl_nright, noff );
+               SWAPVAR( sv->spl_ldatum, sv->spl_rdatum, datum );
+               gistentryinit(entrySL, sv->spl_ldatum , r, NULL, 0,  FALSE);
+               gistentryinit(entrySR, sv->spl_rdatum , r, NULL, 0,  FALSE);
+       }
+
+       if ( sv->spl_ldatum_exists ) 
+               gistMakeUnionKey(giststate, attno, &entryL, false, &entrySL, false,
+                                               &sv->spl_ldatum, &tmpBool);
+
+       if ( sv->spl_rdatum_exists ) 
+               gistMakeUnionKey(giststate, attno, &entryR, false, &entrySR, false,
+                                               &sv->spl_rdatum, &tmpBool);
+
+       sv->spl_ldatum_exists = sv->spl_rdatum_exists = false;
+}
+
+/*
+ * Calls user picksplit method for attno columns to split vector to
+ * two vectors. May use attno+n columns data to
+ * get better split.
+ * Returns TRUE and v->spl_equiv = NULL if left and right unions of attno columns are the same,
+ * so caller may find better split
+ * Returns TRUE and v->spl_equiv != NULL if there is tuples which may be freely moved 
+ */
+static bool
+gistUserPicksplit(Relation r, GistEntryVector *entryvec, int attno, GistSplitVector *v,
+                                 IndexTuple *itup, int len, GISTSTATE *giststate)
+{
+       GIST_SPLITVEC *sv = &v->splitVector;
+       /*
+        * now let the user-defined picksplit function set up the split vector; in
+        * entryvec have no null value!!
+        */
+
+       sv->spl_ldatum_exists = ( v->spl_lisnull[ attno ] ) ? false : true;
+       sv->spl_rdatum_exists = ( v->spl_risnull[ attno ] ) ? false : true;
+       sv->spl_ldatum = v->spl_lattr[attno];
+       sv->spl_rdatum = v->spl_rattr[attno];
+
+       FunctionCall2(&giststate->picksplitFn[attno],
+                                 PointerGetDatum(entryvec),
+                                 PointerGetDatum(sv));
+
+       /* compatibility with old code */
+       if (sv->spl_left[sv->spl_nleft - 1] == InvalidOffsetNumber)
+               sv->spl_left[sv->spl_nleft - 1] = (OffsetNumber) (entryvec->n - 1);
+       if (sv->spl_right[sv->spl_nright - 1] == InvalidOffsetNumber)
+               sv->spl_right[sv->spl_nright - 1] = (OffsetNumber) (entryvec->n - 1);
+
+       if( sv->spl_ldatum_exists || sv->spl_rdatum_exists ) { 
+               elog(LOG,"PickSplit method of %d columns of index '%s' doesn't support secondary split",
+                       attno + 1, RelationGetRelationName(r) );
+
+               supportSecondarySplit( r, giststate, attno, sv, v->spl_lattr[attno], v->spl_rattr[attno] ); 
+       }
+
+       v->spl_lattr[attno] = sv->spl_ldatum;
+       v->spl_rattr[attno] = sv->spl_rdatum;
+       v->spl_lisnull[attno] = false;
+       v->spl_risnull[attno] = false;
+
+       /*
+        * if index is multikey, then we must to try get smaller bounding box for
+        * subkey(s)
+        */
+       v->spl_equiv = NULL;
+
+       if (giststate->tupdesc->natts > 1 && attno+1 != giststate->tupdesc->natts)
+       {
+               if ( gistKeyIsEQ(giststate, attno, sv->spl_ldatum, sv->spl_rdatum) ) {
+                       /*
+                        * Left and right key's unions are equial, so
+                        * we can get better split by following columns. Note,
+                        * unions for attno columns are already done.
+                        */
+
+                       return true;
+               } else {
+                       int                     LenEquiv;
+
+                       v->spl_equiv = (bool *) palloc0(sizeof(bool) * (entryvec->n+1));
+
+                       LenEquiv = gistfindgroup(r, giststate, entryvec->vector, v, attno);
+
+                       /*
+                        * if possible, we should distribute equivalent tuples 
+                        */
+                       if (LenEquiv == 0 ) { 
+                               gistunionsubkey(giststate, itup, v, attno + 1);
+                       } else {
+                               cleanupOffsets( sv->spl_left,  &sv->spl_nleft, v->spl_equiv, &LenEquiv );
+                               cleanupOffsets( sv->spl_right, &sv->spl_nright, v->spl_equiv, &LenEquiv );
+
+                               gistunionsubkey(giststate, itup, v, attno + 1);
+                               if (LenEquiv == 1 ) {
+                                       /*
+                                        * In case with one tuple we just choose left-right 
+                                        * by penalty. It's simplify user-defined pickSplit
+                                        */
+                                       OffsetNumber toMove = InvalidOffsetNumber;
+
+                                       for(toMove=FirstOffsetNumber;toMove<entryvec->n;toMove++) 
+                                               if ( v->spl_equiv[ toMove ] )
+                                                       break;
+                                       Assert( toMove < entryvec->n );
+                       
+                                       placeOne( r, giststate, v, itup[ toMove-1 ], toMove, attno+1 );
+                                       /* redo gistunionsubkey(): it will not degradate performance,
+                                        * because it's very rarely */
+                                       v->spl_equiv = NULL;
+                                       gistunionsubkey(giststate, itup, v, attno + 1);
+
+                                       return false;
+                               } else if ( LenEquiv > 1 )
+                                       return true;
+                       }
+               }
+       }
+
+       return false;
+}
+
+/*
+ * simple split page 
+ */
+static void
+gistSplitHalf(GIST_SPLITVEC *v, int len) {
+       int i;
+
+       v->spl_nright   = v->spl_nleft          = 0;
+       v->spl_left = (OffsetNumber *) palloc(len * sizeof(OffsetNumber));
+       v->spl_right= (OffsetNumber *) palloc(len * sizeof(OffsetNumber));
+       for(i = 1; i <= len; i++)
+               if ( i<len/2 )
+                       v->spl_right[ v->spl_nright++ ] = i;
+               else
+                       v->spl_left[ v->spl_nleft++ ] = i;
+}
+
+/*
+ * if it was invalid tuple then we need special processing.
+ * We move all invalid tuples on right page. 
+ *
+ * if there is no place on left page, gistSplit will be called one more 
+ * time for left page.
+ *
+ * Normally, we never exec this code, but after crash replay it's possible
+ * to get 'invalid' tuples (probability is low enough)
+ */
+static void
+gistSplitByInvalid(GISTSTATE *giststate, GistSplitVector *v, IndexTuple *itup, int len) {
+       int i;
+       static OffsetNumber offInvTuples[ MaxOffsetNumber ];
+       int                      nOffInvTuples = 0;
+
+       for (i = 1; i <= len; i++)
+               if ( GistTupleIsInvalid(itup[i - 1]) )
+                       offInvTuples[ nOffInvTuples++ ] = i;
+
+       if ( nOffInvTuples == len ) {
+               /* corner case, all tuples are invalid */
+               v->spl_rightvalid= v->spl_leftvalid     = false;
+               gistSplitHalf( &v->splitVector, len );
+       } else {
+               GistSplitUnion    gsvp;
+                               
+               v->splitVector.spl_right = offInvTuples;
+               v->splitVector.spl_nright = nOffInvTuples;
+               v->spl_rightvalid = false;
+
+               v->splitVector.spl_left = (OffsetNumber *) palloc(len * sizeof(OffsetNumber));
+               v->splitVector.spl_nleft = 0;
+               for(i = 1; i <= len; i++) 
+                       if ( !GistTupleIsInvalid(itup[i - 1]) )
+                               v->splitVector.spl_left[ v->splitVector.spl_nleft++ ] = i;
+               v->spl_leftvalid = true;
+               
+               gsvp.equiv = NULL;
+               gsvp.attr = v->spl_lattr;
+               gsvp.len = v->splitVector.spl_nleft;
+               gsvp.entries = v->splitVector.spl_left;
+               gsvp.isnull = v->spl_lisnull;
+
+               gistunionsubkeyvec(giststate, itup, &gsvp, 0);
+       }
+}
+
+/*
+ * trys to split page by attno key, in a case of null
+ * values move its to separate page. 
+ */
+void
+gistSplitByKey(Relation r, Page page, IndexTuple *itup, int len, GISTSTATE *giststate, 
+               GistSplitVector *v, GistEntryVector *entryvec, int attno) {
+       int i;
+       static OffsetNumber offNullTuples[ MaxOffsetNumber ];
+       int                      nOffNullTuples = 0;
+
+       for (i = 1; i <= len; i++) {
+               Datum       datum;
+               bool        IsNull;
+
+               if (!GistPageIsLeaf(page) && GistTupleIsInvalid(itup[i - 1])) {
+                       gistSplitByInvalid(giststate, v, itup, len);
+                       return;
+               }
+
+               datum = index_getattr(itup[i - 1], attno+1, giststate->tupdesc, &IsNull);
+               gistdentryinit(giststate, attno, &(entryvec->vector[i]),
+                                          datum, r, page, i,
+                                          FALSE, IsNull);
+               if ( IsNull )
+                       offNullTuples[ nOffNullTuples++ ] = i;
+       }
+
+       v->spl_leftvalid = v->spl_rightvalid = true;
+
+       if ( nOffNullTuples == len ) {
+               /* 
+                * Corner case: All keys in attno column are null, we should try to
+                * split by keys in next column. It all keys in all columns
+                * are NULL just split page half by half
+                */
+               v->spl_risnull[attno] = v->spl_lisnull[attno] = TRUE;
+
+               if ( attno+1 == r->rd_att->natts ) 
+                       gistSplitHalf( &v->splitVector, len );
+               else 
+                       gistSplitByKey(r, page, itup, len, giststate, v, entryvec, attno+1);
+       } else if ( nOffNullTuples > 0 ) {
+               int j=0;
+               
+               /* 
+                * We don't want to mix NULLs and not-NULLs keys
+                * on one page, so move nulls to right page
+                */
+               v->splitVector.spl_right = offNullTuples;
+               v->splitVector.spl_nright = nOffNullTuples;
+               v->spl_risnull[attno] = TRUE;
+
+               v->splitVector.spl_left = (OffsetNumber *) palloc(len * sizeof(OffsetNumber));
+               v->splitVector.spl_nleft = 0;
+               for(i = 1; i <= len; i++) 
+                       if ( j<v->splitVector.spl_nright && offNullTuples[j] == i ) 
+                               j++;
+                       else
+                               v->splitVector.spl_left[ v->splitVector.spl_nleft++ ] = i;
+
+               v->spl_equiv = NULL;
+               gistunionsubkey(giststate, itup, v, attno);
+       } else {
+               /*
+                * all keys are not-null
+                */
+               entryvec->n = len+1;
+
+               if ( gistUserPicksplit(r, entryvec, attno, v, itup, len, giststate) && attno+1 != r->rd_att->natts ) {
+                       /*
+                        * Splitting on attno column is not optimized: there is a tuples which can be freely
+                        * left or right page, we will try to split page by 
+                        * following columns
+                        */
+                       if ( v->spl_equiv == NULL ) {
+                               /* simple case: left and right keys for attno column are equial */
+                               gistSplitByKey(r, page, itup, len, giststate, v, entryvec, attno+1);
+                       } else {
+                               /* we should clean up vector from already distributed tuples */
+                               IndexTuple      *newitup =  (IndexTuple*)palloc((len + 1) * sizeof(IndexTuple));
+                               OffsetNumber    *map = (OffsetNumber*)palloc((len + 1) * sizeof(IndexTuple));
+                               int newlen = 0;
+                               GIST_SPLITVEC backupSplit = v->splitVector;
+
+                               for(i=0; i<len; i++) 
+                                       if ( v->spl_equiv[i+1] ) {
+                                               map[ newlen ] = i+1;
+                                               newitup[ newlen++ ] = itup[i];
+                                       }
+
+                               Assert( newlen>0 );
+
+                               backupSplit.spl_left = (OffsetNumber*)palloc(sizeof(OffsetNumber)*len);
+                               memcpy( backupSplit.spl_left, v->splitVector.spl_left, sizeof(OffsetNumber)*v->splitVector.spl_nleft);
+                               backupSplit.spl_right = (OffsetNumber*)palloc(sizeof(OffsetNumber)*len);
+                               memcpy( backupSplit.spl_right, v->splitVector.spl_right, sizeof(OffsetNumber)*v->splitVector.spl_nright);
+
+                               gistSplitByKey(r, page, newitup, newlen, giststate, v, entryvec, attno+1);
+
+                               /* merge result of subsplit */
+                               for(i=0;i<v->splitVector.spl_nleft;i++)
+                                       backupSplit.spl_left[ backupSplit.spl_nleft++ ] = map[ v->splitVector.spl_left[i]-1 ];
+                               for(i=0;i<v->splitVector.spl_nright;i++)
+                                       backupSplit.spl_right[ backupSplit.spl_nright++ ] = map[ v->splitVector.spl_right[i]-1 ];
+
+                               v->splitVector = backupSplit;
+                               /* reunion left and right datums */
+                               gistunionsubkey(giststate, itup, v, attno);
+                       }
+               } 
+       }
+}