]> granicus.if.org Git - postgresql/blob - src/backend/access/spgist/spgscan.c
Revert no-op changes to BufferGetPage()
[postgresql] / src / backend / access / spgist / spgscan.c
1 /*-------------------------------------------------------------------------
2  *
3  * spgscan.c
4  *        routines for scanning SP-GiST indexes
5  *
6  *
7  * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
8  * Portions Copyright (c) 1994, Regents of the University of California
9  *
10  * IDENTIFICATION
11  *                      src/backend/access/spgist/spgscan.c
12  *
13  *-------------------------------------------------------------------------
14  */
15
16 #include "postgres.h"
17
18 #include "access/relscan.h"
19 #include "access/spgist_private.h"
20 #include "miscadmin.h"
21 #include "storage/bufmgr.h"
22 #include "utils/datum.h"
23 #include "utils/memutils.h"
24 #include "utils/rel.h"
25
26
27 typedef void (*storeRes_func) (SpGistScanOpaque so, ItemPointer heapPtr,
28                                                                  Datum leafValue, bool isnull, bool recheck);
29
30 typedef struct ScanStackEntry
31 {
32         Datum           reconstructedValue;             /* value reconstructed from parent */
33         void       *traversalValue; /* opclass-specific traverse value */
34         int                     level;                  /* level of items on this page */
35         ItemPointerData ptr;            /* block and offset to scan from */
36 } ScanStackEntry;
37
38
39 /* Free a ScanStackEntry */
40 static void
41 freeScanStackEntry(SpGistScanOpaque so, ScanStackEntry *stackEntry)
42 {
43         if (!so->state.attType.attbyval &&
44                 DatumGetPointer(stackEntry->reconstructedValue) != NULL)
45                 pfree(DatumGetPointer(stackEntry->reconstructedValue));
46         if (stackEntry->traversalValue)
47                 pfree(stackEntry->traversalValue);
48
49         pfree(stackEntry);
50 }
51
52 /* Free the entire stack */
53 static void
54 freeScanStack(SpGistScanOpaque so)
55 {
56         ListCell   *lc;
57
58         foreach(lc, so->scanStack)
59         {
60                 freeScanStackEntry(so, (ScanStackEntry *) lfirst(lc));
61         }
62         list_free(so->scanStack);
63         so->scanStack = NIL;
64 }
65
66 /*
67  * Initialize scanStack to search the root page, resetting
68  * any previously active scan
69  */
70 static void
71 resetSpGistScanOpaque(SpGistScanOpaque so)
72 {
73         ScanStackEntry *startEntry;
74
75         freeScanStack(so);
76
77         if (so->searchNulls)
78         {
79                 /* Stack a work item to scan the null index entries */
80                 startEntry = (ScanStackEntry *) palloc0(sizeof(ScanStackEntry));
81                 ItemPointerSet(&startEntry->ptr, SPGIST_NULL_BLKNO, FirstOffsetNumber);
82                 so->scanStack = lappend(so->scanStack, startEntry);
83         }
84
85         if (so->searchNonNulls)
86         {
87                 /* Stack a work item to scan the non-null index entries */
88                 startEntry = (ScanStackEntry *) palloc0(sizeof(ScanStackEntry));
89                 ItemPointerSet(&startEntry->ptr, SPGIST_ROOT_BLKNO, FirstOffsetNumber);
90                 so->scanStack = lappend(so->scanStack, startEntry);
91         }
92
93         if (so->want_itup)
94         {
95                 /* Must pfree IndexTuples to avoid memory leak */
96                 int                     i;
97
98                 for (i = 0; i < so->nPtrs; i++)
99                         pfree(so->indexTups[i]);
100         }
101         so->iPtr = so->nPtrs = 0;
102 }
103
104 /*
105  * Prepare scan keys in SpGistScanOpaque from caller-given scan keys
106  *
107  * Sets searchNulls, searchNonNulls, numberOfKeys, keyData fields of *so.
108  *
109  * The point here is to eliminate null-related considerations from what the
110  * opclass consistent functions need to deal with.  We assume all SPGiST-
111  * indexable operators are strict, so any null RHS value makes the scan
112  * condition unsatisfiable.  We also pull out any IS NULL/IS NOT NULL
113  * conditions; their effect is reflected into searchNulls/searchNonNulls.
114  */
115 static void
116 spgPrepareScanKeys(IndexScanDesc scan)
117 {
118         SpGistScanOpaque so = (SpGistScanOpaque) scan->opaque;
119         bool            qual_ok;
120         bool            haveIsNull;
121         bool            haveNotNull;
122         int                     nkeys;
123         int                     i;
124
125         if (scan->numberOfKeys <= 0)
126         {
127                 /* If no quals, whole-index scan is required */
128                 so->searchNulls = true;
129                 so->searchNonNulls = true;
130                 so->numberOfKeys = 0;
131                 return;
132         }
133
134         /* Examine the given quals */
135         qual_ok = true;
136         haveIsNull = haveNotNull = false;
137         nkeys = 0;
138         for (i = 0; i < scan->numberOfKeys; i++)
139         {
140                 ScanKey         skey = &scan->keyData[i];
141
142                 if (skey->sk_flags & SK_SEARCHNULL)
143                         haveIsNull = true;
144                 else if (skey->sk_flags & SK_SEARCHNOTNULL)
145                         haveNotNull = true;
146                 else if (skey->sk_flags & SK_ISNULL)
147                 {
148                         /* ordinary qual with null argument - unsatisfiable */
149                         qual_ok = false;
150                         break;
151                 }
152                 else
153                 {
154                         /* ordinary qual, propagate into so->keyData */
155                         so->keyData[nkeys++] = *skey;
156                         /* this effectively creates a not-null requirement */
157                         haveNotNull = true;
158                 }
159         }
160
161         /* IS NULL in combination with something else is unsatisfiable */
162         if (haveIsNull && haveNotNull)
163                 qual_ok = false;
164
165         /* Emit results */
166         if (qual_ok)
167         {
168                 so->searchNulls = haveIsNull;
169                 so->searchNonNulls = haveNotNull;
170                 so->numberOfKeys = nkeys;
171         }
172         else
173         {
174                 so->searchNulls = false;
175                 so->searchNonNulls = false;
176                 so->numberOfKeys = 0;
177         }
178 }
179
180 IndexScanDesc
181 spgbeginscan(Relation rel, int keysz, int orderbysz)
182 {
183         IndexScanDesc scan;
184         SpGistScanOpaque so;
185
186         scan = RelationGetIndexScan(rel, keysz, 0);
187
188         so = (SpGistScanOpaque) palloc0(sizeof(SpGistScanOpaqueData));
189         if (keysz > 0)
190                 so->keyData = (ScanKey) palloc(sizeof(ScanKeyData) * keysz);
191         else
192                 so->keyData = NULL;
193         initSpGistState(&so->state, scan->indexRelation);
194         so->tempCxt = AllocSetContextCreate(CurrentMemoryContext,
195                                                                                 "SP-GiST search temporary context",
196                                                                                 ALLOCSET_DEFAULT_MINSIZE,
197                                                                                 ALLOCSET_DEFAULT_INITSIZE,
198                                                                                 ALLOCSET_DEFAULT_MAXSIZE);
199
200         /* Set up indexTupDesc and xs_itupdesc in case it's an index-only scan */
201         so->indexTupDesc = scan->xs_itupdesc = RelationGetDescr(rel);
202
203         scan->opaque = so;
204
205         return scan;
206 }
207
208 void
209 spgrescan(IndexScanDesc scan, ScanKey scankey, int nscankeys,
210                   ScanKey orderbys, int norderbys)
211 {
212         SpGistScanOpaque so = (SpGistScanOpaque) scan->opaque;
213
214         /* copy scankeys into local storage */
215         if (scankey && scan->numberOfKeys > 0)
216         {
217                 memmove(scan->keyData, scankey,
218                                 scan->numberOfKeys * sizeof(ScanKeyData));
219         }
220
221         /* preprocess scankeys, set up the representation in *so */
222         spgPrepareScanKeys(scan);
223
224         /* set up starting stack entries */
225         resetSpGistScanOpaque(so);
226 }
227
228 void
229 spgendscan(IndexScanDesc scan)
230 {
231         SpGistScanOpaque so = (SpGistScanOpaque) scan->opaque;
232
233         MemoryContextDelete(so->tempCxt);
234 }
235
236 /*
237  * Test whether a leaf tuple satisfies all the scan keys
238  *
239  * *leafValue is set to the reconstructed datum, if provided
240  * *recheck is set true if any of the operators are lossy
241  */
242 static bool
243 spgLeafTest(Relation index, SpGistScanOpaque so,
244                         SpGistLeafTuple leafTuple, bool isnull,
245                         int level, Datum reconstructedValue,
246                         void *traversalValue,
247                         Datum *leafValue, bool *recheck)
248 {
249         bool            result;
250         Datum           leafDatum;
251         spgLeafConsistentIn in;
252         spgLeafConsistentOut out;
253         FmgrInfo   *procinfo;
254         MemoryContext oldCtx;
255
256         if (isnull)
257         {
258                 /* Should not have arrived on a nulls page unless nulls are wanted */
259                 Assert(so->searchNulls);
260                 *leafValue = (Datum) 0;
261                 *recheck = false;
262                 return true;
263         }
264
265         leafDatum = SGLTDATUM(leafTuple, &so->state);
266
267         /* use temp context for calling leaf_consistent */
268         oldCtx = MemoryContextSwitchTo(so->tempCxt);
269
270         in.scankeys = so->keyData;
271         in.nkeys = so->numberOfKeys;
272         in.reconstructedValue = reconstructedValue;
273         in.traversalValue = traversalValue;
274         in.level = level;
275         in.returnData = so->want_itup;
276         in.leafDatum = leafDatum;
277
278         out.leafValue = (Datum) 0;
279         out.recheck = false;
280
281         procinfo = index_getprocinfo(index, 1, SPGIST_LEAF_CONSISTENT_PROC);
282         result = DatumGetBool(FunctionCall2Coll(procinfo,
283                                                                                         index->rd_indcollation[0],
284                                                                                         PointerGetDatum(&in),
285                                                                                         PointerGetDatum(&out)));
286
287         *leafValue = out.leafValue;
288         *recheck = out.recheck;
289
290         MemoryContextSwitchTo(oldCtx);
291
292         return result;
293 }
294
295 /*
296  * Walk the tree and report all tuples passing the scan quals to the storeRes
297  * subroutine.
298  *
299  * If scanWholeIndex is true, we'll do just that.  If not, we'll stop at the
300  * next page boundary once we have reported at least one tuple.
301  */
302 static void
303 spgWalk(Relation index, SpGistScanOpaque so, bool scanWholeIndex,
304                 storeRes_func storeRes, Snapshot snapshot)
305 {
306         Buffer          buffer = InvalidBuffer;
307         bool            reportedSome = false;
308
309         while (scanWholeIndex || !reportedSome)
310         {
311                 ScanStackEntry *stackEntry;
312                 BlockNumber blkno;
313                 OffsetNumber offset;
314                 Page            page;
315                 bool            isnull;
316
317                 /* Pull next to-do item from the list */
318                 if (so->scanStack == NIL)
319                         break;                          /* there are no more pages to scan */
320
321                 stackEntry = (ScanStackEntry *) linitial(so->scanStack);
322                 so->scanStack = list_delete_first(so->scanStack);
323
324 redirect:
325                 /* Check for interrupts, just in case of infinite loop */
326                 CHECK_FOR_INTERRUPTS();
327
328                 blkno = ItemPointerGetBlockNumber(&stackEntry->ptr);
329                 offset = ItemPointerGetOffsetNumber(&stackEntry->ptr);
330
331                 if (buffer == InvalidBuffer)
332                 {
333                         buffer = ReadBuffer(index, blkno);
334                         LockBuffer(buffer, BUFFER_LOCK_SHARE);
335                 }
336                 else if (blkno != BufferGetBlockNumber(buffer))
337                 {
338                         UnlockReleaseBuffer(buffer);
339                         buffer = ReadBuffer(index, blkno);
340                         LockBuffer(buffer, BUFFER_LOCK_SHARE);
341                 }
342                 /* else new pointer points to the same page, no work needed */
343
344                 page = BufferGetPage(buffer);
345                 TestForOldSnapshot(snapshot, index, page);
346
347                 isnull = SpGistPageStoresNulls(page) ? true : false;
348
349                 if (SpGistPageIsLeaf(page))
350                 {
351                         SpGistLeafTuple leafTuple;
352                         OffsetNumber max = PageGetMaxOffsetNumber(page);
353                         Datum           leafValue = (Datum) 0;
354                         bool            recheck = false;
355
356                         if (SpGistBlockIsRoot(blkno))
357                         {
358                                 /* When root is a leaf, examine all its tuples */
359                                 for (offset = FirstOffsetNumber; offset <= max; offset++)
360                                 {
361                                         leafTuple = (SpGistLeafTuple)
362                                                 PageGetItem(page, PageGetItemId(page, offset));
363                                         if (leafTuple->tupstate != SPGIST_LIVE)
364                                         {
365                                                 /* all tuples on root should be live */
366                                                 elog(ERROR, "unexpected SPGiST tuple state: %d",
367                                                          leafTuple->tupstate);
368                                         }
369
370                                         Assert(ItemPointerIsValid(&leafTuple->heapPtr));
371                                         if (spgLeafTest(index, so,
372                                                                         leafTuple, isnull,
373                                                                         stackEntry->level,
374                                                                         stackEntry->reconstructedValue,
375                                                                         stackEntry->traversalValue,
376                                                                         &leafValue,
377                                                                         &recheck))
378                                         {
379                                                 storeRes(so, &leafTuple->heapPtr,
380                                                                  leafValue, isnull, recheck);
381                                                 reportedSome = true;
382                                         }
383                                 }
384                         }
385                         else
386                         {
387                                 /* Normal case: just examine the chain we arrived at */
388                                 while (offset != InvalidOffsetNumber)
389                                 {
390                                         Assert(offset >= FirstOffsetNumber && offset <= max);
391                                         leafTuple = (SpGistLeafTuple)
392                                                 PageGetItem(page, PageGetItemId(page, offset));
393                                         if (leafTuple->tupstate != SPGIST_LIVE)
394                                         {
395                                                 if (leafTuple->tupstate == SPGIST_REDIRECT)
396                                                 {
397                                                         /* redirection tuple should be first in chain */
398                                                         Assert(offset == ItemPointerGetOffsetNumber(&stackEntry->ptr));
399                                                         /* transfer attention to redirect point */
400                                                         stackEntry->ptr = ((SpGistDeadTuple) leafTuple)->pointer;
401                                                         Assert(ItemPointerGetBlockNumber(&stackEntry->ptr) != SPGIST_METAPAGE_BLKNO);
402                                                         goto redirect;
403                                                 }
404                                                 if (leafTuple->tupstate == SPGIST_DEAD)
405                                                 {
406                                                         /* dead tuple should be first in chain */
407                                                         Assert(offset == ItemPointerGetOffsetNumber(&stackEntry->ptr));
408                                                         /* No live entries on this page */
409                                                         Assert(leafTuple->nextOffset == InvalidOffsetNumber);
410                                                         break;
411                                                 }
412                                                 /* We should not arrive at a placeholder */
413                                                 elog(ERROR, "unexpected SPGiST tuple state: %d",
414                                                          leafTuple->tupstate);
415                                         }
416
417                                         Assert(ItemPointerIsValid(&leafTuple->heapPtr));
418                                         if (spgLeafTest(index, so,
419                                                                         leafTuple, isnull,
420                                                                         stackEntry->level,
421                                                                         stackEntry->reconstructedValue,
422                                                                         stackEntry->traversalValue,
423                                                                         &leafValue,
424                                                                         &recheck))
425                                         {
426                                                 storeRes(so, &leafTuple->heapPtr,
427                                                                  leafValue, isnull, recheck);
428                                                 reportedSome = true;
429                                         }
430
431                                         offset = leafTuple->nextOffset;
432                                 }
433                         }
434                 }
435                 else    /* page is inner */
436                 {
437                         SpGistInnerTuple innerTuple;
438                         spgInnerConsistentIn in;
439                         spgInnerConsistentOut out;
440                         FmgrInfo   *procinfo;
441                         SpGistNodeTuple *nodes;
442                         SpGistNodeTuple node;
443                         int                     i;
444                         MemoryContext oldCtx;
445
446                         innerTuple = (SpGistInnerTuple) PageGetItem(page,
447                                                                                                 PageGetItemId(page, offset));
448
449                         if (innerTuple->tupstate != SPGIST_LIVE)
450                         {
451                                 if (innerTuple->tupstate == SPGIST_REDIRECT)
452                                 {
453                                         /* transfer attention to redirect point */
454                                         stackEntry->ptr = ((SpGistDeadTuple) innerTuple)->pointer;
455                                         Assert(ItemPointerGetBlockNumber(&stackEntry->ptr) != SPGIST_METAPAGE_BLKNO);
456                                         goto redirect;
457                                 }
458                                 elog(ERROR, "unexpected SPGiST tuple state: %d",
459                                          innerTuple->tupstate);
460                         }
461
462                         /* use temp context for calling inner_consistent */
463                         oldCtx = MemoryContextSwitchTo(so->tempCxt);
464
465                         in.scankeys = so->keyData;
466                         in.nkeys = so->numberOfKeys;
467                         in.reconstructedValue = stackEntry->reconstructedValue;
468                         in.traversalMemoryContext = oldCtx;
469                         in.traversalValue = stackEntry->traversalValue;
470                         in.level = stackEntry->level;
471                         in.returnData = so->want_itup;
472                         in.allTheSame = innerTuple->allTheSame;
473                         in.hasPrefix = (innerTuple->prefixSize > 0);
474                         in.prefixDatum = SGITDATUM(innerTuple, &so->state);
475                         in.nNodes = innerTuple->nNodes;
476                         in.nodeLabels = spgExtractNodeLabels(&so->state, innerTuple);
477
478                         /* collect node pointers */
479                         nodes = (SpGistNodeTuple *) palloc(sizeof(SpGistNodeTuple) * in.nNodes);
480                         SGITITERATE(innerTuple, i, node)
481                         {
482                                 nodes[i] = node;
483                         }
484
485                         memset(&out, 0, sizeof(out));
486
487                         if (!isnull)
488                         {
489                                 /* use user-defined inner consistent method */
490                                 procinfo = index_getprocinfo(index, 1, SPGIST_INNER_CONSISTENT_PROC);
491                                 FunctionCall2Coll(procinfo,
492                                                                   index->rd_indcollation[0],
493                                                                   PointerGetDatum(&in),
494                                                                   PointerGetDatum(&out));
495                         }
496                         else
497                         {
498                                 /* force all children to be visited */
499                                 out.nNodes = in.nNodes;
500                                 out.nodeNumbers = (int *) palloc(sizeof(int) * in.nNodes);
501                                 for (i = 0; i < in.nNodes; i++)
502                                         out.nodeNumbers[i] = i;
503                         }
504
505                         MemoryContextSwitchTo(oldCtx);
506
507                         /* If allTheSame, they should all or none of 'em match */
508                         if (innerTuple->allTheSame)
509                                 if (out.nNodes != 0 && out.nNodes != in.nNodes)
510                                         elog(ERROR, "inconsistent inner_consistent results for allTheSame inner tuple");
511
512                         for (i = 0; i < out.nNodes; i++)
513                         {
514                                 int                     nodeN = out.nodeNumbers[i];
515
516                                 Assert(nodeN >= 0 && nodeN < in.nNodes);
517                                 if (ItemPointerIsValid(&nodes[nodeN]->t_tid))
518                                 {
519                                         ScanStackEntry *newEntry;
520
521                                         /* Create new work item for this node */
522                                         newEntry = palloc(sizeof(ScanStackEntry));
523                                         newEntry->ptr = nodes[nodeN]->t_tid;
524                                         if (out.levelAdds)
525                                                 newEntry->level = stackEntry->level + out.levelAdds[i];
526                                         else
527                                                 newEntry->level = stackEntry->level;
528                                         /* Must copy value out of temp context */
529                                         if (out.reconstructedValues)
530                                                 newEntry->reconstructedValue =
531                                                         datumCopy(out.reconstructedValues[i],
532                                                                           so->state.attType.attbyval,
533                                                                           so->state.attType.attlen);
534                                         else
535                                                 newEntry->reconstructedValue = (Datum) 0;
536
537                                         /*
538                                          * Elements of out.traversalValues should be allocated in
539                                          * in.traversalMemoryContext, which is actually a long
540                                          * lived context of index scan.
541                                          */
542                                         newEntry->traversalValue = (out.traversalValues) ?
543                                                 out.traversalValues[i] : NULL;
544
545                                         so->scanStack = lcons(newEntry, so->scanStack);
546                                 }
547                         }
548                 }
549
550                 /* done with this scan stack entry */
551                 freeScanStackEntry(so, stackEntry);
552                 /* clear temp context before proceeding to the next one */
553                 MemoryContextReset(so->tempCxt);
554         }
555
556         if (buffer != InvalidBuffer)
557                 UnlockReleaseBuffer(buffer);
558 }
559
560 /* storeRes subroutine for getbitmap case */
561 static void
562 storeBitmap(SpGistScanOpaque so, ItemPointer heapPtr,
563                         Datum leafValue, bool isnull, bool recheck)
564 {
565         tbm_add_tuples(so->tbm, heapPtr, 1, recheck);
566         so->ntids++;
567 }
568
569 int64
570 spggetbitmap(IndexScanDesc scan, TIDBitmap *tbm)
571 {
572         SpGistScanOpaque so = (SpGistScanOpaque) scan->opaque;
573
574         /* Copy want_itup to *so so we don't need to pass it around separately */
575         so->want_itup = false;
576
577         so->tbm = tbm;
578         so->ntids = 0;
579
580         spgWalk(scan->indexRelation, so, true, storeBitmap, scan->xs_snapshot);
581
582         return so->ntids;
583 }
584
585 /* storeRes subroutine for gettuple case */
586 static void
587 storeGettuple(SpGistScanOpaque so, ItemPointer heapPtr,
588                           Datum leafValue, bool isnull, bool recheck)
589 {
590         Assert(so->nPtrs < MaxIndexTuplesPerPage);
591         so->heapPtrs[so->nPtrs] = *heapPtr;
592         so->recheck[so->nPtrs] = recheck;
593         if (so->want_itup)
594         {
595                 /*
596                  * Reconstruct desired IndexTuple.  We have to copy the datum out of
597                  * the temp context anyway, so we may as well create the tuple here.
598                  */
599                 so->indexTups[so->nPtrs] = index_form_tuple(so->indexTupDesc,
600                                                                                                         &leafValue,
601                                                                                                         &isnull);
602         }
603         so->nPtrs++;
604 }
605
606 bool
607 spggettuple(IndexScanDesc scan, ScanDirection dir)
608 {
609         SpGistScanOpaque so = (SpGistScanOpaque) scan->opaque;
610
611         if (dir != ForwardScanDirection)
612                 elog(ERROR, "SP-GiST only supports forward scan direction");
613
614         /* Copy want_itup to *so so we don't need to pass it around separately */
615         so->want_itup = scan->xs_want_itup;
616
617         for (;;)
618         {
619                 if (so->iPtr < so->nPtrs)
620                 {
621                         /* continuing to return tuples from a leaf page */
622                         scan->xs_ctup.t_self = so->heapPtrs[so->iPtr];
623                         scan->xs_recheck = so->recheck[so->iPtr];
624                         scan->xs_itup = so->indexTups[so->iPtr];
625                         so->iPtr++;
626                         return true;
627                 }
628
629                 if (so->want_itup)
630                 {
631                         /* Must pfree IndexTuples to avoid memory leak */
632                         int                     i;
633
634                         for (i = 0; i < so->nPtrs; i++)
635                                 pfree(so->indexTups[i]);
636                 }
637                 so->iPtr = so->nPtrs = 0;
638
639                 spgWalk(scan->indexRelation, so, false, storeGettuple,
640                                 scan->xs_snapshot);
641
642                 if (so->nPtrs == 0)
643                         break;                          /* must have completed scan */
644         }
645
646         return false;
647 }
648
649 bool
650 spgcanreturn(Relation index, int attno)
651 {
652         SpGistCache *cache;
653
654         /* We can do it if the opclass config function says so */
655         cache = spgGetCache(index);
656
657         return cache->config.canReturnData;
658 }