1 /*-------------------------------------------------------------------------
4 * implementation of compressed-suffix tree over text
7 * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
8 * Portions Copyright (c) 1994, Regents of the University of California
11 * src/backend/access/spgist/spgtextproc.c
13 *-------------------------------------------------------------------------
17 #include "access/spgist.h"
18 #include "catalog/pg_type.h"
19 #include "mb/pg_wchar.h"
20 #include "utils/builtins.h"
21 #include "utils/datum.h"
22 #include "utils/pg_locale.h"
26 * In the worst case, an inner tuple in a text suffix tree could have as many
27 * as 256 nodes (one for each possible byte value). Each node can take 16
28 * bytes on MAXALIGN=8 machines. The inner tuple must fit on an index page
29 * of size BLCKSZ. Rather than assuming we know the exact amount of overhead
30 * imposed by page headers, tuple headers, etc, we leave 100 bytes for that
31 * (the actual overhead should be no more than 56 bytes at this writing, so
32 * there is slop in this number). The upshot is that the maximum safe prefix
/*
 * Cap applied to the common prefix chosen in picksplit: BLCKSZ minus room
 * for 256 nodes of 16 bytes each and 100 bytes reserved for header overhead.
 */
35 #define SPGIST_MAX_PREFIX_LENGTH (BLCKSZ - 256 * 16 - 100)
37 /* Struct for sorting values in picksplit */
/*
 * NOTE(review): the member list is not visible in this listing.  The
 * picksplit code reads three members: "c" (the node label byte), "d" (the
 * input datum) and "i" (used to index the per-tuple output arrays) --
 * confirm against the full definition.
 */
38 typedef struct spgNodePtr
/*
 * SP-GiST config support function for text.
 *
 * Fills in the spgConfigOut struct passed as argument 1: prefixes are stored
 * as TEXTOID, node labels as CHAROID, and longValuesOK is set to true.
 * Argument 0 (spgConfigIn) is not consulted; its fetch is commented out.
 */
47 spg_text_config(PG_FUNCTION_ARGS)
49 /* spgConfigIn *cfgin = (spgConfigIn *) PG_GETARG_POINTER(0); */
50 spgConfigOut *cfg = (spgConfigOut *) PG_GETARG_POINTER(1);
52 cfg->prefixType = TEXTOID;
53 cfg->labelType = CHAROID;
54 cfg->longValuesOK = true; /* suffixing will shorten long values */
59 * Form a text datum from the given not-necessarily-null-terminated string,
60 * using short varlena header format if possible
/*
 * data: source bytes; datalen: number of bytes to copy.
 * Returns a pointer Datum for a freshly palloc'd varlena.  The buffer is
 * always sized for the VARHDRSZ header, whichever header format is written.
 */
63 formTextDatum(const char *data, int datalen)
67 p = (char *) palloc(datalen + VARHDRSZ);
/* the short header is used only when the total size fits VARATT_SHORT_MAX */
69 if (datalen + VARHDRSZ_SHORT <= VARATT_SHORT_MAX)
71 SET_VARSIZE_SHORT(p, datalen + VARHDRSZ_SHORT);
73 memcpy(p + VARHDRSZ_SHORT, data, datalen);
/* otherwise write the regular header and place the payload after it */
77 SET_VARSIZE(p, datalen + VARHDRSZ);
78 memcpy(p + VARHDRSZ, data, datalen);
81 return PointerGetDatum(p);
85 * Find the length of the common prefix of a and b
/*
 * lena and lenb bound the comparison, so neither input needs a terminator:
 * the scan stops at the end of the shorter input or at the first differing
 * byte.
 * NOTE(review): the loop body and the return statement are not visible in
 * this listing.
 */
88 commonPrefix(const char *a, const char *b, int lena, int lenb)
92 while (i < lena && i < lenb && *a == *b)
103 * Binary search an array of uint8 datums for a match to c
105 * On success, *i gets the match location; on failure, it gets where to insert
/*
 * nodeLabels holds nNodes labels, each read with DatumGetUInt8.  A binary
 * search is only valid if the labels are sorted by value; picksplit sorts
 * them for that purpose.
 * NOTE(review): the initial bounds, the comparisons against c and the return
 * statements are not visible in this listing.
 */
