]> granicus.if.org Git - postgresql/blob - src/backend/access/gin/ginutil.c
Implement "fastupdate" support for GIN indexes, in which we try to accumulate
[postgresql] / src / backend / access / gin / ginutil.c
1 /*-------------------------------------------------------------------------
2  *
3  * ginutil.c
4  *        utilities routines for the postgres inverted index access method.
5  *
6  *
7  * Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group
8  * Portions Copyright (c) 1994, Regents of the University of California
9  *
10  * IDENTIFICATION
11  *                      $PostgreSQL: pgsql/src/backend/access/gin/ginutil.c,v 1.21 2009/03/24 20:17:11 tgl Exp $
12  *-------------------------------------------------------------------------
13  */
14
15 #include "postgres.h"
16 #include "access/genam.h"
17 #include "access/gin.h"
18 #include "access/reloptions.h"
19 #include "catalog/pg_type.h" 
20 #include "storage/bufmgr.h"
21 #include "storage/freespace.h"
22 #include "storage/indexfsm.h"
23 #include "storage/lmgr.h"
24
25 void
26 initGinState(GinState *state, Relation index)
27 {
28         int i;
29
30         state->origTupdesc = index->rd_att;
31
32         state->oneCol = (index->rd_att->natts == 1) ? true : false;
33
34         for(i=0;i<index->rd_att->natts;i++)
35         {
36                 state->tupdesc[i] = CreateTemplateTupleDesc(2,false);
37
38                 TupleDescInitEntry( state->tupdesc[i], (AttrNumber) 1, NULL,
39                                                         INT2OID, -1, 0);
40                 TupleDescInitEntry( state->tupdesc[i], (AttrNumber) 2, NULL,
41                                                         index->rd_att->attrs[i]->atttypid,
42                                                         index->rd_att->attrs[i]->atttypmod,
43                                                         index->rd_att->attrs[i]->attndims
44                                                         );
45
46                 fmgr_info_copy(&(state->compareFn[i]),
47                                                 index_getprocinfo(index, i+1, GIN_COMPARE_PROC),
48                                                 CurrentMemoryContext);
49                 fmgr_info_copy(&(state->extractValueFn[i]),
50                                                 index_getprocinfo(index, i+1, GIN_EXTRACTVALUE_PROC),
51                                                 CurrentMemoryContext);
52                 fmgr_info_copy(&(state->extractQueryFn[i]),
53                                                 index_getprocinfo(index, i+1, GIN_EXTRACTQUERY_PROC),
54                                                 CurrentMemoryContext);
55                 fmgr_info_copy(&(state->consistentFn[i]),
56                                                 index_getprocinfo(index, i+1, GIN_CONSISTENT_PROC),
57                                                 CurrentMemoryContext);
58
59                 /*
60                  * Check opclass capability to do partial match.
61                  */
62                 if ( index_getprocid(index, i+1, GIN_COMPARE_PARTIAL_PROC) != InvalidOid )
63                 {
64                         fmgr_info_copy(&(state->comparePartialFn[i]),
65                                                    index_getprocinfo(index, i+1, GIN_COMPARE_PARTIAL_PROC),
66                                                    CurrentMemoryContext);
67
68                         state->canPartialMatch[i] = true;
69                 }
70                 else
71                 {
72                         state->canPartialMatch[i] = false;
73                 }
74         }
75 }
76
77 /*
78  * Extract attribute (column) number of stored entry from GIN tuple
79  */
80 OffsetNumber
81 gintuple_get_attrnum(GinState *ginstate, IndexTuple tuple)
82 {
83         OffsetNumber colN = FirstOffsetNumber;
84
85         if ( !ginstate->oneCol )
86         {
87                 Datum   res;
88                 bool    isnull;
89
90                 /*
91                  * First attribute is always int16, so we can safely use any
92                  * tuple descriptor to obtain first attribute of tuple
93                  */
94                 res = index_getattr(tuple, FirstOffsetNumber, ginstate->tupdesc[0],
95                                                         &isnull);
96                 Assert(!isnull);
97
98                 colN = DatumGetUInt16(res);
99                 Assert( colN >= FirstOffsetNumber && colN <= ginstate->origTupdesc->natts );
100         }
101
102         return colN;
103 }
104
105 /*
106  * Extract stored datum from GIN tuple
107  */
108 Datum
109 gin_index_getattr(GinState *ginstate, IndexTuple tuple)
110 {
111         bool    isnull;
112         Datum   res;
113
114         if ( ginstate->oneCol )
115         {
116                 /*
117                  * Single column index doesn't store attribute numbers in tuples
118                  */
119                 res = index_getattr(tuple, FirstOffsetNumber, ginstate->origTupdesc,
120                                                         &isnull);
121         }
122         else
123         {
124                 /*
125                  * Since the datum type depends on which index column it's from,
126                  * we must be careful to use the right tuple descriptor here.
127                  */
128                 OffsetNumber colN = gintuple_get_attrnum(ginstate, tuple);
129
130                 res = index_getattr(tuple, OffsetNumberNext(FirstOffsetNumber),
131                                                         ginstate->tupdesc[colN - 1],
132                                                         &isnull);
133         }
134
135         Assert(!isnull);
136
137         return res;
138 }
139
140 /*
141  * Allocate a new page (either by recycling, or by extending the index file)
142  * The returned buffer is already pinned and exclusive-locked
143  * Caller is responsible for initializing the page by calling GinInitBuffer
144  */
145
146 Buffer
147 GinNewBuffer(Relation index)
148 {
149         Buffer          buffer;
150         bool            needLock;
151
152         /* First, try to get a page from FSM */
153         for (;;)
154         {
155                 BlockNumber blkno = GetFreeIndexPage(index);
156
157                 if (blkno == InvalidBlockNumber)
158                         break;
159
160                 buffer = ReadBuffer(index, blkno);
161
162                 /*
163                  * We have to guard against the possibility that someone else already
164                  * recycled this page; the buffer may be locked if so.
165                  */
166                 if (ConditionalLockBuffer(buffer))
167                 {
168                         Page            page = BufferGetPage(buffer);
169
170                         if (PageIsNew(page))
171                                 return buffer;  /* OK to use, if never initialized */
172
173                         if (GinPageIsDeleted(page))
174                                 return buffer;  /* OK to use */
175
176                         LockBuffer(buffer, GIN_UNLOCK);
177                 }
178
179                 /* Can't use it, so release buffer and try again */
180                 ReleaseBuffer(buffer);
181         }
182
183         /* Must extend the file */
184         needLock = !RELATION_IS_LOCAL(index);
185         if (needLock)
186                 LockRelationForExtension(index, ExclusiveLock);
187
188         buffer = ReadBuffer(index, P_NEW);
189         LockBuffer(buffer, GIN_EXCLUSIVE);
190
191         if (needLock)
192                 UnlockRelationForExtension(index, ExclusiveLock);
193
194         return buffer;
195 }
196
197 void
198 GinInitPage(Page page, uint32 f, Size pageSize)
199 {
200         GinPageOpaque opaque;
201
202         PageInit(page, pageSize, sizeof(GinPageOpaqueData));
203
204         opaque = GinPageGetOpaque(page);
205         memset(opaque, 0, sizeof(GinPageOpaqueData));
206         opaque->flags = f;
207         opaque->rightlink = InvalidBlockNumber;
208 }
209
210 void
211 GinInitBuffer(Buffer b, uint32 f)
212 {
213         GinInitPage(BufferGetPage(b), f, BufferGetPageSize(b));
214 }
215
216 void
217 GinInitMetabuffer(Buffer b)
218 {
219         GinMetaPageData *metadata;
220         Page                     page = BufferGetPage(b);
221
222         GinInitPage(page, GIN_META, BufferGetPageSize(b));
223
224         metadata = GinPageGetMeta(page);
225
226         metadata->head = metadata->tail = InvalidBlockNumber;
227         metadata->tailFreeSize = 0;
228         metadata->nPendingPages = 0;
229         metadata->nPendingHeapTuples = 0;
230 }
231
232 int
233 compareEntries(GinState *ginstate, OffsetNumber attnum, Datum a, Datum b)
234 {
235         return DatumGetInt32(
236                                                  FunctionCall2(
237                                                                            &ginstate->compareFn[attnum-1],
238                                                                            a, b
239                                                                            )
240                 );
241 }
242
243 int
244 compareAttEntries(GinState *ginstate, OffsetNumber attnum_a, Datum a,
245                                                                           OffsetNumber attnum_b, Datum b)
246 {
247         if ( attnum_a == attnum_b )
248                 return compareEntries( ginstate, attnum_a, a, b);
249
250         return ( attnum_a < attnum_b ) ? -1 : 1;
251 }
252
253 typedef struct
254 {
255         FmgrInfo   *cmpDatumFunc;
256         bool       *needUnique;
257 } cmpEntriesData;
258
259 static int
260 cmpEntries(const Datum *a, const Datum *b, cmpEntriesData *arg)
261 {
262         int                     res = DatumGetInt32(FunctionCall2(arg->cmpDatumFunc,
263                                                                                                   *a, *b));
264
265         if (res == 0)
266                 *(arg->needUnique) = TRUE;
267
268         return res;
269 }
270
271 Datum *
272 extractEntriesS(GinState *ginstate, OffsetNumber attnum, Datum value, int32 *nentries,
273                                 bool *needUnique)
274 {
275         Datum      *entries;
276
277         entries = (Datum *) DatumGetPointer(FunctionCall2(
278                                                                                                    &ginstate->extractValueFn[attnum-1],
279                                                                                                           value,
280                                                                                                         PointerGetDatum(nentries)
281                                                                                                           ));
282
283         if (entries == NULL)
284                 *nentries = 0;
285
286         *needUnique = FALSE;
287         if (*nentries > 1)
288         {
289                 cmpEntriesData arg;
290
291                 arg.cmpDatumFunc = &ginstate->compareFn[attnum-1];
292                 arg.needUnique = needUnique;
293                 qsort_arg(entries, *nentries, sizeof(Datum),
294                                   (qsort_arg_comparator) cmpEntries, (void *) &arg);
295         }
296
297         return entries;
298 }
299
300
301 Datum *
302 extractEntriesSU(GinState *ginstate, OffsetNumber attnum, Datum value, int32 *nentries)
303 {
304         bool            needUnique;
305         Datum      *entries = extractEntriesS(ginstate, attnum, value, nentries,
306                                                                                   &needUnique);
307
308         if (needUnique)
309         {
310                 Datum      *ptr,
311                                    *res;
312
313                 ptr = res = entries;
314
315                 while (ptr - entries < *nentries)
316                 {
317                         if (compareEntries(ginstate, attnum, *ptr, *res) != 0)
318                                 *(++res) = *ptr++;
319                         else
320                                 ptr++;
321                 }
322
323                 *nentries = res + 1 - entries;
324         }
325
326         return entries;
327 }
328
329 Datum
330 ginoptions(PG_FUNCTION_ARGS)
331 {
332         Datum           reloptions = PG_GETARG_DATUM(0);
333         bool            validate = PG_GETARG_BOOL(1);
334         relopt_value *options;
335         GinOptions *rdopts;
336         int                     numoptions;
337         static const relopt_parse_elt tab[] = {
338                 {"fastupdate", RELOPT_TYPE_BOOL, offsetof(GinOptions, useFastUpdate)}
339         };
340
341         options = parseRelOptions(reloptions, validate, RELOPT_KIND_GIN,
342                                                           &numoptions);
343
344         /* if none set, we're done */
345         if (numoptions == 0)
346                 PG_RETURN_NULL();
347
348         rdopts = allocateReloptStruct(sizeof(GinOptions), options, numoptions);
349
350         fillRelOptions((void *) rdopts, sizeof(GinOptions), options, numoptions,
351                                         validate, tab, lengthof(tab));
352
353         pfree(options);
354
355         PG_RETURN_BYTEA_P(rdopts);
356 }