/*
 * brin.c
 *		Implementation of BRIN indexes for Postgres
 *
 * See src/backend/access/brin/README for details.
 *
 * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *	  src/backend/access/brin/brin.c
 *
 * TODO
 *		* ScalarArrayOpExpr (amsearcharray -> SK_SEARCHARRAY)
 */
#include "postgres.h"

#include "access/brin.h"
#include "access/brin_page.h"
#include "access/brin_pageops.h"
#include "access/brin_xlog.h"
#include "access/reloptions.h"
#include "access/relscan.h"
#include "access/xloginsert.h"
#include "catalog/index.h"
#include "catalog/pg_am.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "postmaster/autovacuum.h"
#include "storage/bufmgr.h"
#include "storage/freespace.h"
#include "utils/builtins.h"
#include "utils/index_selfuncs.h"
#include "utils/memutils.h"
#include "utils/rel.h"


/*
 * We use a BrinBuildState during initial construction of a BRIN index.
 * The running state is kept in a BrinMemTuple.
 */
typedef struct BrinBuildState
{
	Relation	bs_irel;
	int			bs_numtuples;
	Buffer		bs_currentInsertBuf;
	BlockNumber bs_pagesPerRange;
	BlockNumber bs_currRangeStart;
	BrinRevmap *bs_rmAccess;
	BrinDesc   *bs_bdesc;
	BrinMemTuple *bs_dtuple;
} BrinBuildState;

/*
 * Struct used as "opaque" during index scans
 */
typedef struct BrinOpaque
{
	BlockNumber bo_pagesPerRange;
	BrinRevmap *bo_rmAccess;
	BrinDesc   *bo_bdesc;
} BrinOpaque;

#define BRIN_ALL_BLOCKRANGES	InvalidBlockNumber

static BrinBuildState *initialize_brin_buildstate(Relation idxRel,
						   BrinRevmap *revmap, BlockNumber pagesPerRange);
static void terminate_brin_buildstate(BrinBuildState *state);
static void brinsummarize(Relation index, Relation heapRel, BlockNumber pageRange,
			  bool include_partial, double *numSummarized, double *numExisting);
static void form_and_insert_tuple(BrinBuildState *state);
static void union_tuples(BrinDesc *bdesc, BrinMemTuple *a,
			 BrinTuple *b);
static void brin_vacuum_scan(Relation idxrel, BufferAccessStrategy strategy);


/*
 * BRIN handler function: return IndexAmRoutine with access method parameters
 * and callbacks.
 */
Datum
brinhandler(PG_FUNCTION_ARGS)
{
	IndexAmRoutine *amroutine = makeNode(IndexAmRoutine);

	amroutine->amstrategies = 0;
	amroutine->amsupport = BRIN_LAST_OPTIONAL_PROCNUM;
	amroutine->amcanorder = false;
	amroutine->amcanorderbyop = false;
	amroutine->amcanbackward = false;
	amroutine->amcanunique = false;
	amroutine->amcanmulticol = true;
	amroutine->amoptionalkey = true;
	amroutine->amsearcharray = false;
	amroutine->amsearchnulls = true;
	amroutine->amstorage = true;
	amroutine->amclusterable = false;
	amroutine->ampredlocks = false;
	amroutine->amcanparallel = false;
	amroutine->amkeytype = InvalidOid;

	amroutine->ambuild = brinbuild;
	amroutine->ambuildempty = brinbuildempty;
	amroutine->aminsert = brininsert;
	amroutine->ambulkdelete = brinbulkdelete;
	amroutine->amvacuumcleanup = brinvacuumcleanup;
	amroutine->amcanreturn = NULL;
	amroutine->amcostestimate = brincostestimate;
	amroutine->amoptions = brinoptions;
	amroutine->amproperty = NULL;
	amroutine->amvalidate = brinvalidate;
	amroutine->ambeginscan = brinbeginscan;
	amroutine->amrescan = brinrescan;
	amroutine->amgettuple = NULL;
	amroutine->amgetbitmap = bringetbitmap;
	amroutine->amendscan = brinendscan;
	amroutine->ammarkpos = NULL;
	amroutine->amrestrpos = NULL;
	amroutine->amestimateparallelscan = NULL;
	amroutine->aminitparallelscan = NULL;
	amroutine->amparallelrescan = NULL;

	PG_RETURN_POINTER(amroutine);
}

/*
 * A tuple in the heap is being inserted.  To keep a brin index up to date,
 * we need to obtain the relevant index tuple and compare its stored values
 * with those of the new tuple.  If the tuple values are not consistent with
 * the summary tuple, we need to update the index tuple.
 *
 * If autosummarization is enabled, check if we need to summarize the previous
 * page range.
 *
 * If the range is not currently summarized (i.e. the revmap returns NULL for
 * it), there's nothing to do for this tuple.
 */
bool
brininsert(Relation idxRel, Datum *values, bool *nulls,
		   ItemPointer heaptid, Relation heapRel,
		   IndexUniqueCheck checkUnique,
		   IndexInfo *indexInfo)
{
	BlockNumber pagesPerRange;
	BlockNumber origHeapBlk;
	BlockNumber heapBlk;
	BrinDesc   *bdesc = (BrinDesc *) indexInfo->ii_AmCache;
	BrinRevmap *revmap;
	Buffer		buf = InvalidBuffer;
	MemoryContext tupcxt = NULL;
	MemoryContext oldcxt = CurrentMemoryContext;
	bool		autosummarize = BrinGetAutoSummarize(idxRel);

	revmap = brinRevmapInitialize(idxRel, &pagesPerRange, NULL);

	/*
	 * origHeapBlk is the block number where the insertion occurred.  heapBlk
	 * is the first block in the corresponding page range.
	 */
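	/*
	 * Illustrative example (not from the original source): with the default
	 * pages_per_range of 128, an insertion into heap block 300 yields
	 * heapBlk = (300 / 128) * 128 = 256, the first block of its range.
	 */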
	origHeapBlk = ItemPointerGetBlockNumber(heaptid);
	heapBlk = (origHeapBlk / pagesPerRange) * pagesPerRange;

	for (;;)
	{
		bool		need_insert = false;
		OffsetNumber off;
		BrinTuple  *brtup;
		BrinMemTuple *dtup;
		int			keyno;

		CHECK_FOR_INTERRUPTS();

		/*
		 * If auto-summarization is enabled and we just inserted the first
		 * tuple into the first block of a new non-first page range, request a
		 * summarization run of the previous range.
		 */
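		/*
		 * For instance (illustrative): with pages_per_range = 128, inserting
		 * the first tuple on block 128 asks autovacuum to summarize the
		 * range starting at block 0, if that range isn't summarized yet.
		 */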
		if (autosummarize &&
			heapBlk > 0 &&
			heapBlk == origHeapBlk &&
			ItemPointerGetOffsetNumber(heaptid) == FirstOffsetNumber)
		{
			BlockNumber lastPageRange = heapBlk - 1;
			BrinTuple  *lastPageTuple;

			lastPageTuple =
				brinGetTupleForHeapBlock(revmap, lastPageRange, &buf, &off,
										 NULL, BUFFER_LOCK_SHARE, NULL);
			if (!lastPageTuple)
				AutoVacuumRequestWork(AVW_BRINSummarizeRange,
									  RelationGetRelid(idxRel),
									  lastPageRange);
			else
				LockBuffer(buf, BUFFER_LOCK_UNLOCK);
		}

		brtup = brinGetTupleForHeapBlock(revmap, heapBlk, &buf, &off,
										 NULL, BUFFER_LOCK_SHARE, NULL);

		/* if range is unsummarized, there's nothing to do */
		if (!brtup)
			break;

		/* First time through in this statement? */
		if (bdesc == NULL)
		{
			MemoryContextSwitchTo(indexInfo->ii_Context);
			bdesc = brin_build_desc(idxRel);
			indexInfo->ii_AmCache = (void *) bdesc;
			MemoryContextSwitchTo(oldcxt);
		}
		/* First time through in this brininsert call? */
		if (tupcxt == NULL)
		{
			tupcxt = AllocSetContextCreate(CurrentMemoryContext,
										   "brininsert cxt",
										   ALLOCSET_DEFAULT_SIZES);
			MemoryContextSwitchTo(tupcxt);
		}

		dtup = brin_deform_tuple(bdesc, brtup, NULL);

		/*
		 * Compare the key values of the new tuple to the stored index values;
		 * our deformed tuple will get updated if the new tuple doesn't fit
		 * the original range (note this means we can't break out of the loop
		 * early). Make a note of whether this happens, so that we know to
		 * insert the modified tuple later.
		 */
		for (keyno = 0; keyno < bdesc->bd_tupdesc->natts; keyno++)
		{
			Datum		result;
			BrinValues *bval;
			FmgrInfo   *addValue;

			bval = &dtup->bt_columns[keyno];
			addValue = index_getprocinfo(idxRel, keyno + 1,
										 BRIN_PROCNUM_ADDVALUE);
			result = FunctionCall4Coll(addValue,
									   idxRel->rd_indcollation[keyno],
									   PointerGetDatum(bdesc),
									   PointerGetDatum(bval),
									   values[keyno],
									   nulls[keyno]);
			/* if that returned true, we need to insert the updated tuple */
			need_insert |= DatumGetBool(result);
		}

		if (!need_insert)
		{
			/*
			 * The tuple is consistent with the new values, so there's nothing
			 * to do.
			 */
			LockBuffer(buf, BUFFER_LOCK_UNLOCK);
		}
		else
		{
			Page		page = BufferGetPage(buf);
			ItemId		lp = PageGetItemId(page, off);
			Size		origsz;
			BrinTuple  *origtup;
			Size		newsz;
			BrinTuple  *newtup;
			bool		samepage;

			/*
			 * Make a copy of the old tuple, so that we can compare it after
			 * re-acquiring the lock.
			 */
			origsz = ItemIdGetLength(lp);
			origtup = brin_copy_tuple(brtup, origsz, NULL, NULL);

			/*
			 * Before releasing the lock, check if we can attempt a same-page
			 * update.  Another process could insert a tuple concurrently in
			 * the same page though, so downstream we must be prepared to cope
			 * if this turns out to not be possible after all.
			 */
			newtup = brin_form_tuple(bdesc, heapBlk, dtup, &newsz);
			samepage = brin_can_do_samepage_update(buf, origsz, newsz);
			LockBuffer(buf, BUFFER_LOCK_UNLOCK);

			/*
			 * Try to update the tuple.  If this doesn't work for whatever
			 * reason, we need to restart from the top; the revmap might be
			 * pointing at a different tuple for this block now, so we need to
			 * recompute to ensure both our new heap tuple and the other
			 * inserter's are covered by the combined tuple.  It might be that
			 * we don't need to update at all.
			 */
			if (!brin_doupdate(idxRel, pagesPerRange, revmap, heapBlk,
							   buf, off, origtup, origsz, newtup, newsz,
							   samepage))
			{
				/* no luck; start over */
				MemoryContextResetAndDeleteChildren(tupcxt);
				continue;
			}
		}

		/* success! */
		break;
	}

	brinRevmapTerminate(revmap);
	if (BufferIsValid(buf))
		ReleaseBuffer(buf);
	MemoryContextSwitchTo(oldcxt);
	if (tupcxt != NULL)
		MemoryContextDelete(tupcxt);

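	/*
	 * Per the index AM API, aminsert's boolean result is only significant
	 * for deferred uniqueness checks (UNIQUE_CHECK_PARTIAL); BRIN does not
	 * support unique indexes, so always report false.
	 */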
	return false;
}

/*
 * Initialize state for a BRIN index scan.
 *
 * We read the metapage here to determine the pages-per-range number that this
 * index was built with.  Note that since this cannot be changed while we're
 * holding lock on index, it's not necessary to recompute it during brinrescan.
 */
IndexScanDesc
brinbeginscan(Relation r, int nkeys, int norderbys)
{
	IndexScanDesc scan;
	BrinOpaque *opaque;

	scan = RelationGetIndexScan(r, nkeys, norderbys);

	opaque = (BrinOpaque *) palloc(sizeof(BrinOpaque));
	opaque->bo_rmAccess = brinRevmapInitialize(r, &opaque->bo_pagesPerRange,
											   scan->xs_snapshot);
	opaque->bo_bdesc = brin_build_desc(r);
	scan->opaque = opaque;

	return scan;
}

/*
 * Execute the index scan.
 *
 * This works by reading index TIDs from the revmap, and obtaining the index
 * tuples pointed to by them; the summary values in the index tuples are
 * compared to the scan keys.  We return into the TID bitmap all the pages in
 * ranges corresponding to index tuples that match the scan keys.
 *
 * If a TID from the revmap is read as InvalidTID, we know that range is
 * unsummarized.  Pages in those ranges need to be returned regardless of scan
 * keys.
 */
int64
bringetbitmap(IndexScanDesc scan, TIDBitmap *tbm)
{
	Relation	idxRel = scan->indexRelation;
	Buffer		buf = InvalidBuffer;
	BrinDesc   *bdesc;
	Oid			heapOid;
	Relation	heapRel;
	BrinOpaque *opaque;
	BlockNumber nblocks;
	BlockNumber heapBlk;
	int			totalpages = 0;
	FmgrInfo   *consistentFn;
	MemoryContext oldcxt;
	MemoryContext perRangeCxt;
	BrinMemTuple *dtup;
	BrinTuple  *btup = NULL;
	Size		btupsz = 0;

	opaque = (BrinOpaque *) scan->opaque;
	bdesc = opaque->bo_bdesc;
	pgstat_count_index_scan(idxRel);

	/*
	 * We need to know the size of the table so that we know how long to
	 * iterate on the revmap.
	 */
	heapOid = IndexGetRelation(RelationGetRelid(idxRel), false);
	heapRel = heap_open(heapOid, AccessShareLock);
	nblocks = RelationGetNumberOfBlocks(heapRel);
	heap_close(heapRel, AccessShareLock);

	/*
	 * Make room for the consistent support procedures of indexed columns.  We
	 * don't look them up here; we do that lazily the first time we see a scan
	 * key reference each of them.  We rely on zeroing fn_oid to InvalidOid.
	 */
	consistentFn = palloc0(sizeof(FmgrInfo) * bdesc->bd_tupdesc->natts);

	/* allocate an initial in-memory tuple, out of the per-range memcxt */
	dtup = brin_new_memtuple(bdesc);

	/*
	 * Setup and use a per-range memory context, which is reset every time we
	 * loop below.  This avoids having to free the tuples within the loop.
	 */
	perRangeCxt = AllocSetContextCreate(CurrentMemoryContext,
										"bringetbitmap cxt",
										ALLOCSET_DEFAULT_SIZES);
	oldcxt = MemoryContextSwitchTo(perRangeCxt);

	/*
	 * Now scan the revmap.  We start by querying for heap page 0,
	 * incrementing by the number of pages per range; this gives us a full
	 * view of the table.
	 */
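	/*
	 * For example (illustrative): a 1000-block table with pages_per_range =
	 * 128 is visited at heapBlk = 0, 128, 256, ..., 896, one revmap lookup
	 * per range.
	 */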
	for (heapBlk = 0; heapBlk < nblocks; heapBlk += opaque->bo_pagesPerRange)
	{
		bool		addrange;
		bool		gottuple = false;
		BrinTuple  *tup;
		OffsetNumber off;
		Size		size;

		CHECK_FOR_INTERRUPTS();

		MemoryContextResetAndDeleteChildren(perRangeCxt);

		tup = brinGetTupleForHeapBlock(opaque->bo_rmAccess, heapBlk, &buf,
									   &off, &size, BUFFER_LOCK_SHARE,
									   scan->xs_snapshot);
		if (tup)
		{
			gottuple = true;
			btup = brin_copy_tuple(tup, size, btup, &btupsz);
			LockBuffer(buf, BUFFER_LOCK_UNLOCK);
		}

		/*
		 * For page ranges with no indexed tuple, we must return the whole
		 * range; otherwise, compare it to the scan keys.
		 */
		if (!gottuple)
		{
			addrange = true;
		}
		else
		{
			dtup = brin_deform_tuple(bdesc, btup, dtup);
			if (dtup->bt_placeholder)
			{
				/*
				 * Placeholder tuples are always returned, regardless of the
				 * values stored in them.
				 */
				addrange = true;
			}
			else
			{
				int			keyno;

				/*
				 * Compare scan keys with summary values stored for the range.
				 * If scan keys are matched, the page range must be added to
				 * the bitmap.  We initially assume the range needs to be
				 * added; in particular this serves the case where there are
				 * no keys.
				 */
				addrange = true;
				for (keyno = 0; keyno < scan->numberOfKeys; keyno++)
				{
					ScanKey		key = &scan->keyData[keyno];
					AttrNumber	keyattno = key->sk_attno;
					BrinValues *bval = &dtup->bt_columns[keyattno - 1];
					Datum		add;

					/*
					 * The collation of the scan key must match the collation
					 * used in the index column (but only if the search is not
					 * IS NULL / IS NOT NULL).  Otherwise we shouldn't be
					 * using this index ...
					 */
					Assert((key->sk_flags & SK_ISNULL) ||
						   (key->sk_collation ==
							TupleDescAttr(bdesc->bd_tupdesc,
										  keyattno - 1)->attcollation));

					/* First time this column? look up consistent function */
					if (consistentFn[keyattno - 1].fn_oid == InvalidOid)
					{
						FmgrInfo   *tmp;

						tmp = index_getprocinfo(idxRel, keyattno,
												BRIN_PROCNUM_CONSISTENT);
						fmgr_info_copy(&consistentFn[keyattno - 1], tmp,
									   CurrentMemoryContext);
					}

					/*
					 * Check whether the scan key is consistent with the page
					 * range values; if so, have the pages in the range added
					 * to the output bitmap.
					 *
					 * When there are multiple scan keys, failure to meet the
					 * criteria for a single one of them is enough to discard
					 * the range as a whole, so break out of the loop as soon
					 * as a false return value is obtained.
					 */
					add = FunctionCall3Coll(&consistentFn[keyattno - 1],
											key->sk_collation,
											PointerGetDatum(bdesc),
											PointerGetDatum(bval),
											PointerGetDatum(key));
					addrange = DatumGetBool(add);
					if (!addrange)
						break;
				}
			}
		}

		/* add the pages in the range to the output bitmap, if needed */
		if (addrange)
		{
			BlockNumber pageno;

			for (pageno = heapBlk;
				 pageno <= heapBlk + opaque->bo_pagesPerRange - 1;
				 pageno++)
			{
				MemoryContextSwitchTo(oldcxt);
				tbm_add_page(tbm, pageno);
				totalpages++;
				MemoryContextSwitchTo(perRangeCxt);
			}
		}
	}

	MemoryContextSwitchTo(oldcxt);
	MemoryContextDelete(perRangeCxt);

	if (buf != InvalidBuffer)
		ReleaseBuffer(buf);

	/*
	 * XXX We have an approximation of the number of *pages* that our scan
	 * returns, but we don't have a precise idea of the number of heap tuples
	 * involved.
	 */
	return totalpages * 10;
}

/*
 * Re-initialize state for a BRIN index scan
 */
void
brinrescan(IndexScanDesc scan, ScanKey scankey, int nscankeys,
		   ScanKey orderbys, int norderbys)
{
	/*
	 * Other index AMs preprocess the scan keys at this point, or sometime
	 * early during the scan; this lets them optimize by removing redundant
	 * keys, or doing early returns when they are impossible to satisfy; see
	 * _bt_preprocess_keys for an example.  Something like that could be added
	 * here someday, too.
	 */

	if (scankey && scan->numberOfKeys > 0)
		memmove(scan->keyData, scankey,
				scan->numberOfKeys * sizeof(ScanKeyData));
}

/*
 * Close down a BRIN index scan
 */
void
brinendscan(IndexScanDesc scan)
{
	BrinOpaque *opaque = (BrinOpaque *) scan->opaque;

	brinRevmapTerminate(opaque->bo_rmAccess);
	brin_free_desc(opaque->bo_bdesc);
	pfree(opaque);
}

/*
 * Per-heap-tuple callback for IndexBuildHeapScan.
 *
 * Note we don't worry about the page range at the end of the table here; it
 * is present in the build state struct after we're called the last time, but
 * not inserted into the index.  The caller must insert it, if appropriate.
 */
static void
brinbuildCallback(Relation index,
				  HeapTuple htup,
				  Datum *values,
				  bool *isnull,
				  bool tupleIsAlive,
				  void *brstate)
{
	BrinBuildState *state = (BrinBuildState *) brstate;
	BlockNumber thisblock;
	int			i;

	thisblock = ItemPointerGetBlockNumber(&htup->t_self);

	/*
	 * If we're in a block that belongs to a future range, summarize what
	 * we've got and start afresh.  Note the scan might have skipped many
	 * pages, if they were devoid of live tuples; make sure to insert index
	 * tuples for those too.
	 */
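	/*
	 * Example (illustrative): with pages_per_range = 128 and the current
	 * range starting at block 0, a tuple on block 300 first completes the
	 * ranges [0,127] and [128,255] before being accumulated into [256,383].
	 */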
	while (thisblock > state->bs_currRangeStart + state->bs_pagesPerRange - 1)
	{
		BRIN_elog((DEBUG2,
				   "brinbuildCallback: completed a range: %u--%u",
				   state->bs_currRangeStart,
				   state->bs_currRangeStart + state->bs_pagesPerRange));

		/* create the index tuple and insert it */
		form_and_insert_tuple(state);

		/* set state to correspond to the next range */
		state->bs_currRangeStart += state->bs_pagesPerRange;

		/* re-initialize state for it */
		brin_memtuple_initialize(state->bs_dtuple, state->bs_bdesc);
	}

	/* Accumulate the current tuple into the running state */
	for (i = 0; i < state->bs_bdesc->bd_tupdesc->natts; i++)
	{
		FmgrInfo   *addValue;
		BrinValues *col;
		Form_pg_attribute attr = TupleDescAttr(state->bs_bdesc->bd_tupdesc, i);

		col = &state->bs_dtuple->bt_columns[i];
		addValue = index_getprocinfo(index, i + 1,
									 BRIN_PROCNUM_ADDVALUE);

		/*
		 * Update dtuple state, if and as necessary.
		 */
		FunctionCall4Coll(addValue,
						  attr->attcollation,
						  PointerGetDatum(state->bs_bdesc),
						  PointerGetDatum(col),
						  values[i], isnull[i]);
	}
}

/*
 * brinbuild() -- build a new BRIN index.
 */
IndexBuildResult *
brinbuild(Relation heap, Relation index, IndexInfo *indexInfo)
{
	IndexBuildResult *result;
	double		reltuples;
	double		idxtuples;
	BrinRevmap *revmap;
	BrinBuildState *state;
	Buffer		meta;
	BlockNumber pagesPerRange;

	/*
	 * We expect to be called exactly once for any index relation.
	 */
	if (RelationGetNumberOfBlocks(index) != 0)
		elog(ERROR, "index \"%s\" already contains data",
			 RelationGetRelationName(index));

	/*
	 * Critical section not required, because on error the creation of the
	 * whole relation will be rolled back.
	 */

	meta = ReadBuffer(index, P_NEW);
	Assert(BufferGetBlockNumber(meta) == BRIN_METAPAGE_BLKNO);
	LockBuffer(meta, BUFFER_LOCK_EXCLUSIVE);

	brin_metapage_init(BufferGetPage(meta), BrinGetPagesPerRange(index),
					   BRIN_CURRENT_VERSION);
	MarkBufferDirty(meta);

	if (RelationNeedsWAL(index))
	{
		xl_brin_createidx xlrec;
		XLogRecPtr	recptr;
		Page		page;

		xlrec.version = BRIN_CURRENT_VERSION;
		xlrec.pagesPerRange = BrinGetPagesPerRange(index);

		XLogBeginInsert();
		XLogRegisterData((char *) &xlrec, SizeOfBrinCreateIdx);
		XLogRegisterBuffer(0, meta, REGBUF_WILL_INIT | REGBUF_STANDARD);

		recptr = XLogInsert(RM_BRIN_ID, XLOG_BRIN_CREATE_INDEX);

		page = BufferGetPage(meta);
		PageSetLSN(page, recptr);
	}

	UnlockReleaseBuffer(meta);

	/*
	 * Initialize our state, including the deformed tuple state.
	 */
	revmap = brinRevmapInitialize(index, &pagesPerRange, NULL);
	state = initialize_brin_buildstate(index, revmap, pagesPerRange);

	/*
	 * Now scan the relation.  No syncscan allowed here because we want the
	 * heap blocks in physical order.
	 */
	reltuples = IndexBuildHeapScan(heap, index, indexInfo, false,
								   brinbuildCallback, (void *) state, NULL);

	/* process the final batch */
	form_and_insert_tuple(state);

	/* release resources */
	idxtuples = state->bs_numtuples;
	brinRevmapTerminate(state->bs_rmAccess);
	terminate_brin_buildstate(state);

	/*
	 * Return statistics
	 */
	result = (IndexBuildResult *) palloc(sizeof(IndexBuildResult));

	result->heap_tuples = reltuples;
	result->index_tuples = idxtuples;

	return result;
}

void
brinbuildempty(Relation index)
{
	Buffer		metabuf;

	/* An empty BRIN index has a metapage only. */
	metabuf =
		ReadBufferExtended(index, INIT_FORKNUM, P_NEW, RBM_NORMAL, NULL);
	LockBuffer(metabuf, BUFFER_LOCK_EXCLUSIVE);

	/* Initialize and xlog metabuffer. */
	START_CRIT_SECTION();
	brin_metapage_init(BufferGetPage(metabuf), BrinGetPagesPerRange(index),
					   BRIN_CURRENT_VERSION);
	MarkBufferDirty(metabuf);
	log_newpage_buffer(metabuf, true);
	END_CRIT_SECTION();

	UnlockReleaseBuffer(metabuf);
}

/*
 * brinbulkdelete
 *		Since there are no per-heap-tuple index tuples in BRIN indexes,
 *		there's not a lot we can do here.
 *
 * XXX we could mark item tuples as "dirty" (when a minimum or maximum heap
 * tuple is deleted), meaning the need to re-run summarization on the affected
 * range.  Would need to add an extra flag in brintuples for that.
 */
IndexBulkDeleteResult *
brinbulkdelete(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
			   IndexBulkDeleteCallback callback, void *callback_state)
{
	/* allocate stats if first time through, else re-use existing struct */
	if (stats == NULL)
		stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));

	return stats;
}

/*
 * This routine is in charge of "vacuuming" a BRIN index: we just summarize
 * ranges that are currently unsummarized.
 */
IndexBulkDeleteResult *
brinvacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats)
{
	Relation	heapRel;

	/* No-op in ANALYZE ONLY mode */
	if (info->analyze_only)
		return stats;

	if (!stats)
		stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
	stats->num_pages = RelationGetNumberOfBlocks(info->index);
	/* rest of stats is initialized by zeroing */

	heapRel = heap_open(IndexGetRelation(RelationGetRelid(info->index), false),
						AccessShareLock);

	brin_vacuum_scan(info->index, info->strategy);

	brinsummarize(info->index, heapRel, BRIN_ALL_BLOCKRANGES, false,
				  &stats->num_index_tuples, &stats->num_index_tuples);

	heap_close(heapRel, AccessShareLock);

	return stats;
}

/*
 * reloptions processor for BRIN indexes
 */
bytea *
brinoptions(Datum reloptions, bool validate)
{
	relopt_value *options;
	BrinOptions *rdopts;
	int			numoptions;
	static const relopt_parse_elt tab[] = {
		{"pages_per_range", RELOPT_TYPE_INT, offsetof(BrinOptions, pagesPerRange)},
		{"autosummarize", RELOPT_TYPE_BOOL, offsetof(BrinOptions, autosummarize)}
	};

	options = parseRelOptions(reloptions, validate, RELOPT_KIND_BRIN,
							  &numoptions);

	/* if none set, we're done */
	if (numoptions == 0)
		return NULL;

	rdopts = allocateReloptStruct(sizeof(BrinOptions), options, numoptions);

	fillRelOptions((void *) rdopts, sizeof(BrinOptions), options, numoptions,
				   validate, tab, lengthof(tab));

	pfree(options);

	return (bytea *) rdopts;
}
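
/*
 * Illustrative SQL (not part of this file; index and table names are
 * hypothetical): both options above can be set at index creation time,
 * e.g.
 *
 *		CREATE INDEX brinidx ON tab USING brin (col)
 *			WITH (pages_per_range = 64, autosummarize = on);
 */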

/*
 * SQL-callable function to scan through an index and summarize all ranges
 * that are not currently summarized.
 */
Datum
brin_summarize_new_values(PG_FUNCTION_ARGS)
{
	Datum		relation = PG_GETARG_DATUM(0);

	return DirectFunctionCall2(brin_summarize_range,
							   relation,
							   Int64GetDatum((int64) BRIN_ALL_BLOCKRANGES));
}
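
/*
 * Example invocations (illustrative; the index name is hypothetical):
 *
 *		SELECT brin_summarize_new_values('brinidx'::regclass);
 *		SELECT brin_summarize_range('brinidx'::regclass, 0);
 *
 * The second form summarizes only the page range containing heap block 0.
 */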

/*
 * SQL-callable function to summarize the indicated page range, if not already
 * summarized.  If the second argument is BRIN_ALL_BLOCKRANGES, all
 * unsummarized ranges are summarized.
 */
Datum
brin_summarize_range(PG_FUNCTION_ARGS)
{
	Oid			indexoid = PG_GETARG_OID(0);
	int64		heapBlk64 = PG_GETARG_INT64(1);
	BlockNumber heapBlk;
	Oid			heapoid;
	Relation	indexRel;
	Relation	heapRel;
	double		numSummarized = 0;

	if (heapBlk64 > BRIN_ALL_BLOCKRANGES || heapBlk64 < 0)
	{
		char	   *blk = psprintf(INT64_FORMAT, heapBlk64);

		ereport(ERROR,
				(errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
				 errmsg("block number out of range: %s", blk)));
	}
	heapBlk = (BlockNumber) heapBlk64;

	/*
	 * We must lock table before index to avoid deadlocks.  However, if the
	 * passed indexoid isn't an index then IndexGetRelation() will fail.
	 * Rather than emitting a not-very-helpful error message, postpone
	 * complaining, expecting that the is-it-an-index test below will fail.
	 */
	heapoid = IndexGetRelation(indexoid, true);
	if (OidIsValid(heapoid))
		heapRel = heap_open(heapoid, ShareUpdateExclusiveLock);
	else
		heapRel = NULL;

	indexRel = index_open(indexoid, ShareUpdateExclusiveLock);

	/* Must be a BRIN index */
	if (indexRel->rd_rel->relkind != RELKIND_INDEX ||
		indexRel->rd_rel->relam != BRIN_AM_OID)
		ereport(ERROR,
				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
				 errmsg("\"%s\" is not a BRIN index",
						RelationGetRelationName(indexRel))));

	/* User must own the index (comparable to privileges needed for VACUUM) */
	if (!pg_class_ownercheck(indexoid, GetUserId()))
		aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_INDEX,
					   RelationGetRelationName(indexRel));

	/*
	 * Since we did the IndexGetRelation call above without any lock, it's
	 * barely possible that a race against an index drop/recreation could have
	 * netted us the wrong table.  Recheck.
	 */
	if (heapRel == NULL || heapoid != IndexGetRelation(indexoid, false))
		ereport(ERROR,
				(errcode(ERRCODE_UNDEFINED_TABLE),
				 errmsg("could not open parent table of index %s",
						RelationGetRelationName(indexRel))));

	/* OK, do it */
	brinsummarize(indexRel, heapRel, heapBlk, true, &numSummarized, NULL);

	relation_close(indexRel, ShareUpdateExclusiveLock);
	relation_close(heapRel, ShareUpdateExclusiveLock);

	PG_RETURN_INT32((int32) numSummarized);
}

/*
 * SQL-callable interface to mark a range as no longer summarized
 */
Datum
brin_desummarize_range(PG_FUNCTION_ARGS)
{
	Oid			indexoid = PG_GETARG_OID(0);
	int64		heapBlk64 = PG_GETARG_INT64(1);
	BlockNumber heapBlk;
	Oid			heapoid;
	Relation	heapRel;
	Relation	indexRel;
	bool		done;

	if (heapBlk64 > MaxBlockNumber || heapBlk64 < 0)
	{
		char	   *blk = psprintf(INT64_FORMAT, heapBlk64);

		ereport(ERROR,
				(errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
				 errmsg("block number out of range: %s", blk)));
	}
	heapBlk = (BlockNumber) heapBlk64;

	/*
	 * We must lock table before index to avoid deadlocks.  However, if the
	 * passed indexoid isn't an index then IndexGetRelation() will fail.
	 * Rather than emitting a not-very-helpful error message, postpone
	 * complaining, expecting that the is-it-an-index test below will fail.
	 */
	heapoid = IndexGetRelation(indexoid, true);
	if (OidIsValid(heapoid))
		heapRel = heap_open(heapoid, ShareUpdateExclusiveLock);
	else
		heapRel = NULL;

	indexRel = index_open(indexoid, ShareUpdateExclusiveLock);

	/* Must be a BRIN index */
	if (indexRel->rd_rel->relkind != RELKIND_INDEX ||
		indexRel->rd_rel->relam != BRIN_AM_OID)
		ereport(ERROR,
				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
				 errmsg("\"%s\" is not a BRIN index",
						RelationGetRelationName(indexRel))));

	/* User must own the index (comparable to privileges needed for VACUUM) */
	if (!pg_class_ownercheck(indexoid, GetUserId()))
		aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_INDEX,
					   RelationGetRelationName(indexRel));

	/*
	 * Since we did the IndexGetRelation call above without any lock, it's
	 * barely possible that a race against an index drop/recreation could have
	 * netted us the wrong table.  Recheck.
	 */
	if (heapRel == NULL || heapoid != IndexGetRelation(indexoid, false))
		ereport(ERROR,
				(errcode(ERRCODE_UNDEFINED_TABLE),
				 errmsg("could not open parent table of index %s",
						RelationGetRelationName(indexRel))));

	/* the revmap does the hard work */
	do
	{
		done = brinRevmapDesummarizeRange(indexRel, heapBlk);
	}
	while (!done);

	relation_close(indexRel, ShareUpdateExclusiveLock);
	relation_close(heapRel, ShareUpdateExclusiveLock);

	PG_RETURN_VOID();
}
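
/*
 * Example (illustrative; index name is hypothetical): force the range
 * containing heap block 0 to be rescanned by a later summarization pass:
 *
 *		SELECT brin_desummarize_range('brinidx'::regclass, 0);
 */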

/*
 * Build a BrinDesc used to create or scan a BRIN index
 */
BrinDesc *
brin_build_desc(Relation rel)
{
	BrinOpcInfo **opcinfo;
	BrinDesc   *bdesc;
	TupleDesc	tupdesc;
	int			totalstored = 0;
	int			keyno;
	long		totalsize;
	MemoryContext cxt;
	MemoryContext oldcxt;

	cxt = AllocSetContextCreate(CurrentMemoryContext,
								"brin desc cxt",
								ALLOCSET_SMALL_SIZES);
	oldcxt = MemoryContextSwitchTo(cxt);
	tupdesc = RelationGetDescr(rel);

	/*
	 * Obtain BrinOpcInfo for each indexed column.  While at it, accumulate
	 * the number of columns stored, since the number is opclass-defined.
	 */
	opcinfo = (BrinOpcInfo **) palloc(sizeof(BrinOpcInfo *) * tupdesc->natts);
	for (keyno = 0; keyno < tupdesc->natts; keyno++)
	{
		FmgrInfo   *opcInfoFn;
		Form_pg_attribute attr = TupleDescAttr(tupdesc, keyno);

		opcInfoFn = index_getprocinfo(rel, keyno + 1, BRIN_PROCNUM_OPCINFO);

		opcinfo[keyno] = (BrinOpcInfo *)
			DatumGetPointer(FunctionCall1(opcInfoFn, attr->atttypid));
		totalstored += opcinfo[keyno]->oi_nstored;
	}

	/* Allocate our result struct and fill it in */
	totalsize = offsetof(BrinDesc, bd_info) +
		sizeof(BrinOpcInfo *) * tupdesc->natts;

	bdesc = palloc(totalsize);
	bdesc->bd_context = cxt;
	bdesc->bd_index = rel;
	bdesc->bd_tupdesc = tupdesc;
	bdesc->bd_disktdesc = NULL; /* generated lazily */
	bdesc->bd_totalstored = totalstored;

	for (keyno = 0; keyno < tupdesc->natts; keyno++)
		bdesc->bd_info[keyno] = opcinfo[keyno];
	pfree(opcinfo);

	MemoryContextSwitchTo(oldcxt);

	return bdesc;
}

void
brin_free_desc(BrinDesc *bdesc)
{
	/* make sure the tupdesc is still valid */
	Assert(bdesc->bd_tupdesc->tdrefcount >= 1);
	/* no need for retail pfree */
	MemoryContextDelete(bdesc->bd_context);
}

/*
 * Fetch index's statistical data into *stats
 */
void
brinGetStats(Relation index, BrinStatsData *stats)
{
	Buffer		metabuffer;
	Page		metapage;
	BrinMetaPageData *metadata;

	metabuffer = ReadBuffer(index, BRIN_METAPAGE_BLKNO);
	LockBuffer(metabuffer, BUFFER_LOCK_SHARE);
	metapage = BufferGetPage(metabuffer);
	metadata = (BrinMetaPageData *) PageGetContents(metapage);

	stats->pagesPerRange = metadata->pagesPerRange;
	stats->revmapNumPages = metadata->lastRevmapPage - 1;

	UnlockReleaseBuffer(metabuffer);
}
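
/*
 * Minimal usage sketch (illustrative; "indexRel" stands for an already
 * opened and locked BRIN index relation):
 *
 *		BrinStatsData stats;
 *
 *		brinGetStats(indexRel, &stats);
 *		elog(DEBUG1, "pages per range: %u", stats.pagesPerRange);
 */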
1082
1083 /*
1084  * Initialize a BrinBuildState appropriate to create tuples on the given index.
1085  */
1086 static BrinBuildState *
1087 initialize_brin_buildstate(Relation idxRel, BrinRevmap *revmap,
1088                                                    BlockNumber pagesPerRange)
1089 {
1090         BrinBuildState *state;
1091
1092         state = palloc(sizeof(BrinBuildState));
1093
1094         state->bs_irel = idxRel;
1095         state->bs_numtuples = 0;
1096         state->bs_currentInsertBuf = InvalidBuffer;
1097         state->bs_pagesPerRange = pagesPerRange;
1098         state->bs_currRangeStart = 0;
1099         state->bs_rmAccess = revmap;
1100         state->bs_bdesc = brin_build_desc(idxRel);
1101         state->bs_dtuple = brin_new_memtuple(state->bs_bdesc);
1102
1103         brin_memtuple_initialize(state->bs_dtuple, state->bs_bdesc);
1104
1105         return state;
1106 }
1107
1108 /*
1109  * Release resources associated with a BrinBuildState.
1110  */
1111 static void
1112 terminate_brin_buildstate(BrinBuildState *state)
1113 {
1114         /* release the last index buffer used */
1115         if (!BufferIsInvalid(state->bs_currentInsertBuf))
1116         {
1117                 Page            page;
1118
1119                 page = BufferGetPage(state->bs_currentInsertBuf);
1120                 RecordPageWithFreeSpace(state->bs_irel,
1121                                                                 BufferGetBlockNumber(state->bs_currentInsertBuf),
1122                                                                 PageGetFreeSpace(page));
1123                 ReleaseBuffer(state->bs_currentInsertBuf);
1124         }
1125
1126         brin_free_desc(state->bs_bdesc);
1127         pfree(state->bs_dtuple);
1128         pfree(state);
1129 }
1130
1131 /*
1132  * On the given BRIN index, summarize the heap page range that corresponds
1133  * to the heap block number given.
1134  *
1135  * This routine can run in parallel with insertions into the heap.  To avoid
1136  * missing those values from the summary tuple, we first insert a placeholder
1137  * index tuple into the index, then execute the heap scan; transactions
1138  * concurrent with the scan update the placeholder tuple.  After the scan, we
1139  * union the placeholder tuple with the one computed by this routine.  The
1140  * update of the index value happens in a loop, so that if somebody updates
1141  * the placeholder tuple after we read it, we detect the case and try again.
1142  * This ensures that the concurrently inserted tuples are not lost.
1143  *
1144  * A further corner case is this routine being asked to summarize the partial
1145  * range at the end of the table.  heapNumBlocks is the (possibly outdated)
1146  * table size; if we notice that the requested range lies beyond that size,
1147  * we re-compute the table size after inserting the placeholder tuple, to
1148  * avoid missing pages that were appended recently.
1149  */
1150 static void
1151 summarize_range(IndexInfo *indexInfo, BrinBuildState *state, Relation heapRel,
1152                                 BlockNumber heapBlk, BlockNumber heapNumBlks)
1153 {
1154         Buffer          phbuf;
1155         BrinTuple  *phtup;
1156         Size            phsz;
1157         OffsetNumber offset;
1158         BlockNumber scanNumBlks;
1159
1160         /*
1161          * Insert the placeholder tuple
1162          */
1163         phbuf = InvalidBuffer;
1164         phtup = brin_form_placeholder_tuple(state->bs_bdesc, heapBlk, &phsz);
1165         offset = brin_doinsert(state->bs_irel, state->bs_pagesPerRange,
1166                                                    state->bs_rmAccess, &phbuf,
1167                                                    heapBlk, phtup, phsz);
1168
1169         /*
1170          * Compute range end.  We hold ShareUpdateExclusive lock on table, so it
1171          * cannot shrink concurrently (but it can grow).
1172          */
1173         Assert(heapBlk % state->bs_pagesPerRange == 0);
1174         if (heapBlk + state->bs_pagesPerRange > heapNumBlks)
1175         {
1176                 /*
1177                  * If we're asked to scan what we believe to be the final range on the
1178                  * table (i.e. a range that might be partial) we need to recompute our
1179                  * idea of what the latest page is after inserting the placeholder
1180                  * tuple.  Anyone that grows the table later will update the
1181                  * placeholder tuple, so it doesn't matter that we won't scan these
1182                  * pages ourselves.  Careful: the table might have been extended
1183                  * beyond the current range, so clamp our result.
1184                  *
1185                  * Fortunately, this should occur infrequently.
1186                  */
1187                 scanNumBlks = Min(RelationGetNumberOfBlocks(heapRel) - heapBlk,
1188                                                   state->bs_pagesPerRange);
1189         }
1190         else
1191         {
1192                 /* Easy case: range is known to be complete */
1193                 scanNumBlks = state->bs_pagesPerRange;
1194         }
1195
1196         /*
1197          * Execute the partial heap scan covering the heap blocks in the specified
1198          * page range, summarizing the heap tuples in it.  This scan stops just
1199          * short of brinbuildCallback creating the new index entry.
1200          *
1201          * Note that it is critical we use the "any visible" mode of
1202          * IndexBuildHeapRangeScan here: otherwise, we would miss tuples inserted
1203          * by transactions that are still in progress, among other corner cases.
1204          */
1205         state->bs_currRangeStart = heapBlk;
1206         IndexBuildHeapRangeScan(heapRel, state->bs_irel, indexInfo, false, true,
1207                                                         heapBlk, scanNumBlks,
1208                                                         brinbuildCallback, (void *) state, NULL);
1209
1210         /*
1211          * Now we update the values obtained by the scan with the placeholder
1212          * tuple.  We do this in a loop which only terminates if we're able to
1213          * update the placeholder tuple successfully; if we are not, this means
1214          * somebody else modified the placeholder tuple after we read it.
1215          */
1216         for (;;)
1217         {
1218                 BrinTuple  *newtup;
1219                 Size            newsize;
1220                 bool            didupdate;
1221                 bool            samepage;
1222
1223                 CHECK_FOR_INTERRUPTS();
1224
1225                 /*
1226                  * Update the summary tuple and try to update.
1227                  */
1228                 newtup = brin_form_tuple(state->bs_bdesc,
1229                                                                  heapBlk, state->bs_dtuple, &newsize);
1230                 samepage = brin_can_do_samepage_update(phbuf, phsz, newsize);
1231                 didupdate =
1232                         brin_doupdate(state->bs_irel, state->bs_pagesPerRange,
1233                                                   state->bs_rmAccess, heapBlk, phbuf, offset,
1234                                                   phtup, phsz, newtup, newsize, samepage);
1235                 brin_free_tuple(phtup);
1236                 brin_free_tuple(newtup);
1237
1238                 /* If the update succeeded, we're done. */
1239                 if (didupdate)
1240                         break;
1241
1242                 /*
1243                  * If the update didn't work, it might be because somebody updated the
1244                  * placeholder tuple concurrently.  Extract the new version, union it
1245                  * with the values we have from the scan, and start over.  (There are
1246                  * other reasons for the update to fail, but it's simple to treat them
1247                  * the same.)
1248                  */
1249                 phtup = brinGetTupleForHeapBlock(state->bs_rmAccess, heapBlk, &phbuf,
1250                                                                                  &offset, &phsz, BUFFER_LOCK_SHARE,
1251                                                                                  NULL);
1252                 /* the placeholder tuple must exist */
1253                 if (phtup == NULL)
1254                         elog(ERROR, "missing placeholder tuple");
1255                 phtup = brin_copy_tuple(phtup, phsz, NULL, NULL);
1256                 LockBuffer(phbuf, BUFFER_LOCK_UNLOCK);
1257
1258                 /* merge it into the tuple from the heap scan */
1259                 union_tuples(state->bs_bdesc, state->bs_dtuple, phtup);
1260         }
1261
1262         ReleaseBuffer(phbuf);
1263 }
1264
1265 /*
1266  * Summarize page ranges that are not already summarized.  If pageRange is
1267  * BRIN_ALL_BLOCKRANGES then the whole table is scanned; otherwise, only the
1268  * page range containing the given heap page number is scanned.
1269  * If include_partial is true, then the partial range at the end of the table
1270  * is summarized, otherwise not.
1271  *
1272  * For each new index tuple inserted, *numSummarized (if not NULL) is
1273  * incremented; for each existing tuple, *numExisting (if not NULL) is
1274  * incremented.
1275  */
1276 static void
1277 brinsummarize(Relation index, Relation heapRel, BlockNumber pageRange,
1278                           bool include_partial, double *numSummarized, double *numExisting)
1279 {
1280         BrinRevmap *revmap;
1281         BrinBuildState *state = NULL;
1282         IndexInfo  *indexInfo = NULL;
1283         BlockNumber heapNumBlocks;
1284         BlockNumber pagesPerRange;
1285         Buffer          buf;
1286         BlockNumber startBlk;
1287
1288         revmap = brinRevmapInitialize(index, &pagesPerRange, NULL);
1289
1290         /* determine range of pages to process */
1291         heapNumBlocks = RelationGetNumberOfBlocks(heapRel);
1292         if (pageRange == BRIN_ALL_BLOCKRANGES)
1293                 startBlk = 0;
1294         else
1295         {
1296                 startBlk = (pageRange / pagesPerRange) * pagesPerRange;
1297                 heapNumBlocks = Min(heapNumBlocks, startBlk + pagesPerRange);
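                /*
                 * For instance, with pagesPerRange = 128 and pageRange = 300, we
                 * get startBlk = 256 and consider only blocks 256..383 (clamped
                 * to the end of the table).
                 */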
1298         }
1299         if (startBlk > heapNumBlocks)
1300         {
1301                 /* Nothing to do if start point is beyond end of table */
1302                 brinRevmapTerminate(revmap);
1303                 return;
1304         }
1305
1306         /*
1307          * Scan the revmap to find unsummarized items.
1308          */
1309         buf = InvalidBuffer;
1310         for (; startBlk < heapNumBlocks; startBlk += pagesPerRange)
1311         {
1312                 BrinTuple  *tup;
1313                 OffsetNumber off;
1314
1315                 /*
1316                  * Unless requested to summarize even a partial range, stop now if
1317                  * we think the next range is partial.  Callers pass true when this
1318                  * is run after bulk data loading is done
1319                  * (brin_summarize_new_values), and false when it results from an
1320                  * arbitrarily-scheduled maintenance command such as vacuuming.
1321                  */
1322                 if (!include_partial &&
1323                         (startBlk + pagesPerRange > heapNumBlocks))
1324                         break;
1325
1326                 CHECK_FOR_INTERRUPTS();
1327
1328                 tup = brinGetTupleForHeapBlock(revmap, startBlk, &buf, &off, NULL,
1329                                                                            BUFFER_LOCK_SHARE, NULL);
1330                 if (tup == NULL)
1331                 {
1332                         /* No revmap entry for this heap range; summarize it. */
1333                         if (state == NULL)
1334                         {
1335                                 /* first time through */
1336                                 Assert(!indexInfo);
1337                                 state = initialize_brin_buildstate(index, revmap,
1338                                                                                                    pagesPerRange);
1339                                 indexInfo = BuildIndexInfo(index);
1340                         }
1341                         summarize_range(indexInfo, state, heapRel, startBlk, heapNumBlocks);
1342
1343                         /* and re-initialize state for the next range */
1344                         brin_memtuple_initialize(state->bs_dtuple, state->bs_bdesc);
1345
1346                         if (numSummarized)
1347                                 *numSummarized += 1.0;
1348                 }
1349                 else
1350                 {
1351                         if (numExisting)
1352                                 *numExisting += 1.0;
1353                         LockBuffer(buf, BUFFER_LOCK_UNLOCK);
1354                 }
1355         }
1356
1357         if (BufferIsValid(buf))
1358                 ReleaseBuffer(buf);
1359
1360         /* free resources */
1361         brinRevmapTerminate(revmap);
1362         if (state)
1363         {
1364                 terminate_brin_buildstate(state);
1365                 pfree(indexInfo);
1366         }
1367 }
1368
1369 /*
1370  * Given a deformed tuple in the build state, convert it into the on-disk
1371  * format and insert it into the index, making the revmap point to it.
1372  */
1373 static void
1374 form_and_insert_tuple(BrinBuildState *state)
1375 {
1376         BrinTuple  *tup;
1377         Size            size;
1378
1379         tup = brin_form_tuple(state->bs_bdesc, state->bs_currRangeStart,
1380                                                   state->bs_dtuple, &size);
1381         brin_doinsert(state->bs_irel, state->bs_pagesPerRange, state->bs_rmAccess,
1382                                   &state->bs_currentInsertBuf, state->bs_currRangeStart,
1383                                   tup, size);
1384         state->bs_numtuples++;
1385
1386         pfree(tup);
1387 }
1388
1389 /*
1390  * Given a deformed tuple "a" and an on-disk tuple "b", adjust "a" so that
1391  * it's consistent with the summary values in both.
1392  */
1393 static void
1394 union_tuples(BrinDesc *bdesc, BrinMemTuple *a, BrinTuple *b)
1395 {
1396         int                     keyno;
1397         BrinMemTuple *db;
1398         MemoryContext cxt;
1399         MemoryContext oldcxt;
1400
1401         /* Use our own memory context to avoid retail pfree */
1402         cxt = AllocSetContextCreate(CurrentMemoryContext,
1403                                                                 "brin union",
1404                                                                 ALLOCSET_DEFAULT_SIZES);
1405         oldcxt = MemoryContextSwitchTo(cxt);
1406         db = brin_deform_tuple(bdesc, b, NULL);
1407         MemoryContextSwitchTo(oldcxt);
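        /*
         * The deformed tuple "db", and any pass-by-reference datums it points
         * to, now live in cxt; the MemoryContextDelete below frees them all at
         * once.
         */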
1408
1409         for (keyno = 0; keyno < bdesc->bd_tupdesc->natts; keyno++)
1410         {
1411                 FmgrInfo   *unionFn;
1412                 BrinValues *col_a = &a->bt_columns[keyno];
1413                 BrinValues *col_b = &db->bt_columns[keyno];
1414
1415                 unionFn = index_getprocinfo(bdesc->bd_index, keyno + 1,
1416                                                                         BRIN_PROCNUM_UNION);
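                /* the union support proc updates col_a in place to cover col_b */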
1417                 FunctionCall3Coll(unionFn,
1418                                                   bdesc->bd_index->rd_indcollation[keyno],
1419                                                   PointerGetDatum(bdesc),
1420                                                   PointerGetDatum(col_a),
1421                                                   PointerGetDatum(col_b));
1422         }
1423
1424         MemoryContextDelete(cxt);
1425 }
1426
1427 /*
1428  * brin_vacuum_scan
1429  *              Do a complete scan of the index during VACUUM.
1430  *
1431  * This routine scans the complete index looking for uncatalogued index
1432  * pages, i.e. pages that might have been lost due to a crash right after
1433  * index extension.
1434  */
1435 static void
1436 brin_vacuum_scan(Relation idxrel, BufferAccessStrategy strategy)
1437 {
1438         bool            vacuum_fsm = false;
1439         BlockNumber blkno;
1440
1441         /*
1442          * Scan the index in physical order, and clean up any possible mess in
1443          * each page.
1444          */
1445         for (blkno = 0; blkno < RelationGetNumberOfBlocks(idxrel); blkno++)
1446         {
1447                 Buffer          buf;
1448
1449                 CHECK_FOR_INTERRUPTS();
1450
1451                 buf = ReadBufferExtended(idxrel, MAIN_FORKNUM, blkno,
1452                                                                  RBM_NORMAL, strategy);
1453
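                /* remember whether any page cleanup updated the free space map */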
1454                 vacuum_fsm |= brin_page_cleanup(idxrel, buf);
1455
1456                 ReleaseBuffer(buf);
1457         }
1458
1459         /*
1460          * If we made any change to the FSM, make sure the new info is visible
1461          * all the way up to the top of the FSM tree.
1462          */
1463         if (vacuum_fsm)
1464                 FreeSpaceMapVacuum(idxrel);
1465 }