]> granicus.if.org Git - postgresql/blob - src/backend/access/gist/gistsplit.c
a96b881cf1ddbcac273da23bb0cb7e0ffb90b7b1
[postgresql] / src / backend / access / gist / gistsplit.c
1 /*-------------------------------------------------------------------------
2  *
3  * gistsplit.c
4  *        Split page algorithm
5  *
6  *
7  * Portions Copyright (c) 1996-2013, PostgreSQL Global Development Group
8  * Portions Copyright (c) 1994, Regents of the University of California
9  *
10  * IDENTIFICATION
11  *        src/backend/access/gist/gistsplit.c
12  *
13  *-------------------------------------------------------------------------
14  */
15 #include "postgres.h"
16
17 #include "access/gist_private.h"
18 #include "utils/rel.h"
19
20 typedef struct
21 {
22         Datum      *attr;
23         int                     len;
24         OffsetNumber *entries;
25         bool       *isnull;
26         bool       *equiv;
27 } GistSplitUnion;
28
29
30 /*
31  * Forms unions of subkeys after page split, but
32  * uses only tuples that aren't in groups of equivalent tuples
33  */
34 static void
35 gistunionsubkeyvec(GISTSTATE *giststate, IndexTuple *itvec,
36                                    GistSplitUnion *gsvp, int startkey)
37 {
38         IndexTuple *cleanedItVec;
39         int                     i,
40                                 cleanedLen = 0;
41
42         cleanedItVec = (IndexTuple *) palloc(sizeof(IndexTuple) * gsvp->len);
43
44         for (i = 0; i < gsvp->len; i++)
45         {
46                 if (gsvp->equiv && gsvp->equiv[gsvp->entries[i]])
47                         continue;
48
49                 cleanedItVec[cleanedLen++] = itvec[gsvp->entries[i] - 1];
50         }
51
52         gistMakeUnionItVec(giststate, cleanedItVec, cleanedLen, startkey,
53                                            gsvp->attr, gsvp->isnull);
54
55         pfree(cleanedItVec);
56 }
57
58 /*
59  * unions subkeys for after user picksplit over attno-1 column
60  */
61 static void
62 gistunionsubkey(GISTSTATE *giststate, IndexTuple *itvec, GistSplitVector *spl, int attno)
63 {
64         GistSplitUnion gsvp;
65
66         gsvp.equiv = spl->spl_equiv;
67
68         gsvp.attr = spl->spl_lattr;
69         gsvp.len = spl->splitVector.spl_nleft;
70         gsvp.entries = spl->splitVector.spl_left;
71         gsvp.isnull = spl->spl_lisnull;
72
73         gistunionsubkeyvec(giststate, itvec, &gsvp, attno);
74
75         gsvp.attr = spl->spl_rattr;
76         gsvp.len = spl->splitVector.spl_nright;
77         gsvp.entries = spl->splitVector.spl_right;
78         gsvp.isnull = spl->spl_risnull;
79
80         gistunionsubkeyvec(giststate, itvec, &gsvp, attno);
81 }
82
83 /*
84  * find group in vector with equivalent value
85  */
86 static int
87 gistfindgroup(Relation r, GISTSTATE *giststate, GISTENTRY *valvec, GistSplitVector *spl, int attno)
88 {
89         int                     i;
90         GISTENTRY       entry;
91         int                     len = 0;
92
93         /*
94          * attno key is always not null (see gistSplitByKey), so we may not check
95          * for nulls
96          */
97         gistentryinit(entry, spl->splitVector.spl_rdatum, r, NULL, (OffsetNumber) 0, FALSE);
98         for (i = 0; i < spl->splitVector.spl_nleft; i++)
99         {
100                 float           penalty = gistpenalty(giststate, attno, &entry, false,
101                                                            &valvec[spl->splitVector.spl_left[i]], false);
102
103                 if (penalty == 0.0)
104                 {
105                         spl->spl_equiv[spl->splitVector.spl_left[i]] = true;
106                         len++;
107                 }
108         }
109
110         gistentryinit(entry, spl->splitVector.spl_ldatum, r, NULL, (OffsetNumber) 0, FALSE);
111         for (i = 0; i < spl->splitVector.spl_nright; i++)
112         {
113                 float           penalty = gistpenalty(giststate, attno, &entry, false,
114                                                           &valvec[spl->splitVector.spl_right[i]], false);
115
116                 if (penalty == 0.0)
117                 {
118                         spl->spl_equiv[spl->splitVector.spl_right[i]] = true;
119                         len++;
120                 }
121         }
122
123         return len;
124 }
125
126 static void
127 cleanupOffsets(OffsetNumber *a, int *len, bool *equiv, int *LenEquiv)
128 {
129         int                     curlen,
130                                 i;
131         OffsetNumber *curwpos;
132
133         curlen = *len;
134         curwpos = a;
135         for (i = 0; i < *len; i++)
136         {
137                 if (equiv[a[i]] == FALSE)
138                 {
139                         *curwpos = a[i];
140                         curwpos++;
141                 }
142                 else
143                 {
144                         /* corner case: we shouldn't make void array */
145                         if (curlen == 1)
146                         {
147                                 equiv[a[i]] = FALSE;    /* mark item as non-equivalent */
148                                 i--;                    /* redo the same */
149                                 *LenEquiv -= 1;
150                                 continue;
151                         }
152                         else
153                                 curlen--;
154                 }
155         }
156
157         *len = curlen;
158 }
159
160 static void
161 placeOne(Relation r, GISTSTATE *giststate, GistSplitVector *v, IndexTuple itup, OffsetNumber off, int attno)
162 {
163         GISTENTRY       identry[INDEX_MAX_KEYS];
164         bool            isnull[INDEX_MAX_KEYS];
165         bool            toLeft = true;
166
167         gistDeCompressAtt(giststate, r, itup, NULL, (OffsetNumber) 0, identry, isnull);
168
169         for (; attno < giststate->tupdesc->natts; attno++)
170         {
171                 float           lpenalty,
172                                         rpenalty;
173                 GISTENTRY       entry;
174
175                 gistentryinit(entry, v->spl_lattr[attno], r, NULL, 0, FALSE);
176                 lpenalty = gistpenalty(giststate, attno, &entry, v->spl_lisnull[attno], identry + attno, isnull[attno]);
177                 gistentryinit(entry, v->spl_rattr[attno], r, NULL, 0, FALSE);
178                 rpenalty = gistpenalty(giststate, attno, &entry, v->spl_risnull[attno], identry + attno, isnull[attno]);
179
180                 if (lpenalty != rpenalty)
181                 {
182                         if (lpenalty > rpenalty)
183                                 toLeft = false;
184                         break;
185                 }
186         }
187
188         if (toLeft)
189                 v->splitVector.spl_left[v->splitVector.spl_nleft++] = off;
190         else
191                 v->splitVector.spl_right[v->splitVector.spl_nright++] = off;
192 }
193
194 #define SWAPVAR( s, d, t ) \
195 do {    \
196         (t) = (s); \
197         (s) = (d); \
198         (d) = (t); \
199 } while(0)
200
201 /*
202  * adjust left and right unions according to splits by previous
203  * split by first columns. This function is called only in case
204  * when pickSplit doesn't support subsplit.
205  */
206
207 static void
208 supportSecondarySplit(Relation r, GISTSTATE *giststate, int attno, GIST_SPLITVEC *sv, Datum oldL, Datum oldR)
209 {
210         bool            leaveOnLeft = true,
211                                 tmpBool;
212         GISTENTRY       entryL,
213                                 entryR,
214                                 entrySL,
215                                 entrySR;
216
217         gistentryinit(entryL, oldL, r, NULL, 0, FALSE);
218         gistentryinit(entryR, oldR, r, NULL, 0, FALSE);
219         gistentryinit(entrySL, sv->spl_ldatum, r, NULL, 0, FALSE);
220         gistentryinit(entrySR, sv->spl_rdatum, r, NULL, 0, FALSE);
221
222         if (sv->spl_ldatum_exists && sv->spl_rdatum_exists)
223         {
224                 float           penalty1,
225                                         penalty2;
226
227                 penalty1 = gistpenalty(giststate, attno, &entryL, false, &entrySL, false) +
228                         gistpenalty(giststate, attno, &entryR, false, &entrySR, false);
229                 penalty2 = gistpenalty(giststate, attno, &entryL, false, &entrySR, false) +
230                         gistpenalty(giststate, attno, &entryR, false, &entrySL, false);
231
232                 if (penalty1 > penalty2)
233                         leaveOnLeft = false;
234
235         }
236         else
237         {
238                 GISTENTRY  *entry1 = (sv->spl_ldatum_exists) ? &entryL : &entryR;
239                 float           penalty1,
240                                         penalty2;
241
242                 /*
243                  * there is only one previously defined union, so we just choose swap
244                  * or not by lowest penalty
245                  */
246
247                 penalty1 = gistpenalty(giststate, attno, entry1, false, &entrySL, false);
248                 penalty2 = gistpenalty(giststate, attno, entry1, false, &entrySR, false);
249
250                 if (penalty1 < penalty2)
251                         leaveOnLeft = (sv->spl_ldatum_exists) ? true : false;
252                 else
253                         leaveOnLeft = (sv->spl_rdatum_exists) ? true : false;
254         }
255
256         if (leaveOnLeft == false)
257         {
258                 /*
259                  * swap left and right
260                  */
261                 OffsetNumber *off,
262                                         noff;
263                 Datum           datum;
264
265                 SWAPVAR(sv->spl_left, sv->spl_right, off);
266                 SWAPVAR(sv->spl_nleft, sv->spl_nright, noff);
267                 SWAPVAR(sv->spl_ldatum, sv->spl_rdatum, datum);
268                 gistentryinit(entrySL, sv->spl_ldatum, r, NULL, 0, FALSE);
269                 gistentryinit(entrySR, sv->spl_rdatum, r, NULL, 0, FALSE);
270         }
271
272         if (sv->spl_ldatum_exists)
273                 gistMakeUnionKey(giststate, attno, &entryL, false, &entrySL, false,
274                                                  &sv->spl_ldatum, &tmpBool);
275
276         if (sv->spl_rdatum_exists)
277                 gistMakeUnionKey(giststate, attno, &entryR, false, &entrySR, false,
278                                                  &sv->spl_rdatum, &tmpBool);
279
280         sv->spl_ldatum_exists = sv->spl_rdatum_exists = false;
281 }
282
283 /*
284  * Trivial picksplit implementaion. Function called only
285  * if user-defined picksplit puts all keys to the one page.
286  * That is a bug of user-defined picksplit but we'd like
287  * to "fix" that.
288  */
289 static void
290 genericPickSplit(GISTSTATE *giststate, GistEntryVector *entryvec, GIST_SPLITVEC *v, int attno)
291 {
292         OffsetNumber i,
293                                 maxoff;
294         int                     nbytes;
295         GistEntryVector *evec;
296
297         maxoff = entryvec->n - 1;
298
299         nbytes = (maxoff + 2) * sizeof(OffsetNumber);
300
301         v->spl_left = (OffsetNumber *) palloc(nbytes);
302         v->spl_right = (OffsetNumber *) palloc(nbytes);
303         v->spl_nleft = v->spl_nright = 0;
304
305         for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
306         {
307                 if (i <= (maxoff - FirstOffsetNumber + 1) / 2)
308                 {
309                         v->spl_left[v->spl_nleft] = i;
310                         v->spl_nleft++;
311                 }
312                 else
313                 {
314                         v->spl_right[v->spl_nright] = i;
315                         v->spl_nright++;
316                 }
317         }
318
319         /*
320          * Form unions of each page
321          */
322
323         evec = palloc(sizeof(GISTENTRY) * entryvec->n + GEVHDRSZ);
324
325         evec->n = v->spl_nleft;
326         memcpy(evec->vector, entryvec->vector + FirstOffsetNumber,
327                    sizeof(GISTENTRY) * evec->n);
328         v->spl_ldatum = FunctionCall2Coll(&giststate->unionFn[attno],
329                                                                           giststate->supportCollation[attno],
330                                                                           PointerGetDatum(evec),
331                                                                           PointerGetDatum(&nbytes));
332
333         evec->n = v->spl_nright;
334         memcpy(evec->vector, entryvec->vector + FirstOffsetNumber + v->spl_nleft,
335                    sizeof(GISTENTRY) * evec->n);
336         v->spl_rdatum = FunctionCall2Coll(&giststate->unionFn[attno],
337                                                                           giststate->supportCollation[attno],
338                                                                           PointerGetDatum(evec),
339                                                                           PointerGetDatum(&nbytes));
340 }
341
342 /*
343  * Calls user picksplit method for attno columns to split vector to
344  * two vectors. May use attno+n columns data to
345  * get better split.
346  * Returns TRUE and v->spl_equiv = NULL if left and right unions of attno columns are the same,
347  * so caller may find better split
348  * Returns TRUE and v->spl_equiv != NULL if there is tuples which may be freely moved
349  */
350 static bool
351 gistUserPicksplit(Relation r, GistEntryVector *entryvec, int attno, GistSplitVector *v,
352                                   IndexTuple *itup, int len, GISTSTATE *giststate)
353 {
354         GIST_SPLITVEC *sv = &v->splitVector;
355
356         /*
357          * now let the user-defined picksplit function set up the split vector; in
358          * entryvec there is no null value!!
359          */
360
361         sv->spl_ldatum_exists = (v->spl_lisnull[attno]) ? false : true;
362         sv->spl_rdatum_exists = (v->spl_risnull[attno]) ? false : true;
363         sv->spl_ldatum = v->spl_lattr[attno];
364         sv->spl_rdatum = v->spl_rattr[attno];
365
366         FunctionCall2Coll(&giststate->picksplitFn[attno],
367                                           giststate->supportCollation[attno],
368                                           PointerGetDatum(entryvec),
369                                           PointerGetDatum(sv));
370
371         if (sv->spl_nleft == 0 || sv->spl_nright == 0)
372         {
373                 ereport(DEBUG1,
374                                 (errcode(ERRCODE_INTERNAL_ERROR),
375                           errmsg("picksplit method for column %d of index \"%s\" failed",
376                                          attno + 1, RelationGetRelationName(r)),
377                                  errhint("The index is not optimal. To optimize it, contact a developer, or try to use the column as the second one in the CREATE INDEX command.")));
378
379                 /*
380                  * Reinit GIST_SPLITVEC. Although that fields are not used by
381                  * genericPickSplit(), let us set up it for further processing
382                  */
383                 sv->spl_ldatum_exists = (v->spl_lisnull[attno]) ? false : true;
384                 sv->spl_rdatum_exists = (v->spl_risnull[attno]) ? false : true;
385                 sv->spl_ldatum = v->spl_lattr[attno];
386                 sv->spl_rdatum = v->spl_rattr[attno];
387
388                 genericPickSplit(giststate, entryvec, sv, attno);
389
390                 if (sv->spl_ldatum_exists || sv->spl_rdatum_exists)
391                         supportSecondarySplit(r, giststate, attno, sv, v->spl_lattr[attno], v->spl_rattr[attno]);
392         }
393         else
394         {
395                 /* compatibility with old code */
396                 if (sv->spl_left[sv->spl_nleft - 1] == InvalidOffsetNumber)
397                         sv->spl_left[sv->spl_nleft - 1] = (OffsetNumber) (entryvec->n - 1);
398                 if (sv->spl_right[sv->spl_nright - 1] == InvalidOffsetNumber)
399                         sv->spl_right[sv->spl_nright - 1] = (OffsetNumber) (entryvec->n - 1);
400
401                 if (sv->spl_ldatum_exists || sv->spl_rdatum_exists)
402                 {
403                         elog(LOG, "picksplit method for column %d of index \"%s\" doesn't support secondary split",
404                                  attno + 1, RelationGetRelationName(r));
405
406                         supportSecondarySplit(r, giststate, attno, sv, v->spl_lattr[attno], v->spl_rattr[attno]);
407                 }
408         }
409
410         v->spl_lattr[attno] = sv->spl_ldatum;
411         v->spl_rattr[attno] = sv->spl_rdatum;
412         v->spl_lisnull[attno] = false;
413         v->spl_risnull[attno] = false;
414
415         /*
416          * if index is multikey, then we must to try get smaller bounding box for
417          * subkey(s)
418          */
419         v->spl_equiv = NULL;
420
421         if (giststate->tupdesc->natts > 1 && attno + 1 != giststate->tupdesc->natts)
422         {
423                 if (gistKeyIsEQ(giststate, attno, sv->spl_ldatum, sv->spl_rdatum))
424                 {
425                         /*
426                          * Left and right key's unions are equial, so we can get better
427                          * split by following columns. Note, unions for attno columns are
428                          * already done.
429                          */
430
431                         return true;
432                 }
433                 else
434                 {
435                         int                     LenEquiv;
436
437                         v->spl_equiv = (bool *) palloc0(sizeof(bool) * (entryvec->n + 1));
438
439                         LenEquiv = gistfindgroup(r, giststate, entryvec->vector, v, attno);
440
441                         /*
442                          * if possible, we should distribute equivalent tuples
443                          */
444                         if (LenEquiv == 0)
445                         {
446                                 gistunionsubkey(giststate, itup, v, attno + 1);
447                         }
448                         else
449                         {
450                                 cleanupOffsets(sv->spl_left, &sv->spl_nleft, v->spl_equiv, &LenEquiv);
451                                 cleanupOffsets(sv->spl_right, &sv->spl_nright, v->spl_equiv, &LenEquiv);
452
453                                 gistunionsubkey(giststate, itup, v, attno + 1);
454                                 if (LenEquiv == 1)
455                                 {
456                                         /*
457                                          * In case with one tuple we just choose left-right by
458                                          * penalty. It's simplify user-defined pickSplit
459                                          */
460                                         OffsetNumber toMove = InvalidOffsetNumber;
461
462                                         for (toMove = FirstOffsetNumber; toMove < entryvec->n; toMove++)
463                                                 if (v->spl_equiv[toMove])
464                                                         break;
465                                         Assert(toMove < entryvec->n);
466
467                                         placeOne(r, giststate, v, itup[toMove - 1], toMove, attno + 1);
468
469                                         /*
470                                          * redo gistunionsubkey(): it will not degradate
471                                          * performance, because it's very rarely
472                                          */
473                                         v->spl_equiv = NULL;
474                                         gistunionsubkey(giststate, itup, v, attno + 1);
475
476                                         return false;
477                                 }
478                                 else if (LenEquiv > 1)
479                                         return true;
480                         }
481                 }
482         }
483
484         return false;
485 }
486
487 /*
488  * simple split page
489  */
490 static void
491 gistSplitHalf(GIST_SPLITVEC *v, int len)
492 {
493         int                     i;
494
495         v->spl_nright = v->spl_nleft = 0;
496         v->spl_left = (OffsetNumber *) palloc(len * sizeof(OffsetNumber));
497         v->spl_right = (OffsetNumber *) palloc(len * sizeof(OffsetNumber));
498         for (i = 1; i <= len; i++)
499                 if (i < len / 2)
500                         v->spl_right[v->spl_nright++] = i;
501                 else
502                         v->spl_left[v->spl_nleft++] = i;
503 }
504
505 /*
506  * tries to split page by attno key, in case of null
507  * values move those to separate page.
508  */
509 void
510 gistSplitByKey(Relation r, Page page, IndexTuple *itup, int len, GISTSTATE *giststate,
511                            GistSplitVector *v, GistEntryVector *entryvec, int attno)
512 {
513         int                     i;
514         static OffsetNumber offNullTuples[MaxOffsetNumber];
515         int                     nOffNullTuples = 0;
516
517         for (i = 1; i <= len; i++)
518         {
519                 Datum           datum;
520                 bool            IsNull;
521
522                 datum = index_getattr(itup[i - 1], attno + 1, giststate->tupdesc, &IsNull);
523                 gistdentryinit(giststate, attno, &(entryvec->vector[i]),
524                                            datum, r, page, i,
525                                            FALSE, IsNull);
526                 if (IsNull)
527                         offNullTuples[nOffNullTuples++] = i;
528         }
529
530         if (nOffNullTuples == len)
531         {
532                 /*
533                  * Corner case: All keys in attno column are null, we should try to
534                  * split by keys in next column. If all keys in all columns are NULL
535                  * just split page half by half
536                  */
537                 v->spl_risnull[attno] = v->spl_lisnull[attno] = TRUE;
538
539                 if (attno + 1 == r->rd_att->natts)
540                         gistSplitHalf(&v->splitVector, len);
541                 else
542                         gistSplitByKey(r, page, itup, len, giststate, v, entryvec, attno + 1);
543         }
544         else if (nOffNullTuples > 0)
545         {
546                 int                     j = 0;
547
548                 /*
549                  * We don't want to mix NULLs and not-NULLs keys on one page, so move
550                  * nulls to right page
551                  */
552                 v->splitVector.spl_right = offNullTuples;
553                 v->splitVector.spl_nright = nOffNullTuples;
554                 v->spl_risnull[attno] = TRUE;
555
556                 v->splitVector.spl_left = (OffsetNumber *) palloc(len * sizeof(OffsetNumber));
557                 v->splitVector.spl_nleft = 0;
558                 for (i = 1; i <= len; i++)
559                         if (j < v->splitVector.spl_nright && offNullTuples[j] == i)
560                                 j++;
561                         else
562                                 v->splitVector.spl_left[v->splitVector.spl_nleft++] = i;
563
564                 v->spl_equiv = NULL;
565                 gistunionsubkey(giststate, itup, v, attno);
566         }
567         else
568         {
569                 /*
570                  * all keys are not-null
571                  */
572                 entryvec->n = len + 1;
573
574                 if (gistUserPicksplit(r, entryvec, attno, v, itup, len, giststate) && attno + 1 != r->rd_att->natts)
575                 {
576                         /*
577                          * Splitting on attno column is not optimized: there is a tuples
578                          * which can be freely left or right page, we will try to split
579                          * page by following columns
580                          */
581                         if (v->spl_equiv == NULL)
582                         {
583                                 /*
584                                  * simple case: left and right keys for attno column are equal
585                                  */
586                                 gistSplitByKey(r, page, itup, len, giststate, v, entryvec, attno + 1);
587                         }
588                         else
589                         {
590                                 /* we should clean up vector from already distributed tuples */
591                                 IndexTuple *newitup = (IndexTuple *) palloc((len + 1) * sizeof(IndexTuple));
592                                 OffsetNumber *map = (OffsetNumber *) palloc((len + 1) * sizeof(IndexTuple));
593                                 int                     newlen = 0;
594                                 GIST_SPLITVEC backupSplit = v->splitVector;
595
596                                 for (i = 0; i < len; i++)
597                                         if (v->spl_equiv[i + 1])
598                                         {
599                                                 map[newlen] = i + 1;
600                                                 newitup[newlen++] = itup[i];
601                                         }
602
603                                 Assert(newlen > 0);
604
605                                 backupSplit.spl_left = (OffsetNumber *) palloc(sizeof(OffsetNumber) * len);
606                                 memcpy(backupSplit.spl_left, v->splitVector.spl_left, sizeof(OffsetNumber) * v->splitVector.spl_nleft);
607                                 backupSplit.spl_right = (OffsetNumber *) palloc(sizeof(OffsetNumber) * len);
608                                 memcpy(backupSplit.spl_right, v->splitVector.spl_right, sizeof(OffsetNumber) * v->splitVector.spl_nright);
609
610                                 gistSplitByKey(r, page, newitup, newlen, giststate, v, entryvec, attno + 1);
611
612                                 /* merge result of subsplit */
613                                 for (i = 0; i < v->splitVector.spl_nleft; i++)
614                                         backupSplit.spl_left[backupSplit.spl_nleft++] = map[v->splitVector.spl_left[i] - 1];
615                                 for (i = 0; i < v->splitVector.spl_nright; i++)
616                                         backupSplit.spl_right[backupSplit.spl_nright++] = map[v->splitVector.spl_right[i] - 1];
617
618                                 v->splitVector = backupSplit;
619                                 /* reunion left and right datums */
620                                 gistunionsubkey(giststate, itup, v, attno);
621                         }
622                 }
623         }
624 }