]> granicus.if.org Git - postgresql/blob - src/backend/access/spgist/spgtextproc.c
Add SP-GiST (space-partitioned GiST) index access method.
[postgresql] / src / backend / access / spgist / spgtextproc.c
1 /*-------------------------------------------------------------------------
2  *
3  * spgtextproc.c
4  *        implementation of compressed-suffix tree over text
5  *
6  *
7  * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
8  * Portions Copyright (c) 1994, Regents of the University of California
9  *
10  * IDENTIFICATION
11  *                      src/backend/access/spgist/spgtextproc.c
12  *
13  *-------------------------------------------------------------------------
14  */
15 #include "postgres.h"
16
17 #include "access/spgist.h"
18 #include "catalog/pg_type.h"
19 #include "mb/pg_wchar.h"
20 #include "utils/builtins.h"
21 #include "utils/datum.h"
22 #include "utils/pg_locale.h"
23
24
25 /*
26  * In the worst case, an inner tuple in a text suffix tree could have as many
27  * as 256 nodes (one for each possible byte value).  Each node can take 16
28  * bytes on MAXALIGN=8 machines.  The inner tuple must fit on an index page
29  * of size BLCKSZ.  Rather than assuming we know the exact amount of overhead
30  * imposed by page headers, tuple headers, etc, we leave 100 bytes for that
31  * (the actual overhead should be no more than 56 bytes at this writing, so
32  * there is slop in this number).  The upshot is that the maximum safe prefix
33  * length is this:
34  */
35 #define SPGIST_MAX_PREFIX_LENGTH        (BLCKSZ - 256 * 16 - 100)
36
37 /* Struct for sorting values in picksplit */
38 typedef struct spgNodePtr
39 {
40         Datum           d;
41         int                     i;
42         uint8           c;
43 } spgNodePtr;
44
45
46 Datum
47 spg_text_config(PG_FUNCTION_ARGS)
48 {
49         /* spgConfigIn *cfgin = (spgConfigIn *) PG_GETARG_POINTER(0); */
50         spgConfigOut *cfg = (spgConfigOut *) PG_GETARG_POINTER(1);
51
52         cfg->prefixType = TEXTOID;
53         cfg->labelType = CHAROID;
54         cfg->longValuesOK = true;       /* suffixing will shorten long values */
55         PG_RETURN_VOID();
56 }
57
58 /*
59  * Form a text datum from the given not-necessarily-null-terminated string,
60  * using short varlena header format if possible
61  */
62 static Datum
63 formTextDatum(const char *data, int datalen)
64 {
65         char       *p;
66
67         p = (char *) palloc(datalen + VARHDRSZ);
68
69         if (datalen + VARHDRSZ_SHORT <= VARATT_SHORT_MAX)
70         {
71                 SET_VARSIZE_SHORT(p, datalen + VARHDRSZ_SHORT);
72                 if (datalen)
73                         memcpy(p + VARHDRSZ_SHORT, data, datalen);
74         }
75         else
76         {
77                 SET_VARSIZE(p, datalen + VARHDRSZ);
78                 memcpy(p + VARHDRSZ, data, datalen);
79         }
80
81         return PointerGetDatum(p);
82 }
83
/*
 * Return the number of leading bytes that a and b have in common.
 *
 * At most Min(lena, lenb) bytes are examined; if either length is zero or
 * negative, nothing is read and the result is zero.
 */
