]> granicus.if.org Git - postgresql/blob - contrib/bloom/blscan.c
0744d74de7531257612b8ba9184f66ef72580bd2
[postgresql] / contrib / bloom / blscan.c
1 /*-------------------------------------------------------------------------
2  *
3  * blscan.c
4  *              Bloom index scan functions.
5  *
6  * Copyright (c) 2016-2018, PostgreSQL Global Development Group
7  *
8  * IDENTIFICATION
9  *        contrib/bloom/blscan.c
10  *
11  *-------------------------------------------------------------------------
12  */
13 #include "postgres.h"
14
15 #include "access/relscan.h"
16 #include "pgstat.h"
17 #include "miscadmin.h"
18 #include "storage/bufmgr.h"
19 #include "storage/lmgr.h"
20 #include "utils/memutils.h"
21 #include "utils/rel.h"
22
23 #include "bloom.h"
24
25 /*
26  * Begin scan of bloom index.
27  */
28 IndexScanDesc
29 blbeginscan(Relation r, int nkeys, int norderbys)
30 {
31         IndexScanDesc scan;
32         BloomScanOpaque so;
33
34         scan = RelationGetIndexScan(r, nkeys, norderbys);
35
36         so = (BloomScanOpaque) palloc(sizeof(BloomScanOpaqueData));
37         initBloomState(&so->state, scan->indexRelation);
38         so->sign = NULL;
39
40         scan->opaque = so;
41
42         return scan;
43 }
44
45 /*
46  * Rescan a bloom index.
47  */
48 void
49 blrescan(IndexScanDesc scan, ScanKey scankey, int nscankeys,
50                  ScanKey orderbys, int norderbys)
51 {
52         BloomScanOpaque so = (BloomScanOpaque) scan->opaque;
53
54         if (so->sign)
55                 pfree(so->sign);
56         so->sign = NULL;
57
58         if (scankey && scan->numberOfKeys > 0)
59         {
60                 memmove(scan->keyData, scankey,
61                                 scan->numberOfKeys * sizeof(ScanKeyData));
62         }
63 }
64
65 /*
66  * End scan of bloom index.
67  */
68 void
69 blendscan(IndexScanDesc scan)
70 {
71         BloomScanOpaque so = (BloomScanOpaque) scan->opaque;
72
73         if (so->sign)
74                 pfree(so->sign);
75         so->sign = NULL;
76 }
77
78 /*
79  * Insert all matching tuples into to a bitmap.
80  */
81 int64
82 blgetbitmap(IndexScanDesc scan, TIDBitmap *tbm)
83 {
84         int64           ntids = 0;
85         BlockNumber blkno = BLOOM_HEAD_BLKNO,
86                                 npages;
87         int                     i;
88         BufferAccessStrategy bas;
89         BloomScanOpaque so = (BloomScanOpaque) scan->opaque;
90
91         if (so->sign == NULL)
92         {
93                 /* New search: have to calculate search signature */
94                 ScanKey         skey = scan->keyData;
95
96                 so->sign = palloc0(sizeof(BloomSignatureWord) * so->state.opts.bloomLength);
97
98                 for (i = 0; i < scan->numberOfKeys; i++)
99                 {
100                         /*
101                          * Assume bloom-indexable operators to be strict, so nothing could
102                          * be found for NULL key.
103                          */
104                         if (skey->sk_flags & SK_ISNULL)
105                         {
106                                 pfree(so->sign);
107                                 so->sign = NULL;
108                                 return 0;
109                         }
110
111                         /* Add next value to the signature */
112                         signValue(&so->state, so->sign, skey->sk_argument,
113                                           skey->sk_attno - 1);
114
115                         skey++;
116                 }
117         }
118
119         /*
120          * We're going to read the whole index. This is why we use appropriate
121          * buffer access strategy.
122          */
123         bas = GetAccessStrategy(BAS_BULKREAD);
124         npages = RelationGetNumberOfBlocks(scan->indexRelation);
125
126         for (blkno = BLOOM_HEAD_BLKNO; blkno < npages; blkno++)
127         {
128                 Buffer          buffer;
129                 Page            page;
130
131                 buffer = ReadBufferExtended(scan->indexRelation, MAIN_FORKNUM,
132                                                                         blkno, RBM_NORMAL, bas);
133
134                 LockBuffer(buffer, BUFFER_LOCK_SHARE);
135                 page = BufferGetPage(buffer);
136                 TestForOldSnapshot(scan->xs_snapshot, scan->indexRelation, page);
137
138                 if (!PageIsNew(page) && !BloomPageIsDeleted(page))
139                 {
140                         OffsetNumber offset,
141                                                 maxOffset = BloomPageGetMaxOffset(page);
142
143                         for (offset = 1; offset <= maxOffset; offset++)
144                         {
145                                 BloomTuple *itup = BloomPageGetTuple(&so->state, page, offset);
146                                 bool            res = true;
147
148                                 /* Check index signature with scan signature */
149                                 for (i = 0; i < so->state.opts.bloomLength; i++)
150                                 {
151                                         if ((itup->sign[i] & so->sign[i]) != so->sign[i])
152                                         {
153                                                 res = false;
154                                                 break;
155                                         }
156                                 }
157
158                                 /* Add matching tuples to bitmap */
159                                 if (res)
160                                 {
161                                         tbm_add_tuples(tbm, &itup->heapPtr, 1, true);
162                                         ntids++;
163                                 }
164                         }
165                 }
166
167                 UnlockReleaseBuffer(buffer);
168                 CHECK_FOR_INTERRUPTS();
169         }
170         FreeAccessStrategy(bas);
171
172         return ntids;
173 }