108 searchChar(Datum *nodeLabels, int nNodes, uint8 c, int *i)
113 while (StopLow < StopHigh)
/* probe the midpoint of the current window */
115 int StopMiddle = (StopLow + StopHigh) >> 1;
116 uint8 middle = DatumGetUInt8(nodeLabels[StopMiddle]);
/* shrink the window: the upper bound drops to the midpoint ... */
119 StopHigh = StopMiddle;
/* ... or the lower bound moves just past it */
121 StopLow = StopMiddle + 1;
/*
 * SP-GiST choose support function for text.
 *
 * Reads the value being inserted from in->datum, skips its first in->level
 * bytes, and sets out->resultType to one of:
 *   spgSplitTuple - the value does not match the whole tuple prefix, or no
 *                   node has the wanted label and in->allTheSame is set;
 *   spgMatchNode  - a node labelled with the next byte (nodeChar) exists;
 *   spgAddNode    - otherwise, request a new node labelled nodeChar.
 * nodeChar keeps its initial zero value when the value has no bytes left
 * after the prefix.
 * NOTE(review): this listing omits lines (braces, else branches, some
 * declarations), so branch nesting must be checked against the full source.
 */
134 spg_text_choose(PG_FUNCTION_ARGS)
136 spgChooseIn *in = (spgChooseIn *) PG_GETARG_POINTER(0);
137 spgChooseOut *out = (spgChooseOut *) PG_GETARG_POINTER(1);
138 text *inText = DatumGetTextPP(in->datum);
139 char *inStr = VARDATA_ANY(inText);
140 int inSize = VARSIZE_ANY_EXHDR(inText);
141 uint8 nodeChar = '\0';
145 /* Check for prefix match, set nodeChar to first byte after prefix */
148 text *prefixText = DatumGetTextPP(in->prefixDatum);
149 char *prefixStr = VARDATA_ANY(prefixText);
150 int prefixSize = VARSIZE_ANY_EXHDR(prefixText);
152 commonLen = commonPrefix(inStr + in->level,
157 if (commonLen == prefixSize)
159 if (inSize - in->level > commonLen)
160 nodeChar = *(uint8 *) (inStr + in->level + commonLen);
166 /* Must split tuple because incoming value doesn't match prefix */
167 out->resultType = spgSplitTuple;
171 out->result.splitTuple.prefixHasPrefix = false;
175 out->result.splitTuple.prefixHasPrefix = true;
176 out->result.splitTuple.prefixPrefixDatum =
177 formTextDatum(prefixStr, commonLen);
/* the split node is labelled with the first prefix byte past the common part */
179 out->result.splitTuple.nodeLabel =
180 UInt8GetDatum(*(prefixStr + commonLen));
/* if that label byte was all that remained, the lower tuple gets no prefix */
182 if (prefixSize - commonLen == 1)
184 out->result.splitTuple.postfixHasPrefix = false;
188 out->result.splitTuple.postfixHasPrefix = true;
189 out->result.splitTuple.postfixPrefixDatum =
190 formTextDatum(prefixStr + commonLen + 1,
191 prefixSize - commonLen - 1);
197 else if (inSize > in->level)
199 nodeChar = *(uint8 *) (inStr + in->level);
206 /* Look up nodeChar in the node label array */
207 if (searchChar(in->nodeLabels, in->nNodes, nodeChar, &i))
210 * Descend to existing node. (If in->allTheSame, the core code will
211 * ignore our nodeN specification here, but that's OK. We still
212 * have to provide the correct levelAdd and restDatum values, and
213 * those are the same regardless of which node gets chosen by core.)
215 out->resultType = spgMatchNode;
216 out->result.matchNode.nodeN = i;
217 out->result.matchNode.levelAdd = commonLen + 1;
218 if (inSize - in->level - commonLen - 1 > 0)
219 out->result.matchNode.restDatum =
220 formTextDatum(inStr + in->level + commonLen + 1,
221 inSize - in->level - commonLen - 1);
223 out->result.matchNode.restDatum =
224 formTextDatum(NULL, 0);
226 else if (in->allTheSame)
229 * Can't use AddNode action, so split the tuple. The upper tuple
230 * has the same prefix as before and uses an empty node label for
231 * the lower tuple. The lower tuple has no prefix and the same
232 * node labels as the original tuple.
234 out->resultType = spgSplitTuple;
235 out->result.splitTuple.prefixHasPrefix = in->hasPrefix;
236 out->result.splitTuple.prefixPrefixDatum = in->prefixDatum;
237 out->result.splitTuple.nodeLabel = UInt8GetDatum('\0');
238 out->result.splitTuple.postfixHasPrefix = false;
242 /* Add a node for the not-previously-seen nodeChar value */
243 out->resultType = spgAddNode;
244 out->result.addNode.nodeLabel = UInt8GetDatum(nodeChar);
245 out->result.addNode.nodeN = i;
251 /* qsort comparator to sort spgNodePtr structs by "c" */
/*
 * a and b point at spgNodePtr elements; their "c" members are compared.
 * NOTE(review): the first comparison and the return values are not visible
 * in this listing.
 */
253 cmpNodePtr(const void *a, const void *b)
255 const spgNodePtr *aa = (const spgNodePtr *) a;
256 const spgNodePtr *bb = (const spgNodePtr *) b;
260 else if (aa->c > bb->c)
/*
 * SP-GiST picksplit support function for text.
 *
 * Computes the longest prefix common to all in->nTuples input values
 * (capped at SPGIST_MAX_PREFIX_LENGTH), labels each value with the byte that
 * follows that prefix (zero when the value has no further bytes), sorts the
 * values by label, and emits one node per distinct label.  Each leaf datum
 * is what remains of the value after the prefix and the label byte.
 * NOTE(review): this listing omits lines (braces, else branches, some
 * declarations and counter updates).
 */
267 spg_text_picksplit(PG_FUNCTION_ARGS)
269 spgPickSplitIn *in = (spgPickSplitIn *) PG_GETARG_POINTER(0);
270 spgPickSplitOut *out = (spgPickSplitOut *) PG_GETARG_POINTER(1);
271 text *text0 = DatumGetTextPP(in->datums[0]);
276 /* Identify longest common prefix, if any */
277 commonLen = VARSIZE_ANY_EXHDR(text0);
/* the scan stops early once the common length has dropped to zero */
278 for (i = 1; i < in->nTuples && commonLen > 0; i++)
280 text *texti = DatumGetTextPP(in->datums[i]);
281 int tmp = commonPrefix(VARDATA_ANY(text0),
283 VARSIZE_ANY_EXHDR(text0),
284 VARSIZE_ANY_EXHDR(texti));
291 * Limit the prefix length, if necessary, to ensure that the resulting
292 * inner tuple will fit on a page.
294 commonLen = Min(commonLen, SPGIST_MAX_PREFIX_LENGTH);
296 /* Set node prefix to be that string, if it's not empty */
299 out->hasPrefix = false;
303 out->hasPrefix = true;
304 out->prefixDatum = formTextDatum(VARDATA_ANY(text0), commonLen);
307 /* Extract the node label (first non-common byte) from each value */
308 nodes = (spgNodePtr *) palloc(sizeof(spgNodePtr) * in->nTuples);
310 for (i = 0; i < in->nTuples; i++)
312 text *texti = DatumGetTextPP(in->datums[i]);
314 if (commonLen < VARSIZE_ANY_EXHDR(texti))
315 nodes[i].c = *(uint8 *) (VARDATA_ANY(texti) + commonLen);
317 nodes[i].c = '\0'; /* use \0 if string is all common */
319 nodes[i].d = in->datums[i];
323 * Sort by label bytes so that we can group the values into nodes. This
324 * also ensures that the nodes are ordered by label value, allowing the
325 * use of binary search in searchChar.
327 qsort(nodes, in->nTuples, sizeof(*nodes), cmpNodePtr);
329 /* And emit results */
/* output arrays are sized for the worst case of one node per input tuple */
331 out->nodeLabels = (Datum *) palloc(sizeof(Datum) * in->nTuples);
332 out->mapTuplesToNodes = (int *) palloc(sizeof(int) * in->nTuples);
333 out->leafTupleDatums = (Datum *) palloc(sizeof(Datum) * in->nTuples);
335 for (i = 0; i < in->nTuples; i++)
337 text *texti = DatumGetTextPP(nodes[i].d);
/* a label differing from the previous sorted entry starts a new node */
340 if (i == 0 || nodes[i].c != nodes[i - 1].c)
342 out->nodeLabels[out->nNodes] = UInt8GetDatum(nodes[i].c);
346 if (commonLen < VARSIZE_ANY_EXHDR(texti))
347 leafD = formTextDatum(VARDATA_ANY(texti) + commonLen + 1,
348 VARSIZE_ANY_EXHDR(texti) - commonLen - 1);
350 leafD = formTextDatum(NULL, 0);
/* results are stored at index nodes[i].i, not at the sorted position i */
352 out->leafTupleDatums[nodes[i].i] = leafD;
353 out->mapTuplesToNodes[nodes[i].i] = out->nNodes - 1;
/*
 * SP-GiST inner_consistent support function for text.
 *
 * Rebuilds the string represented at this inner tuple (the previously
 * reconstructed value, then this tuple's prefix, then each node's label)
 * and compares it with in->query over the shorter of the two lengths.  For
 * each node it emits, it records the node number, the number of bytes added
 * at this level, and a copy of the reconstructed value.
 * NOTE(review): this listing omits lines (braces, conditions, switch
 * bodies), so the exact accept/reject rule per strategy is only partly
 * visible.
 */
360 spg_text_inner_consistent(PG_FUNCTION_ARGS)
362 spgInnerConsistentIn *in = (spgInnerConsistentIn *) PG_GETARG_POINTER(0);
363 spgInnerConsistentOut *out = (spgInnerConsistentOut *) PG_GETARG_POINTER(1);
364 StrategyNumber strategy = in->strategy;
368 text *reconstrText = NULL;
369 int maxReconstrLen = 0;
370 text *prefixText = NULL;
374 * If it's a collation-aware operator, but the collation is C, we can
375 * treat it as non-collation-aware.
378 lc_collate_is_c(PG_GET_COLLATION()))
381 inText = DatumGetTextPP(in->query);
382 inSize = VARSIZE_ANY_EXHDR(inText);
385 * Reconstruct values represented at this tuple, including parent data,
386 * prefix of this tuple if any, and the node label if any. in->level
387 * should be the length of the previously reconstructed value, and the
388 * number of bytes added here is prefixSize or prefixSize + 1.
390 * Note: we assume that in->reconstructedValue isn't toasted and doesn't
391 * have a short varlena header. This is okay because it must have been
392 * created by a previous invocation of this routine, and we always emit
393 * long-format reconstructed values.
395 Assert(in->level == 0 ? DatumGetPointer(in->reconstructedValue) == NULL :
396 VARSIZE_ANY_EXHDR(DatumGetPointer(in->reconstructedValue)) == in->level);
398 maxReconstrLen = in->level + 1;
401 prefixText = DatumGetTextPP(in->prefixDatum);
402 prefixSize = VARSIZE_ANY_EXHDR(prefixText);
403 maxReconstrLen += prefixSize;
406 reconstrText = palloc(VARHDRSZ + maxReconstrLen);
407 SET_VARSIZE(reconstrText, VARHDRSZ + maxReconstrLen);
410 memcpy(VARDATA(reconstrText),
411 VARDATA(DatumGetPointer(in->reconstructedValue)),
414 memcpy(((char *) VARDATA(reconstrText)) + in->level,
415 VARDATA_ANY(prefixText),
417 /* last byte of reconstrText will be filled in below */
420 * Scan the child nodes. For each one, complete the reconstructed value
421 * and see if it's consistent with the query. If so, emit an entry into
424 out->nodeNumbers = (int *) palloc(sizeof(int) * in->nNodes);
425 out->levelAdds = (int *) palloc(sizeof(int) * in->nNodes);
426 out->reconstructedValues = (Datum *) palloc(sizeof(Datum) * in->nNodes);
429 for (i = 0; i < in->nNodes; i++)
431 uint8 nodeChar = DatumGetUInt8(in->nodeLabels[i]);
436 /* If nodeChar is zero, don't include it in data */
437 if (nodeChar == '\0')
438 thisLen = maxReconstrLen - 1;
441 ((char *) VARDATA(reconstrText))[maxReconstrLen - 1] = nodeChar;
442 thisLen = maxReconstrLen;
/* only the bytes both strings have are compared */
445 r = memcmp(VARDATA(reconstrText), VARDATA_ANY(inText),
446 Min(inSize, thisLen));
450 case BTLessStrategyNumber:
451 case BTLessEqualStrategyNumber:
455 case BTEqualStrategyNumber:
/* equality test: compared bytes agree and the query is at least thisLen long */
456 if (r == 0 && inSize >= thisLen)
459 case BTGreaterEqualStrategyNumber:
460 case BTGreaterStrategyNumber:
/* NOTE(review): the +10 strategies are presumably the collation-aware variants -- confirm */
464 case BTLessStrategyNumber + 10:
465 case BTLessEqualStrategyNumber + 10:
466 case BTGreaterEqualStrategyNumber + 10:
467 case BTGreaterStrategyNumber + 10:
469 * with non-C collation we need to traverse whole tree :-(
474 elog(ERROR, "unrecognized strategy number: %d",
481 out->nodeNumbers[out->nNodes] = i;
482 out->levelAdds[out->nNodes] = thisLen - in->level;
/* set the stored size to thisLen before copying, so the copy has that length */
483 SET_VARSIZE(reconstrText, VARHDRSZ + thisLen);
484 out->reconstructedValues[out->nNodes] =
485 datumCopy(PointerGetDatum(reconstrText), false, -1);
/*
 * SP-GiST leaf_consistent support function for text.
 *
 * Rebuilds the full indexed string from in->reconstructedValue (level bytes)
 * followed by in->leafDatum, compares it with in->query over the shorter of
 * the two lengths, and then evaluates the btree strategy.  out->recheck is
 * always set to false.
 * NOTE(review): this listing omits lines (declarations, braces, the bodies
 * of the length tie-break and of the switch cases).
 */
494 spg_text_leaf_consistent(PG_FUNCTION_ARGS)
496 spgLeafConsistentIn *in = (spgLeafConsistentIn *) PG_GETARG_POINTER(0);
497 spgLeafConsistentOut *out = (spgLeafConsistentOut *) PG_GETARG_POINTER(1);
498 StrategyNumber strategy = in->strategy;
499 text *query = DatumGetTextPP(in->query);
500 int level = in->level;
502 *reconstrValue = NULL;
509 /* all tests are exact */
510 out->recheck = false;
512 leafValue = DatumGetTextPP(in->leafDatum);
/* reconstrValue stays NULL when no reconstructed value was passed in */
514 if (DatumGetPointer(in->reconstructedValue))
515 reconstrValue = DatumGetTextP(in->reconstructedValue);
517 Assert(level == 0 ? reconstrValue == NULL :
518 VARSIZE_ANY_EXHDR(reconstrValue) == level);
520 fullLen = level + VARSIZE_ANY_EXHDR(leafValue);
522 queryLen = VARSIZE_ANY_EXHDR(query);
524 /* For equality, we needn't reconstruct fullValue if not same length */
525 if (strategy == BTEqualStrategyNumber && queryLen != fullLen)
526 PG_RETURN_BOOL(false);
528 /* Else, reconstruct the full string represented by this leaf tuple */
/* empty leaf at level > 0: use the reconstructed bytes in place, no copy */
529 if (VARSIZE_ANY_EXHDR(leafValue) == 0 && level > 0)
531 fullValue = VARDATA(reconstrValue);
/* otherwise build a new buffer: reconstructed bytes first, then the leaf bytes */
535 fullValue = palloc(fullLen);
537 memcpy(fullValue, VARDATA(reconstrValue), level);
538 if (VARSIZE_ANY_EXHDR(leafValue) > 0)
539 memcpy(fullValue + level, VARDATA_ANY(leafValue),
540 VARSIZE_ANY_EXHDR(leafValue));
543 /* Run the appropriate type of comparison */
546 /* Collation-aware comparison */
549 /* If asserts are enabled, verify encoding of reconstructed string */
550 Assert(pg_verifymbstr(fullValue, fullLen, false));
552 r = varstr_cmp(fullValue, Min(queryLen, fullLen),
553 VARDATA_ANY(query), Min(queryLen, fullLen),
558 /* Non-collation-aware comparison */
559 r = memcmp(fullValue, VARDATA_ANY(query), Min(queryLen, fullLen));
/* NOTE(review): a length tie-break follows; its assignments are not visible here */
564 if (queryLen > fullLen)
566 else if (queryLen < fullLen)
572 case BTLessStrategyNumber:
575 case BTLessEqualStrategyNumber:
578 case BTEqualStrategyNumber:
581 case BTGreaterEqualStrategyNumber:
584 case BTGreaterStrategyNumber:
588 elog(ERROR, "unrecognized strategy number: %d", in->strategy);