static int
commonPrefix(const char *a, const char *b, int lena, int lenb)
{
        int                     limit = (lena < lenb) ? lena : lenb;
        int                     n;

        for (n = 0; n < limit; n++)
        {
                if (a[n] != b[n])
                        break;
        }

        return n;
}
101
102 /*
103  * Binary search an array of uint8 datums for a match to c
104  *
105  * On success, *i gets the match location; on failure, it gets where to insert
106  */
107 static bool
108 searchChar(Datum *nodeLabels, int nNodes, uint8 c, int *i)
109 {
110         int                     StopLow = 0,
111                                 StopHigh = nNodes;
112
113         while (StopLow < StopHigh)
114         {
115                 int                     StopMiddle = (StopLow + StopHigh) >> 1;
116                 uint8           middle = DatumGetUInt8(nodeLabels[StopMiddle]);
117
118                 if (c < middle)
119                         StopHigh = StopMiddle;
120                 else if (c > middle)
121                         StopLow = StopMiddle + 1;
122                 else
123                 {
124                         *i = StopMiddle;
125                         return true;
126                 }
127         }
128
129         *i = StopHigh;
130         return false;
131 }
132
133 Datum
134 spg_text_choose(PG_FUNCTION_ARGS)
135 {
136         spgChooseIn *in = (spgChooseIn *) PG_GETARG_POINTER(0);
137         spgChooseOut *out = (spgChooseOut *) PG_GETARG_POINTER(1);
138         text       *inText = DatumGetTextPP(in->datum);
139         char       *inStr = VARDATA_ANY(inText);
140         int                     inSize = VARSIZE_ANY_EXHDR(inText);
141         uint8           nodeChar = '\0';
142         int                     i = 0;
143         int                     commonLen = 0;
144
145         /* Check for prefix match, set nodeChar to first byte after prefix */
146         if (in->hasPrefix)
147         {
148                 text       *prefixText = DatumGetTextPP(in->prefixDatum);
149                 char       *prefixStr = VARDATA_ANY(prefixText);
150                 int                     prefixSize = VARSIZE_ANY_EXHDR(prefixText);
151
152                 commonLen = commonPrefix(inStr + in->level,
153                                                                  prefixStr,
154                                                                  inSize - in->level,
155                                                                  prefixSize);
156
157                 if (commonLen == prefixSize)
158                 {
159                         if (inSize - in->level > commonLen)
160                                 nodeChar = *(uint8 *) (inStr + in->level + commonLen);
161                         else
162                                 nodeChar = '\0';
163                 }
164                 else
165                 {
166                         /* Must split tuple because incoming value doesn't match prefix */
167                         out->resultType = spgSplitTuple;
168
169                         if (commonLen == 0)
170                         {
171                                 out->result.splitTuple.prefixHasPrefix = false;
172                         }
173                         else
174                         {
175                                 out->result.splitTuple.prefixHasPrefix = true;
176                                 out->result.splitTuple.prefixPrefixDatum =
177                                         formTextDatum(prefixStr, commonLen);
178                         }
179                         out->result.splitTuple.nodeLabel =
180                                 UInt8GetDatum(*(prefixStr + commonLen));
181
182                         if (prefixSize - commonLen == 1)
183                         {
184                                 out->result.splitTuple.postfixHasPrefix = false;
185                         }
186                         else
187                         {
188                                 out->result.splitTuple.postfixHasPrefix = true;
189                                 out->result.splitTuple.postfixPrefixDatum =
190                                         formTextDatum(prefixStr + commonLen + 1,
191                                                                   prefixSize - commonLen - 1);
192                         }
193
194                         PG_RETURN_VOID();
195                 }
196         }
197         else if (inSize > in->level)
198         {
199                 nodeChar = *(uint8 *) (inStr + in->level);
200         }
201         else
202         {
203                 nodeChar = '\0';
204         }
205
206         /* Look up nodeChar in the node label array */
207         if (searchChar(in->nodeLabels, in->nNodes, nodeChar, &i))
208         {
209                 /*
210                  * Descend to existing node.  (If in->allTheSame, the core code will
211                  * ignore our nodeN specification here, but that's OK.  We still
212                  * have to provide the correct levelAdd and restDatum values, and
213                  * those are the same regardless of which node gets chosen by core.)
214                  */
215                 out->resultType = spgMatchNode;
216                 out->result.matchNode.nodeN = i;
217                 out->result.matchNode.levelAdd = commonLen + 1;
218                 if (inSize - in->level - commonLen - 1 > 0)
219                         out->result.matchNode.restDatum =
220                                 formTextDatum(inStr + in->level + commonLen + 1,
221                                                           inSize - in->level - commonLen - 1);
222                 else
223                         out->result.matchNode.restDatum =
224                                 formTextDatum(NULL, 0);
225         }
226         else if (in->allTheSame)
227         {
228                 /*
229                  * Can't use AddNode action, so split the tuple.  The upper tuple
230                  * has the same prefix as before and uses an empty node label for
231                  * the lower tuple.  The lower tuple has no prefix and the same
232                  * node labels as the original tuple.
233                  */
234                 out->resultType = spgSplitTuple;
235                 out->result.splitTuple.prefixHasPrefix = in->hasPrefix;
236                 out->result.splitTuple.prefixPrefixDatum = in->prefixDatum;
237                 out->result.splitTuple.nodeLabel = UInt8GetDatum('\0');
238                 out->result.splitTuple.postfixHasPrefix = false;
239         }
240         else
241         {
242                 /* Add a node for the not-previously-seen nodeChar value */
243                 out->resultType = spgAddNode;
244                 out->result.addNode.nodeLabel = UInt8GetDatum(nodeChar);
245                 out->result.addNode.nodeN = i;
246         }
247
248         PG_RETURN_VOID();
249 }
250
251 /* qsort comparator to sort spgNodePtr structs by "c" */
252 static int
253 cmpNodePtr(const void *a, const void *b)
254 {
255         const spgNodePtr *aa = (const spgNodePtr *) a;
256         const spgNodePtr *bb = (const spgNodePtr *) b;
257
258         if (aa->c == bb->c)
259                 return 0;
260         else if (aa->c > bb->c)
261                 return 1;
262         else
263                 return -1;
264 }
265
266 Datum
267 spg_text_picksplit(PG_FUNCTION_ARGS)
268 {
269         spgPickSplitIn *in = (spgPickSplitIn *) PG_GETARG_POINTER(0);
270         spgPickSplitOut *out = (spgPickSplitOut *) PG_GETARG_POINTER(1);
271         text       *text0 = DatumGetTextPP(in->datums[0]);
272         int                     i,
273                                 commonLen;
274         spgNodePtr *nodes;
275
276         /* Identify longest common prefix, if any */
277         commonLen = VARSIZE_ANY_EXHDR(text0);
278         for (i = 1; i < in->nTuples && commonLen > 0; i++)
279         {
280                 text       *texti = DatumGetTextPP(in->datums[i]);
281                 int                     tmp = commonPrefix(VARDATA_ANY(text0),
282                                                                            VARDATA_ANY(texti),
283                                                                            VARSIZE_ANY_EXHDR(text0),
284                                                                            VARSIZE_ANY_EXHDR(texti));
285
286                 if (tmp < commonLen)
287                         commonLen = tmp;
288         }
289
290         /*
291          * Limit the prefix length, if necessary, to ensure that the resulting
292          * inner tuple will fit on a page.
293          */
294         commonLen = Min(commonLen, SPGIST_MAX_PREFIX_LENGTH);
295
296         /* Set node prefix to be that string, if it's not empty */
297         if (commonLen == 0)
298         {
299                 out->hasPrefix = false;
300         }
301         else
302         {
303                 out->hasPrefix = true;
304                 out->prefixDatum = formTextDatum(VARDATA_ANY(text0), commonLen);
305         }
306
307         /* Extract the node label (first non-common byte) from each value */
308         nodes = (spgNodePtr *) palloc(sizeof(spgNodePtr) * in->nTuples);
309
310         for (i = 0; i < in->nTuples; i++)
311         {
312                 text       *texti = DatumGetTextPP(in->datums[i]);
313
314                 if (commonLen < VARSIZE_ANY_EXHDR(texti))
315                         nodes[i].c = *(uint8 *) (VARDATA_ANY(texti) + commonLen);
316                 else
317                         nodes[i].c = '\0';                      /* use \0 if string is all common */
318                 nodes[i].i = i;
319                 nodes[i].d = in->datums[i];
320         }
321
322         /*
323          * Sort by label bytes so that we can group the values into nodes.  This
324          * also ensures that the nodes are ordered by label value, allowing the
325          * use of binary search in searchChar.
326          */
327         qsort(nodes, in->nTuples, sizeof(*nodes), cmpNodePtr);
328
329         /* And emit results */
330         out->nNodes = 0;
331         out->nodeLabels = (Datum *) palloc(sizeof(Datum) * in->nTuples);
332         out->mapTuplesToNodes = (int *) palloc(sizeof(int) * in->nTuples);
333         out->leafTupleDatums = (Datum *) palloc(sizeof(Datum) * in->nTuples);
334
335         for (i = 0; i < in->nTuples; i++)
336         {
337                 text       *texti = DatumGetTextPP(nodes[i].d);
338                 Datum           leafD;
339
340                 if (i == 0 || nodes[i].c != nodes[i - 1].c)
341                 {
342                         out->nodeLabels[out->nNodes] = UInt8GetDatum(nodes[i].c);
343                         out->nNodes++;
344                 }
345
346                 if (commonLen < VARSIZE_ANY_EXHDR(texti))
347                         leafD = formTextDatum(VARDATA_ANY(texti) + commonLen + 1,
348                                                                   VARSIZE_ANY_EXHDR(texti) - commonLen - 1);
349                 else
350                         leafD = formTextDatum(NULL, 0);
351
352                 out->leafTupleDatums[nodes[i].i] = leafD;
353                 out->mapTuplesToNodes[nodes[i].i] = out->nNodes - 1;
354         }
355
356         PG_RETURN_VOID();
357 }
358
359 Datum
360 spg_text_inner_consistent(PG_FUNCTION_ARGS)
361 {
362         spgInnerConsistentIn *in = (spgInnerConsistentIn *) PG_GETARG_POINTER(0);
363         spgInnerConsistentOut *out = (spgInnerConsistentOut *) PG_GETARG_POINTER(1);
364         StrategyNumber strategy = in->strategy;
365         text       *inText;
366         int                     inSize;
367         int                     i;
368         text       *reconstrText = NULL;
369         int                     maxReconstrLen = 0;
370         text       *prefixText = NULL;
371         int                     prefixSize = 0;
372
373         /*
374          * If it's a collation-aware operator, but the collation is C, we can
375          * treat it as non-collation-aware.
376          */
377         if (strategy > 10 &&
378                 lc_collate_is_c(PG_GET_COLLATION()))
379                 strategy -= 10;
380
381         inText = DatumGetTextPP(in->query);
382         inSize = VARSIZE_ANY_EXHDR(inText);
383
384         /*
385          * Reconstruct values represented at this tuple, including parent data,
386          * prefix of this tuple if any, and the node label if any.  in->level
387          * should be the length of the previously reconstructed value, and the
388          * number of bytes added here is prefixSize or prefixSize + 1.
389          *
390          * Note: we assume that in->reconstructedValue isn't toasted and doesn't
391          * have a short varlena header.  This is okay because it must have been
392          * created by a previous invocation of this routine, and we always emit
393          * long-format reconstructed values.
394          */
395         Assert(in->level == 0 ? DatumGetPointer(in->reconstructedValue) == NULL :
396                    VARSIZE_ANY_EXHDR(DatumGetPointer(in->reconstructedValue)) == in->level);
397
398         maxReconstrLen = in->level + 1;
399         if (in->hasPrefix)
400         {
401                 prefixText = DatumGetTextPP(in->prefixDatum);
402                 prefixSize = VARSIZE_ANY_EXHDR(prefixText);
403                 maxReconstrLen += prefixSize;
404         }
405
406         reconstrText = palloc(VARHDRSZ + maxReconstrLen);
407         SET_VARSIZE(reconstrText, VARHDRSZ + maxReconstrLen);
408
409         if (in->level)
410                 memcpy(VARDATA(reconstrText),
411                            VARDATA(DatumGetPointer(in->reconstructedValue)),
412                            in->level);
413         if (prefixSize)
414                 memcpy(((char *) VARDATA(reconstrText)) + in->level,
415                            VARDATA_ANY(prefixText),
416                            prefixSize);
417         /* last byte of reconstrText will be filled in below */
418
419         /*
420          * Scan the child nodes.  For each one, complete the reconstructed value
421          * and see if it's consistent with the query.  If so, emit an entry into
422          * the output arrays.
423          */
424         out->nodeNumbers = (int *) palloc(sizeof(int) * in->nNodes);
425         out->levelAdds = (int *) palloc(sizeof(int) * in->nNodes);
426         out->reconstructedValues = (Datum *) palloc(sizeof(Datum) * in->nNodes);
427         out->nNodes = 0;
428
429         for (i = 0; i < in->nNodes; i++)
430         {
431                 uint8           nodeChar = DatumGetUInt8(in->nodeLabels[i]);
432                 int                     thisLen;
433                 int                     r;
434                 bool            res = false;
435
436                 /* If nodeChar is zero, don't include it in data */
437                 if (nodeChar == '\0')
438                         thisLen = maxReconstrLen - 1;
439                 else
440                 {
441                         ((char *) VARDATA(reconstrText))[maxReconstrLen - 1] = nodeChar;
442                         thisLen = maxReconstrLen;
443                 }
444
445                 r = memcmp(VARDATA(reconstrText), VARDATA_ANY(inText),
446                                    Min(inSize, thisLen));
447
448                 switch (strategy)
449                 {
450                         case BTLessStrategyNumber:
451                         case BTLessEqualStrategyNumber:
452                                 if (r <= 0)
453                                         res = true;
454                                 break;
455                         case BTEqualStrategyNumber:
456                                 if (r == 0 && inSize >= thisLen)
457                                         res = true;
458                                 break;
459                         case BTGreaterEqualStrategyNumber:
460                         case BTGreaterStrategyNumber:
461                                 if (r >= 0)
462                                         res = true;
463                                 break;
464                         case BTLessStrategyNumber + 10:
465                         case BTLessEqualStrategyNumber + 10:
466                         case BTGreaterEqualStrategyNumber + 10:
467                         case BTGreaterStrategyNumber + 10:
468                                 /*
469                                  * with non-C collation we need to traverse whole tree :-(
470                                  */
471                                 res = true;
472                                 break;
473                         default:
474                                 elog(ERROR, "unrecognized strategy number: %d",
475                                          in->strategy);
476                                 break;
477                 }
478
479                 if (res)
480                 {
481                         out->nodeNumbers[out->nNodes] = i;
482                         out->levelAdds[out->nNodes] = thisLen - in->level;
483                         SET_VARSIZE(reconstrText, VARHDRSZ + thisLen);
484                         out->reconstructedValues[out->nNodes] =
485                                 datumCopy(PointerGetDatum(reconstrText), false, -1);
486                         out->nNodes++;
487                 }
488         }
489
490         PG_RETURN_VOID();
491 }
492
493 Datum
494 spg_text_leaf_consistent(PG_FUNCTION_ARGS)
495 {
496         spgLeafConsistentIn *in = (spgLeafConsistentIn *) PG_GETARG_POINTER(0);
497         spgLeafConsistentOut *out = (spgLeafConsistentOut *) PG_GETARG_POINTER(1);
498         StrategyNumber strategy = in->strategy;
499         text       *query = DatumGetTextPP(in->query);
500         int                     level = in->level;
501         text       *leafValue,
502                            *reconstrValue = NULL;
503         char       *fullValue;
504         int                     fullLen;
505         int                     queryLen;
506         int                     r;
507         bool            res;
508
509         /* all tests are exact */
510         out->recheck = false;
511
512         leafValue = DatumGetTextPP(in->leafDatum);
513
514         if (DatumGetPointer(in->reconstructedValue))
515                 reconstrValue = DatumGetTextP(in->reconstructedValue);
516
517         Assert(level == 0 ? reconstrValue == NULL :
518                    VARSIZE_ANY_EXHDR(reconstrValue) == level);
519
520         fullLen = level + VARSIZE_ANY_EXHDR(leafValue);
521
522         queryLen = VARSIZE_ANY_EXHDR(query);
523
524         /* For equality, we needn't reconstruct fullValue if not same length */
525         if (strategy == BTEqualStrategyNumber && queryLen != fullLen)
526                 PG_RETURN_BOOL(false);
527
528         /* Else, reconstruct the full string represented by this leaf tuple */
529         if (VARSIZE_ANY_EXHDR(leafValue) == 0 && level > 0)
530         {
531                 fullValue = VARDATA(reconstrValue);
532         }
533         else
534         {
535                 fullValue = palloc(fullLen);
536                 if (level)
537                         memcpy(fullValue, VARDATA(reconstrValue), level);
538                 if (VARSIZE_ANY_EXHDR(leafValue) > 0)
539                         memcpy(fullValue + level, VARDATA_ANY(leafValue),
540                                    VARSIZE_ANY_EXHDR(leafValue));
541         }
542
543         /* Run the appropriate type of comparison */
544         if (strategy > 10)
545         {
546                 /* Collation-aware comparison */
547                 strategy -= 10;
548
549                 /* If asserts are enabled, verify encoding of reconstructed string */
550                 Assert(pg_verifymbstr(fullValue, fullLen, false));
551
552                 r = varstr_cmp(fullValue, Min(queryLen, fullLen),
553                                            VARDATA_ANY(query), Min(queryLen, fullLen),
554                                            PG_GET_COLLATION());
555         }
556         else
557         {
558                 /* Non-collation-aware comparison */
559                 r = memcmp(fullValue, VARDATA_ANY(query), Min(queryLen, fullLen));
560         }
561
562         if (r == 0)
563         {
564                 if (queryLen > fullLen)
565                         r = -1;
566                 else if (queryLen < fullLen)
567                         r = 1;
568         }
569
570         switch (strategy)
571         {
572                 case BTLessStrategyNumber:
573                         res = (r < 0);
574                         break;
575                 case BTLessEqualStrategyNumber:
576                         res = (r <= 0);
577                         break;
578                 case BTEqualStrategyNumber:
579                         res = (r == 0);
580                         break;
581                 case BTGreaterEqualStrategyNumber:
582                         res = (r >= 0);
583                         break;
584                 case BTGreaterStrategyNumber:
585                         res = (r > 0);
586                         break;
587                 default:
588                         elog(ERROR, "unrecognized strategy number: %d", in->strategy);
589                         res = false;
590                         break;
591         }
592
593         PG_RETURN_BOOL(res);
594 }