]> granicus.if.org Git - postgresql/blob - src/backend/access/gin/ginutil.c
Reduce pinning and buffer content locking for btree scans.
[postgresql] / src / backend / access / gin / ginutil.c
1 /*-------------------------------------------------------------------------
2  *
3  * ginutil.c
4  *        utilities routines for the postgres inverted index access method.
5  *
6  *
7  * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group
8  * Portions Copyright (c) 1994, Regents of the University of California
9  *
10  * IDENTIFICATION
11  *                      src/backend/access/gin/ginutil.c
12  *-------------------------------------------------------------------------
13  */
14
15 #include "postgres.h"
16
17 #include "access/gin_private.h"
18 #include "access/reloptions.h"
19 #include "access/xloginsert.h"
20 #include "catalog/pg_collation.h"
21 #include "catalog/pg_type.h"
22 #include "miscadmin.h"
23 #include "storage/indexfsm.h"
24 #include "storage/lmgr.h"
25
26
27 /*
28  * initGinState: fill in an empty GinState struct to describe the index
29  *
30  * Note: assorted subsidiary data is allocated in the CurrentMemoryContext.
31  */
32 void
33 initGinState(GinState *state, Relation index)
34 {
35         TupleDesc       origTupdesc = RelationGetDescr(index);
36         int                     i;
37
38         MemSet(state, 0, sizeof(GinState));
39
40         state->index = index;
41         state->oneCol = (origTupdesc->natts == 1) ? true : false;
42         state->origTupdesc = origTupdesc;
43
44         for (i = 0; i < origTupdesc->natts; i++)
45         {
46                 if (state->oneCol)
47                         state->tupdesc[i] = state->origTupdesc;
48                 else
49                 {
50                         state->tupdesc[i] = CreateTemplateTupleDesc(2, false);
51
52                         TupleDescInitEntry(state->tupdesc[i], (AttrNumber) 1, NULL,
53                                                            INT2OID, -1, 0);
54                         TupleDescInitEntry(state->tupdesc[i], (AttrNumber) 2, NULL,
55                                                            origTupdesc->attrs[i]->atttypid,
56                                                            origTupdesc->attrs[i]->atttypmod,
57                                                            origTupdesc->attrs[i]->attndims);
58                         TupleDescInitEntryCollation(state->tupdesc[i], (AttrNumber) 2,
59                                                                                 origTupdesc->attrs[i]->attcollation);
60                 }
61
62                 fmgr_info_copy(&(state->compareFn[i]),
63                                            index_getprocinfo(index, i + 1, GIN_COMPARE_PROC),
64                                            CurrentMemoryContext);
65                 fmgr_info_copy(&(state->extractValueFn[i]),
66                                            index_getprocinfo(index, i + 1, GIN_EXTRACTVALUE_PROC),
67                                            CurrentMemoryContext);
68                 fmgr_info_copy(&(state->extractQueryFn[i]),
69                                            index_getprocinfo(index, i + 1, GIN_EXTRACTQUERY_PROC),
70                                            CurrentMemoryContext);
71
72                 /*
73                  * Check opclass capability to do tri-state or binary logic consistent
74                  * check.
75                  */
76                 if (index_getprocid(index, i + 1, GIN_TRICONSISTENT_PROC) != InvalidOid)
77                 {
78                         fmgr_info_copy(&(state->triConsistentFn[i]),
79                                          index_getprocinfo(index, i + 1, GIN_TRICONSISTENT_PROC),
80                                                    CurrentMemoryContext);
81                 }
82
83                 if (index_getprocid(index, i + 1, GIN_CONSISTENT_PROC) != InvalidOid)
84                 {
85                         fmgr_info_copy(&(state->consistentFn[i]),
86                                                 index_getprocinfo(index, i + 1, GIN_CONSISTENT_PROC),
87                                                    CurrentMemoryContext);
88                 }
89
90                 if (state->consistentFn[i].fn_oid == InvalidOid &&
91                         state->triConsistentFn[i].fn_oid == InvalidOid)
92                 {
93                         elog(ERROR, "missing GIN support function (%d or %d) for attribute %d of index \"%s\"",
94                                  GIN_CONSISTENT_PROC, GIN_TRICONSISTENT_PROC,
95                                  i + 1, RelationGetRelationName(index));
96                 }
97
98                 /*
99                  * Check opclass capability to do partial match.
100                  */
101                 if (index_getprocid(index, i + 1, GIN_COMPARE_PARTIAL_PROC) != InvalidOid)
102                 {
103                         fmgr_info_copy(&(state->comparePartialFn[i]),
104                                    index_getprocinfo(index, i + 1, GIN_COMPARE_PARTIAL_PROC),
105                                                    CurrentMemoryContext);
106                         state->canPartialMatch[i] = true;
107                 }
108                 else
109                 {
110                         state->canPartialMatch[i] = false;
111                 }
112
113                 /*
114                  * If the index column has a specified collation, we should honor that
115                  * while doing comparisons.  However, we may have a collatable storage
116                  * type for a noncollatable indexed data type (for instance, hstore
117                  * uses text index entries).  If there's no index collation then
118                  * specify default collation in case the support functions need
119                  * collation.  This is harmless if the support functions don't care
120                  * about collation, so we just do it unconditionally.  (We could
121                  * alternatively call get_typcollation, but that seems like expensive
122                  * overkill --- there aren't going to be any cases where a GIN storage
123                  * type has a nondefault collation.)
124                  */
125                 if (OidIsValid(index->rd_indcollation[i]))
126                         state->supportCollation[i] = index->rd_indcollation[i];
127                 else
128                         state->supportCollation[i] = DEFAULT_COLLATION_OID;
129         }
130 }
131
132 /*
133  * Extract attribute (column) number of stored entry from GIN tuple
134  */
135 OffsetNumber
136 gintuple_get_attrnum(GinState *ginstate, IndexTuple tuple)
137 {
138         OffsetNumber colN;
139
140         if (ginstate->oneCol)
141         {
142                 /* column number is not stored explicitly */
143                 colN = FirstOffsetNumber;
144         }
145         else
146         {
147                 Datum           res;
148                 bool            isnull;
149
150                 /*
151                  * First attribute is always int16, so we can safely use any tuple
152                  * descriptor to obtain first attribute of tuple
153                  */
154                 res = index_getattr(tuple, FirstOffsetNumber, ginstate->tupdesc[0],
155                                                         &isnull);
156                 Assert(!isnull);
157
158                 colN = DatumGetUInt16(res);
159                 Assert(colN >= FirstOffsetNumber && colN <= ginstate->origTupdesc->natts);
160         }
161
162         return colN;
163 }
164
165 /*
166  * Extract stored datum (and possible null category) from GIN tuple
167  */
168 Datum
169 gintuple_get_key(GinState *ginstate, IndexTuple tuple,
170                                  GinNullCategory *category)
171 {
172         Datum           res;
173         bool            isnull;
174
175         if (ginstate->oneCol)
176         {
177                 /*
178                  * Single column index doesn't store attribute numbers in tuples
179                  */
180                 res = index_getattr(tuple, FirstOffsetNumber, ginstate->origTupdesc,
181                                                         &isnull);
182         }
183         else
184         {
185                 /*
186                  * Since the datum type depends on which index column it's from, we
187                  * must be careful to use the right tuple descriptor here.
188                  */
189                 OffsetNumber colN = gintuple_get_attrnum(ginstate, tuple);
190
191                 res = index_getattr(tuple, OffsetNumberNext(FirstOffsetNumber),
192                                                         ginstate->tupdesc[colN - 1],
193                                                         &isnull);
194         }
195
196         if (isnull)
197                 *category = GinGetNullCategory(tuple, ginstate);
198         else
199                 *category = GIN_CAT_NORM_KEY;
200
201         return res;
202 }
203
204 /*
205  * Allocate a new page (either by recycling, or by extending the index file)
206  * The returned buffer is already pinned and exclusive-locked
207  * Caller is responsible for initializing the page by calling GinInitBuffer
208  */
209 Buffer
210 GinNewBuffer(Relation index)
211 {
212         Buffer          buffer;
213         bool            needLock;
214
215         /* First, try to get a page from FSM */
216         for (;;)
217         {
218                 BlockNumber blkno = GetFreeIndexPage(index);
219
220                 if (blkno == InvalidBlockNumber)
221                         break;
222
223                 buffer = ReadBuffer(index, blkno);
224
225                 /*
226                  * We have to guard against the possibility that someone else already
227                  * recycled this page; the buffer may be locked if so.
228                  */
229                 if (ConditionalLockBuffer(buffer))
230                 {
231                         Page            page = BufferGetPage(buffer);
232
233                         if (PageIsNew(page))
234                                 return buffer;  /* OK to use, if never initialized */
235
236                         if (GinPageIsDeleted(page))
237                                 return buffer;  /* OK to use */
238
239                         LockBuffer(buffer, GIN_UNLOCK);
240                 }
241
242                 /* Can't use it, so release buffer and try again */
243                 ReleaseBuffer(buffer);
244         }
245
246         /* Must extend the file */
247         needLock = !RELATION_IS_LOCAL(index);
248         if (needLock)
249                 LockRelationForExtension(index, ExclusiveLock);
250
251         buffer = ReadBuffer(index, P_NEW);
252         LockBuffer(buffer, GIN_EXCLUSIVE);
253
254         if (needLock)
255                 UnlockRelationForExtension(index, ExclusiveLock);
256
257         return buffer;
258 }
259
260 void
261 GinInitPage(Page page, uint32 f, Size pageSize)
262 {
263         GinPageOpaque opaque;
264
265         PageInit(page, pageSize, sizeof(GinPageOpaqueData));
266
267         opaque = GinPageGetOpaque(page);
268         memset(opaque, 0, sizeof(GinPageOpaqueData));
269         opaque->flags = f;
270         opaque->rightlink = InvalidBlockNumber;
271 }
272
273 void
274 GinInitBuffer(Buffer b, uint32 f)
275 {
276         GinInitPage(BufferGetPage(b), f, BufferGetPageSize(b));
277 }
278
279 void
280 GinInitMetabuffer(Buffer b)
281 {
282         GinMetaPageData *metadata;
283         Page            page = BufferGetPage(b);
284
285         GinInitPage(page, GIN_META, BufferGetPageSize(b));
286
287         metadata = GinPageGetMeta(page);
288
289         metadata->head = metadata->tail = InvalidBlockNumber;
290         metadata->tailFreeSize = 0;
291         metadata->nPendingPages = 0;
292         metadata->nPendingHeapTuples = 0;
293         metadata->nTotalPages = 0;
294         metadata->nEntryPages = 0;
295         metadata->nDataPages = 0;
296         metadata->nEntries = 0;
297         metadata->ginVersion = GIN_CURRENT_VERSION;
298 }
299
300 /*
301  * Compare two keys of the same index column
302  */
303 int
304 ginCompareEntries(GinState *ginstate, OffsetNumber attnum,
305                                   Datum a, GinNullCategory categorya,
306                                   Datum b, GinNullCategory categoryb)
307 {
308         /* if not of same null category, sort by that first */
309         if (categorya != categoryb)
310                 return (categorya < categoryb) ? -1 : 1;
311
312         /* all null items in same category are equal */
313         if (categorya != GIN_CAT_NORM_KEY)
314                 return 0;
315
316         /* both not null, so safe to call the compareFn */
317         return DatumGetInt32(FunctionCall2Coll(&ginstate->compareFn[attnum - 1],
318                                                                           ginstate->supportCollation[attnum - 1],
319                                                                                    a, b));
320 }
321
322 /*
323  * Compare two keys of possibly different index columns
324  */
325 int
326 ginCompareAttEntries(GinState *ginstate,
327                                          OffsetNumber attnuma, Datum a, GinNullCategory categorya,
328                                          OffsetNumber attnumb, Datum b, GinNullCategory categoryb)
329 {
330         /* attribute number is the first sort key */
331         if (attnuma != attnumb)
332                 return (attnuma < attnumb) ? -1 : 1;
333
334         return ginCompareEntries(ginstate, attnuma, a, categorya, b, categoryb);
335 }
336
337
338 /*
339  * Support for sorting key datums in ginExtractEntries
340  *
341  * Note: we only have to worry about null and not-null keys here;
342  * ginExtractEntries never generates more than one placeholder null,
343  * so it doesn't have to sort those.
344  */
345 typedef struct
346 {
347         Datum           datum;
348         bool            isnull;
349 } keyEntryData;
350
351 typedef struct
352 {
353         FmgrInfo   *cmpDatumFunc;
354         Oid                     collation;
355         bool            haveDups;
356 } cmpEntriesArg;
357
358 static int
359 cmpEntries(const void *a, const void *b, void *arg)
360 {
361         const keyEntryData *aa = (const keyEntryData *) a;
362         const keyEntryData *bb = (const keyEntryData *) b;
363         cmpEntriesArg *data = (cmpEntriesArg *) arg;
364         int                     res;
365
366         if (aa->isnull)
367         {
368                 if (bb->isnull)
369                         res = 0;                        /* NULL "=" NULL */
370                 else
371                         res = 1;                        /* NULL ">" not-NULL */
372         }
373         else if (bb->isnull)
374                 res = -1;                               /* not-NULL "<" NULL */
375         else
376                 res = DatumGetInt32(FunctionCall2Coll(data->cmpDatumFunc,
377                                                                                           data->collation,
378                                                                                           aa->datum, bb->datum));
379
380         /*
381          * Detect if we have any duplicates.  If there are equal keys, qsort must
382          * compare them at some point, else it wouldn't know whether one should go
383          * before or after the other.
384          */
385         if (res == 0)
386                 data->haveDups = true;
387
388         return res;
389 }
390
391
392 /*
393  * Extract the index key values from an indexable item
394  *
395  * The resulting key values are sorted, and any duplicates are removed.
396  * This avoids generating redundant index entries.
397  */
398 Datum *
399 ginExtractEntries(GinState *ginstate, OffsetNumber attnum,
400                                   Datum value, bool isNull,
401                                   int32 *nentries, GinNullCategory **categories)
402 {
403         Datum      *entries;
404         bool       *nullFlags;
405         int32           i;
406
407         /*
408          * We don't call the extractValueFn on a null item.  Instead generate a
409          * placeholder.
410          */
411         if (isNull)
412         {
413                 *nentries = 1;
414                 entries = (Datum *) palloc(sizeof(Datum));
415                 entries[0] = (Datum) 0;
416                 *categories = (GinNullCategory *) palloc(sizeof(GinNullCategory));
417                 (*categories)[0] = GIN_CAT_NULL_ITEM;
418                 return entries;
419         }
420
421         /* OK, call the opclass's extractValueFn */
422         nullFlags = NULL;                       /* in case extractValue doesn't set it */
423         entries = (Datum *)
424                 DatumGetPointer(FunctionCall3Coll(&ginstate->extractValueFn[attnum - 1],
425                                                                           ginstate->supportCollation[attnum - 1],
426                                                                                   value,
427                                                                                   PointerGetDatum(nentries),
428                                                                                   PointerGetDatum(&nullFlags)));
429
430         /*
431          * Generate a placeholder if the item contained no keys.
432          */
433         if (entries == NULL || *nentries <= 0)
434         {
435                 *nentries = 1;
436                 entries = (Datum *) palloc(sizeof(Datum));
437                 entries[0] = (Datum) 0;
438                 *categories = (GinNullCategory *) palloc(sizeof(GinNullCategory));
439                 (*categories)[0] = GIN_CAT_EMPTY_ITEM;
440                 return entries;
441         }
442
443         /*
444          * If the extractValueFn didn't create a nullFlags array, create one,
445          * assuming that everything's non-null.  Otherwise, run through the array
446          * and make sure each value is exactly 0 or 1; this ensures binary
447          * compatibility with the GinNullCategory representation.
448          */
449         if (nullFlags == NULL)
450                 nullFlags = (bool *) palloc0(*nentries * sizeof(bool));
451         else
452         {
453                 for (i = 0; i < *nentries; i++)
454                         nullFlags[i] = (nullFlags[i] ? true : false);
455         }
456         /* now we can use the nullFlags as category codes */
457         *categories = (GinNullCategory *) nullFlags;
458
459         /*
460          * If there's more than one key, sort and unique-ify.
461          *
462          * XXX Using qsort here is notationally painful, and the overhead is
463          * pretty bad too.  For small numbers of keys it'd likely be better to use
464          * a simple insertion sort.
465          */
466         if (*nentries > 1)
467         {
468                 keyEntryData *keydata;
469                 cmpEntriesArg arg;
470
471                 keydata = (keyEntryData *) palloc(*nentries * sizeof(keyEntryData));
472                 for (i = 0; i < *nentries; i++)
473                 {
474                         keydata[i].datum = entries[i];
475                         keydata[i].isnull = nullFlags[i];
476                 }
477
478                 arg.cmpDatumFunc = &ginstate->compareFn[attnum - 1];
479                 arg.collation = ginstate->supportCollation[attnum - 1];
480                 arg.haveDups = false;
481                 qsort_arg(keydata, *nentries, sizeof(keyEntryData),
482                                   cmpEntries, (void *) &arg);
483
484                 if (arg.haveDups)
485                 {
486                         /* there are duplicates, must get rid of 'em */
487                         int32           j;
488
489                         entries[0] = keydata[0].datum;
490                         nullFlags[0] = keydata[0].isnull;
491                         j = 1;
492                         for (i = 1; i < *nentries; i++)
493                         {
494                                 if (cmpEntries(&keydata[i - 1], &keydata[i], &arg) != 0)
495                                 {
496                                         entries[j] = keydata[i].datum;
497                                         nullFlags[j] = keydata[i].isnull;
498                                         j++;
499                                 }
500                         }
501                         *nentries = j;
502                 }
503                 else
504                 {
505                         /* easy, no duplicates */
506                         for (i = 0; i < *nentries; i++)
507                         {
508                                 entries[i] = keydata[i].datum;
509                                 nullFlags[i] = keydata[i].isnull;
510                         }
511                 }
512
513                 pfree(keydata);
514         }
515
516         return entries;
517 }
518
519 Datum
520 ginoptions(PG_FUNCTION_ARGS)
521 {
522         Datum           reloptions = PG_GETARG_DATUM(0);
523         bool            validate = PG_GETARG_BOOL(1);
524         relopt_value *options;
525         GinOptions *rdopts;
526         int                     numoptions;
527         static const relopt_parse_elt tab[] = {
528                 {"fastupdate", RELOPT_TYPE_BOOL, offsetof(GinOptions, useFastUpdate)},
529                 {"gin_pending_list_limit", RELOPT_TYPE_INT, offsetof(GinOptions,
530                                                                                                                                 pendingListCleanupSize)}
531         };
532
533         options = parseRelOptions(reloptions, validate, RELOPT_KIND_GIN,
534                                                           &numoptions);
535
536         /* if none set, we're done */
537         if (numoptions == 0)
538                 PG_RETURN_NULL();
539
540         rdopts = allocateReloptStruct(sizeof(GinOptions), options, numoptions);
541
542         fillRelOptions((void *) rdopts, sizeof(GinOptions), options, numoptions,
543                                    validate, tab, lengthof(tab));
544
545         pfree(options);
546
547         PG_RETURN_BYTEA_P(rdopts);
548 }
549
550 /*
551  * Fetch index's statistical data into *stats
552  *
553  * Note: in the result, nPendingPages can be trusted to be up-to-date,
554  * as can ginVersion; but the other fields are as of the last VACUUM.
555  */
556 void
557 ginGetStats(Relation index, GinStatsData *stats)
558 {
559         Buffer          metabuffer;
560         Page            metapage;
561         GinMetaPageData *metadata;
562
563         metabuffer = ReadBuffer(index, GIN_METAPAGE_BLKNO);
564         LockBuffer(metabuffer, GIN_SHARE);
565         metapage = BufferGetPage(metabuffer);
566         metadata = GinPageGetMeta(metapage);
567
568         stats->nPendingPages = metadata->nPendingPages;
569         stats->nTotalPages = metadata->nTotalPages;
570         stats->nEntryPages = metadata->nEntryPages;
571         stats->nDataPages = metadata->nDataPages;
572         stats->nEntries = metadata->nEntries;
573         stats->ginVersion = metadata->ginVersion;
574
575         UnlockReleaseBuffer(metabuffer);
576 }
577
578 /*
579  * Write the given statistics to the index's metapage
580  *
581  * Note: nPendingPages and ginVersion are *not* copied over
582  */
583 void
584 ginUpdateStats(Relation index, const GinStatsData *stats)
585 {
586         Buffer          metabuffer;
587         Page            metapage;
588         GinMetaPageData *metadata;
589
590         metabuffer = ReadBuffer(index, GIN_METAPAGE_BLKNO);
591         LockBuffer(metabuffer, GIN_EXCLUSIVE);
592         metapage = BufferGetPage(metabuffer);
593         metadata = GinPageGetMeta(metapage);
594
595         START_CRIT_SECTION();
596
597         metadata->nTotalPages = stats->nTotalPages;
598         metadata->nEntryPages = stats->nEntryPages;
599         metadata->nDataPages = stats->nDataPages;
600         metadata->nEntries = stats->nEntries;
601
602         MarkBufferDirty(metabuffer);
603
604         if (RelationNeedsWAL(index))
605         {
606                 XLogRecPtr      recptr;
607                 ginxlogUpdateMeta data;
608
609                 data.node = index->rd_node;
610                 data.ntuples = 0;
611                 data.newRightlink = data.prevTail = InvalidBlockNumber;
612                 memcpy(&data.metadata, metadata, sizeof(GinMetaPageData));
613
614                 XLogBeginInsert();
615                 XLogRegisterData((char *) &data, sizeof(ginxlogUpdateMeta));
616                 XLogRegisterBuffer(0, metabuffer, REGBUF_WILL_INIT);
617
618                 recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_UPDATE_META_PAGE);
619                 PageSetLSN(metapage, recptr);
620         }
621
622         UnlockReleaseBuffer(metabuffer);
623
624         END_CRIT_SECTION();
625 }