/*-------------------------------------------------------------------------
 *
 * hash.c
 *      Implementation of Margo Seltzer's Hashing package for postgres.
 *
 * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *      src/backend/access/hash/hash.c
 *
 * NOTES
 *      This file contains only the public interface routines.
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "access/hash.h"
#include "access/hash_xlog.h"
#include "access/relscan.h"
#include "catalog/index.h"
#include "commands/vacuum.h"
#include "miscadmin.h"
#include "optimizer/plancat.h"
#include "utils/builtins.h"
#include "utils/index_selfuncs.h"
#include "utils/rel.h"


/* Working state for hashbuild and its callback */
typedef struct
{
    HSpool     *spool;          /* NULL if not using spooling */
    double      indtuples;      /* # tuples accepted into index */
} HashBuildState;

static void hashbuildCallback(Relation index,
                  HeapTuple htup,
                  Datum *values,
                  bool *isnull,
                  bool tupleIsAlive,
                  void *state);


/*
 * Hash handler function: return IndexAmRoutine with access method parameters
 * and callbacks.
 */
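/*
 * A rough sketch of how this handler is reached: core code looks up
 * pg_am.amhandler for the index and calls GetIndexAmRoutine() on it (see
 * src/backend/access/index/amapi.c); the hash AM is then invoked only
 * through the returned struct, e.g.
 *
 *      IndexAmRoutine *routine = GetIndexAmRoutine(amhandler);
 *      routine->ambuild(heap, index, indexInfo);
 */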
Datum
hashhandler(PG_FUNCTION_ARGS)
{
    IndexAmRoutine *amroutine = makeNode(IndexAmRoutine);

    amroutine->amstrategies = HTMaxStrategyNumber;
    amroutine->amsupport = HASHNProcs;
    amroutine->amcanorder = false;
    amroutine->amcanorderbyop = false;
    amroutine->amcanbackward = true;
    amroutine->amcanunique = false;
    amroutine->amcanmulticol = false;
    amroutine->amoptionalkey = false;
    amroutine->amsearcharray = false;
    amroutine->amsearchnulls = false;
    amroutine->amstorage = false;
    amroutine->amclusterable = false;
    amroutine->ampredlocks = false;
    amroutine->amcanparallel = false;
    amroutine->amkeytype = INT4OID;

    amroutine->ambuild = hashbuild;
    amroutine->ambuildempty = hashbuildempty;
    amroutine->aminsert = hashinsert;
    amroutine->ambulkdelete = hashbulkdelete;
    amroutine->amvacuumcleanup = hashvacuumcleanup;
    amroutine->amcanreturn = NULL;
    amroutine->amcostestimate = hashcostestimate;
    amroutine->amoptions = hashoptions;
    amroutine->amproperty = NULL;
    amroutine->amvalidate = hashvalidate;
    amroutine->ambeginscan = hashbeginscan;
    amroutine->amrescan = hashrescan;
    amroutine->amgettuple = hashgettuple;
    amroutine->amgetbitmap = hashgetbitmap;
    amroutine->amendscan = hashendscan;
    amroutine->ammarkpos = NULL;
    amroutine->amrestrpos = NULL;
    amroutine->amestimateparallelscan = NULL;
    amroutine->aminitparallelscan = NULL;
    amroutine->amparallelrescan = NULL;

    PG_RETURN_POINTER(amroutine);
}

/*
 *  hashbuild() -- build a new hash index.
 */
IndexBuildResult *
hashbuild(Relation heap, Relation index, IndexInfo *indexInfo)
{
    IndexBuildResult *result;
    BlockNumber relpages;
    double      reltuples;
    double      allvisfrac;
    uint32      num_buckets;
    long        sort_threshold;
    HashBuildState buildstate;

    /*
     * We expect to be called exactly once for any index relation. If that's
     * not the case, big trouble's what we have.
     */
    if (RelationGetNumberOfBlocks(index) != 0)
        elog(ERROR, "index \"%s\" already contains data",
             RelationGetRelationName(index));

    /* Estimate the number of rows currently present in the table */
    estimate_rel_size(heap, NULL, &relpages, &reltuples, &allvisfrac);

    /* Initialize the hash index metadata page and initial buckets */
    num_buckets = _hash_init(index, reltuples, MAIN_FORKNUM);

    /*
     * If we just insert the tuples into the index in scan order, then
     * (assuming their hash codes are pretty random) there will be no locality
     * of access to the index, and if the index is bigger than available RAM
     * then we'll thrash horribly.  To prevent that scenario, we can sort the
     * tuples by (expected) bucket number.  However, such a sort is useless
     * overhead when the index does fit in RAM.  We choose to sort if the
     * initial index size exceeds maintenance_work_mem, or the number of
     * buffers usable for the index, whichever is less.  (Limiting by the
     * number of buffers should reduce thrashing between PG buffers and kernel
     * buffers, which seems useful even if no physical I/O results.  Limiting
     * by maintenance_work_mem is useful to allow easy testing of the sort
     * code path, and may be useful to DBAs as an additional control knob.)
     *
     * NOTE: this test will need adjustment if a bucket is ever different from
     * one page.  Also, "initial index size" accounting does not include the
     * metapage, nor the first bitmap page.
     */
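    /*
     * For example, with the default maintenance_work_mem of 64MB (65536kB)
     * and 8kB pages, the computation below starts from (65536 * 1024) / 8192
     * = 8192 buckets, and is then capped by NBuffers (or NLocBuffer for a
     * temporary index).
     */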
    sort_threshold = (maintenance_work_mem * 1024L) / BLCKSZ;
    if (index->rd_rel->relpersistence != RELPERSISTENCE_TEMP)
        sort_threshold = Min(sort_threshold, NBuffers);
    else
        sort_threshold = Min(sort_threshold, NLocBuffer);

    if (num_buckets >= (uint32) sort_threshold)
        buildstate.spool = _h_spoolinit(heap, index, num_buckets);
    else
        buildstate.spool = NULL;

    /* prepare to build the index */
    buildstate.indtuples = 0;

    /* do the heap scan */
    reltuples = IndexBuildHeapScan(heap, index, indexInfo, true,
                                   hashbuildCallback, (void *) &buildstate);

    if (buildstate.spool)
    {
        /* sort the tuples and insert them into the index */
        _h_indexbuild(buildstate.spool);
        _h_spooldestroy(buildstate.spool);
    }

    /*
     * Return statistics
     */
    result = (IndexBuildResult *) palloc(sizeof(IndexBuildResult));

    result->heap_tuples = reltuples;
    result->index_tuples = buildstate.indtuples;

    return result;
}

/*
 *  hashbuildempty() -- build an empty hash index in the initialization fork
 */
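/*
 * This is called to create the init fork of an unlogged index; that fork is
 * copied over the main fork when crash recovery completes.
 */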
void
hashbuildempty(Relation index)
{
    _hash_init(index, 0, INIT_FORKNUM);
}

/*
 * Per-tuple callback from IndexBuildHeapScan
 */
static void
hashbuildCallback(Relation index,
                  HeapTuple htup,
                  Datum *values,
                  bool *isnull,
                  bool tupleIsAlive,
                  void *state)
{
    HashBuildState *buildstate = (HashBuildState *) state;
    Datum       index_values[1];
    bool        index_isnull[1];
    IndexTuple  itup;

    /* convert data to a hash key; on failure, do not insert anything */
    if (!_hash_convert_tuple(index,
                             values, isnull,
                             index_values, index_isnull))
        return;

    /* Either spool the tuple for sorting, or just put it into the index */
    if (buildstate->spool)
        _h_spool(buildstate->spool, &htup->t_self,
                 index_values, index_isnull);
    else
    {
        /* form an index tuple and point it at the heap tuple */
        itup = index_form_tuple(RelationGetDescr(index),
                                index_values, index_isnull);
        itup->t_tid = htup->t_self;
        _hash_doinsert(index, itup);
        pfree(itup);
    }

    buildstate->indtuples += 1;
}

/*
 *  hashinsert() -- insert an index tuple into a hash table.
 *
 *  Hash on the heap tuple's key, form an index tuple with hash code.
 *  Find the appropriate location for the new tuple, and put it there.
 */
bool
hashinsert(Relation rel, Datum *values, bool *isnull,
           ItemPointer ht_ctid, Relation heapRel,
           IndexUniqueCheck checkUnique,
           IndexInfo *indexInfo)
{
    Datum       index_values[1];
    bool        index_isnull[1];
    IndexTuple  itup;

    /* convert data to a hash key; on failure, do not insert anything */
    if (!_hash_convert_tuple(rel,
                             values, isnull,
                             index_values, index_isnull))
        return false;

    /* form an index tuple and point it at the heap tuple */
    itup = index_form_tuple(RelationGetDescr(rel), index_values, index_isnull);
    itup->t_tid = *ht_ctid;

    _hash_doinsert(rel, itup);

    pfree(itup);

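    /*
     * aminsert's result is only meaningful for UNIQUE_CHECK_PARTIAL, which
     * hash indexes never use (amcanunique is false), so always report false.
     */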
    return false;
}


/*
 *  hashgettuple() -- Get the next tuple in the scan.
 */
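/*
 * A minimal calling sketch (assuming an IndexScanDesc "scan" prepared via
 * hashbeginscan and hashrescan):
 *
 *      while (hashgettuple(scan, ForwardScanDirection))
 *      {
 *          ItemPointerData tid = scan->xs_ctup.t_self;
 *
 *          ... fetch the heap tuple at "tid" and, since xs_recheck is
 *          ... always true for hash, recheck the scan quals against it
 *      }
 */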
bool
hashgettuple(IndexScanDesc scan, ScanDirection dir)
{
    HashScanOpaque so = (HashScanOpaque) scan->opaque;
    Relation    rel = scan->indexRelation;
    Buffer      buf;
    Page        page;
    OffsetNumber offnum;
    ItemPointer current;
    bool        res;

    /* Hash indexes are always lossy since we store only the hash code */
    scan->xs_recheck = true;

    /*
     * We hold pin but not lock on current buffer while outside the hash AM.
     * Reacquire the read lock here.
     */
    if (BufferIsValid(so->hashso_curbuf))
        LockBuffer(so->hashso_curbuf, BUFFER_LOCK_SHARE);

    /*
     * If we've already initialized this scan, we can just advance it in the
     * appropriate direction.  If we haven't done so yet, we call a routine to
     * get the first item in the scan.
     */
    current = &(so->hashso_curpos);
    if (ItemPointerIsValid(current))
    {
        /*
         * An insertion into the current index page could have happened while
         * we didn't have read lock on it.  Re-find our position by looking
         * for the TID we previously returned.  (Because we hold a pin on the
         * primary bucket page, no deletions or splits could have occurred;
         * therefore we can expect that the TID still exists in the current
         * index page, at an offset >= where we were.)
         */
        OffsetNumber maxoffnum;

        buf = so->hashso_curbuf;
        Assert(BufferIsValid(buf));
        page = BufferGetPage(buf);

        /*
         * We don't need to test for an old snapshot here, since the current
         * buffer is pinned, so vacuum can't clean the page.
         */
        maxoffnum = PageGetMaxOffsetNumber(page);
        for (offnum = ItemPointerGetOffsetNumber(current);
             offnum <= maxoffnum;
             offnum = OffsetNumberNext(offnum))
        {
            IndexTuple  itup;

            itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, offnum));
            if (ItemPointerEquals(&(so->hashso_heappos), &(itup->t_tid)))
                break;
        }
        if (offnum > maxoffnum)
            elog(ERROR, "failed to re-find scan position within index \"%s\"",
                 RelationGetRelationName(rel));
        ItemPointerSetOffsetNumber(current, offnum);

        /*
         * Check to see if we should kill the previously-fetched tuple.
         */
        if (scan->kill_prior_tuple)
        {
            /*
             * Yes, so mark it by setting the LP_DEAD state in the item flags.
             */
            ItemIdMarkDead(PageGetItemId(page, offnum));

            /*
             * Since this can be redone later if needed, mark as a hint.
             */
            MarkBufferDirtyHint(buf, true);
        }

        /*
         * Now continue the scan.
         */
        res = _hash_next(scan, dir);
    }
    else
        res = _hash_first(scan, dir);

    /*
     * Skip killed tuples if asked to.
     */
    if (scan->ignore_killed_tuples)
    {
        while (res)
        {
            offnum = ItemPointerGetOffsetNumber(current);
            page = BufferGetPage(so->hashso_curbuf);
            if (!ItemIdIsDead(PageGetItemId(page, offnum)))
                break;
            res = _hash_next(scan, dir);
        }
    }

    /* Release read lock on current buffer, but keep it pinned */
    if (BufferIsValid(so->hashso_curbuf))
        LockBuffer(so->hashso_curbuf, BUFFER_LOCK_UNLOCK);

    /* Return current heap TID on success */
    scan->xs_ctup.t_self = so->hashso_heappos;

    return res;
}


/*
 *  hashgetbitmap() -- get all tuples at once
 */
int64
hashgetbitmap(IndexScanDesc scan, TIDBitmap *tbm)
{
    HashScanOpaque so = (HashScanOpaque) scan->opaque;
    bool        res;
    int64       ntids = 0;

    res = _hash_first(scan, ForwardScanDirection);

    while (res)
    {
        bool        add_tuple;

        /*
         * Skip killed tuples if asked to.
         */
        if (scan->ignore_killed_tuples)
        {
            Page        page;
            OffsetNumber offnum;

            offnum = ItemPointerGetOffsetNumber(&(so->hashso_curpos));
            page = BufferGetPage(so->hashso_curbuf);
            add_tuple = !ItemIdIsDead(PageGetItemId(page, offnum));
        }
        else
            add_tuple = true;

        /* Save tuple ID, and continue scanning */
        if (add_tuple)
        {
            /* Note we mark the tuple ID as requiring recheck */
            tbm_add_tuples(tbm, &(so->hashso_heappos), 1, true);
            ntids++;
        }

        res = _hash_next(scan, ForwardScanDirection);
    }

    return ntids;
}


/*
 *  hashbeginscan() -- start a scan on a hash index
 */
IndexScanDesc
hashbeginscan(Relation rel, int nkeys, int norderbys)
{
    IndexScanDesc scan;
    HashScanOpaque so;

    /* no order by operators allowed */
    Assert(norderbys == 0);

    scan = RelationGetIndexScan(rel, nkeys, norderbys);

    so = (HashScanOpaque) palloc(sizeof(HashScanOpaqueData));
    so->hashso_curbuf = InvalidBuffer;
    so->hashso_bucket_buf = InvalidBuffer;
    so->hashso_split_bucket_buf = InvalidBuffer;
    /* set position invalid (this will cause _hash_first call) */
    ItemPointerSetInvalid(&(so->hashso_curpos));
    ItemPointerSetInvalid(&(so->hashso_heappos));

    so->hashso_buc_populated = false;
    so->hashso_buc_split = false;

    scan->opaque = so;

    return scan;
}

/*
 *  hashrescan() -- rescan an index relation
 */
void
hashrescan(IndexScanDesc scan, ScanKey scankey, int nscankeys,
           ScanKey orderbys, int norderbys)
{
    HashScanOpaque so = (HashScanOpaque) scan->opaque;
    Relation    rel = scan->indexRelation;

    _hash_dropscanbuf(rel, so);

    /* set position invalid (this will cause _hash_first call) */
    ItemPointerSetInvalid(&(so->hashso_curpos));
    ItemPointerSetInvalid(&(so->hashso_heappos));

    /* Update scan key, if a new one is given */
    if (scankey && scan->numberOfKeys > 0)
    {
        memmove(scan->keyData,
                scankey,
                scan->numberOfKeys * sizeof(ScanKeyData));
    }

    so->hashso_buc_populated = false;
    so->hashso_buc_split = false;
}

/*
 *  hashendscan() -- close down a scan
 */
void
hashendscan(IndexScanDesc scan)
{
    HashScanOpaque so = (HashScanOpaque) scan->opaque;
    Relation    rel = scan->indexRelation;

    _hash_dropscanbuf(rel, so);

    pfree(so);
    scan->opaque = NULL;
}

/*
 * Bulk deletion of all index entries pointing to a set of heap tuples.
 * The set of target tuples is specified via a callback routine that tells
 * whether any given heap tuple (identified by ItemPointer) is being deleted.
 *
 * This function also deletes the tuples that were moved by a split to another
 * bucket.
 *
 * Result: a palloc'd struct containing statistical info for VACUUM displays.
 */
IndexBulkDeleteResult *
hashbulkdelete(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
               IndexBulkDeleteCallback callback, void *callback_state)
{
    Relation    rel = info->index;
    double      tuples_removed;
    double      num_index_tuples;
    double      orig_ntuples;
    Bucket      orig_maxbucket;
    Bucket      cur_maxbucket;
    Bucket      cur_bucket;
    Buffer      metabuf = InvalidBuffer;
    HashMetaPage metap;
    HashMetaPage cachedmetap;

    tuples_removed = 0;
    num_index_tuples = 0;

    /*
     * We need a copy of the metapage so that we can use its hashm_spares[]
     * values to compute bucket page addresses, but a cached copy should be
     * good enough.  (If not, we'll detect that further down and refresh the
     * cache as necessary.)
     */
    cachedmetap = _hash_getcachedmetap(rel, &metabuf, false);
    Assert(cachedmetap != NULL);

    orig_maxbucket = cachedmetap->hashm_maxbucket;
    orig_ntuples = cachedmetap->hashm_ntuples;

    /* Scan the buckets that we know exist */
    cur_bucket = 0;
    cur_maxbucket = orig_maxbucket;

loop_top:
    while (cur_bucket <= cur_maxbucket)
    {
        BlockNumber bucket_blkno;
        BlockNumber blkno;
        Buffer      bucket_buf;
        Buffer      buf;
        HashPageOpaque bucket_opaque;
        Page        page;
        bool        split_cleanup = false;

        /* Get address of bucket's start page */
        bucket_blkno = BUCKET_TO_BLKNO(cachedmetap, cur_bucket);

        blkno = bucket_blkno;

        /*
         * We need to acquire a cleanup lock on the primary bucket page to
         * wait out concurrent scans before deleting the dead tuples.
         */
        buf = ReadBufferExtended(rel, MAIN_FORKNUM, blkno, RBM_NORMAL, info->strategy);
        LockBufferForCleanup(buf);
        _hash_checkpage(rel, buf, LH_BUCKET_PAGE);

        page = BufferGetPage(buf);
        bucket_opaque = (HashPageOpaque) PageGetSpecialPointer(page);

        /*
         * If the bucket contains tuples that were moved by a split, then we
         * need to delete those tuples.  We can't delete them while the split
         * operation on the bucket is unfinished, as they are still needed by
         * scans.
         */
        if (!H_BUCKET_BEING_SPLIT(bucket_opaque) &&
            H_NEEDS_SPLIT_CLEANUP(bucket_opaque))
        {
            split_cleanup = true;

            /*
             * This bucket might have been split since we last held a lock on
             * the metapage.  If so, hashm_maxbucket, hashm_highmask and
             * hashm_lowmask might be old enough to cause us to fail to remove
             * tuples left behind by the most recent split.  To prevent that,
             * now that the primary page of the target bucket has been locked
             * (and thus can't be further split), check whether we need to
             * update our cached metapage data.
             *
             * NB: The check for InvalidBlockNumber is only needed for
             * on-disk compatibility with indexes created before we started
             * storing hashm_maxbucket in the primary page's hasho_prevblkno.
             */
            if (bucket_opaque->hasho_prevblkno != InvalidBlockNumber &&
                bucket_opaque->hasho_prevblkno > cachedmetap->hashm_maxbucket)
            {
                cachedmetap = _hash_getcachedmetap(rel, &metabuf, true);
                Assert(cachedmetap != NULL);
            }
        }

        bucket_buf = buf;

        hashbucketcleanup(rel, cur_bucket, bucket_buf, blkno, info->strategy,
                          cachedmetap->hashm_maxbucket,
                          cachedmetap->hashm_highmask,
                          cachedmetap->hashm_lowmask, &tuples_removed,
                          &num_index_tuples, split_cleanup,
                          callback, callback_state);

        _hash_dropbuf(rel, bucket_buf);

        /* Advance to next bucket */
        cur_bucket++;
    }

    if (BufferIsInvalid(metabuf))
        metabuf = _hash_getbuf(rel, HASH_METAPAGE, HASH_NOLOCK, LH_META_PAGE);

    /* Write-lock metapage and check for split since we started */
    LockBuffer(metabuf, BUFFER_LOCK_EXCLUSIVE);
    metap = HashPageGetMeta(BufferGetPage(metabuf));

    if (cur_maxbucket != metap->hashm_maxbucket)
    {
        /* There's been a split, so process the additional bucket(s) */
        LockBuffer(metabuf, BUFFER_LOCK_UNLOCK);
        cachedmetap = _hash_getcachedmetap(rel, &metabuf, true);
        Assert(cachedmetap != NULL);
        cur_maxbucket = cachedmetap->hashm_maxbucket;
        goto loop_top;
    }

    /* Okay, we're really done.  Update tuple count in metapage. */
    START_CRIT_SECTION();

    if (orig_maxbucket == metap->hashm_maxbucket &&
        orig_ntuples == metap->hashm_ntuples)
    {
        /*
         * No one has split or inserted anything since start of scan, so
         * believe our count as gospel.
         */
        metap->hashm_ntuples = num_index_tuples;
    }
    else
    {
        /*
         * Otherwise, our count is untrustworthy since we may have
         * double-scanned tuples in split buckets.  Proceed by dead-reckoning.
         * (Note: we still return estimated_count = false, because using this
         * count is better than not updating reltuples at all.)
         */
        if (metap->hashm_ntuples > tuples_removed)
            metap->hashm_ntuples -= tuples_removed;
        else
            metap->hashm_ntuples = 0;
        num_index_tuples = metap->hashm_ntuples;
    }

    MarkBufferDirty(metabuf);

    /* XLOG stuff */
    if (RelationNeedsWAL(rel))
    {
        xl_hash_update_meta_page xlrec;
        XLogRecPtr  recptr;

        xlrec.ntuples = metap->hashm_ntuples;

        XLogBeginInsert();
        XLogRegisterData((char *) &xlrec, SizeOfHashUpdateMetaPage);

        XLogRegisterBuffer(0, metabuf, REGBUF_STANDARD);

        recptr = XLogInsert(RM_HASH_ID, XLOG_HASH_UPDATE_META_PAGE);
        PageSetLSN(BufferGetPage(metabuf), recptr);
    }

    END_CRIT_SECTION();

    _hash_relbuf(rel, metabuf);

    /* return statistics */
    if (stats == NULL)
        stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
    stats->estimated_count = false;
    stats->num_index_tuples = num_index_tuples;
    stats->tuples_removed += tuples_removed;
    /* hashvacuumcleanup will fill in num_pages */

    return stats;
}

/*
 * Post-VACUUM cleanup.
 *
 * Result: a palloc'd struct containing statistical info for VACUUM displays.
 */
IndexBulkDeleteResult *
hashvacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats)
{
    Relation    rel = info->index;
    BlockNumber num_pages;

    /* If hashbulkdelete wasn't called, return NULL signifying no change */
    /* Note: this covers the analyze_only case too */
    if (stats == NULL)
        return NULL;

    /* update statistics */
    num_pages = RelationGetNumberOfBlocks(rel);
    stats->num_pages = num_pages;

    return stats;
}

/*
 * Helper function to perform deletion of index entries from a bucket.
 *
 * This function expects that the caller has acquired a cleanup lock on the
 * primary bucket page, and will return with a write lock again held on the
 * primary bucket page.  The lock won't necessarily be held continuously,
 * though, because we'll release it when visiting overflow pages.
 *
 * It would be very bad if this function cleaned a page while some other
 * backend was in the midst of scanning it, because hashgettuple assumes
 * that the next valid TID will be greater than or equal to the current
 * valid TID.  There can't be any concurrent scans in progress when we first
 * enter this function because of the cleanup lock we hold on the primary
 * bucket page, but as soon as we release that lock, there might be.  We
 * handle that by conspiring to prevent those scans from passing our cleanup
 * scan.  To do that, we lock the next page in the bucket chain before
 * releasing the lock on the previous page.  (This type of lock chaining is
 * not ideal, so we might want to look for a better solution at some point.)
 *
 * We need to retain a pin on the primary bucket to ensure that no concurrent
 * split can start.
 */
void
hashbucketcleanup(Relation rel, Bucket cur_bucket, Buffer bucket_buf,
                  BlockNumber bucket_blkno, BufferAccessStrategy bstrategy,
                  uint32 maxbucket, uint32 highmask, uint32 lowmask,
                  double *tuples_removed, double *num_index_tuples,
                  bool split_cleanup,
                  IndexBulkDeleteCallback callback, void *callback_state)
{
    BlockNumber blkno;
    Buffer      buf;
    Bucket      new_bucket PG_USED_FOR_ASSERTS_ONLY = InvalidBucket;
    bool        bucket_dirty = false;

    blkno = bucket_blkno;
    buf = bucket_buf;

    if (split_cleanup)
        new_bucket = _hash_get_newbucket_from_oldbucket(rel, cur_bucket,
                                                        lowmask, maxbucket);

    /* Scan each page in bucket */
    for (;;)
    {
        HashPageOpaque opaque;
        OffsetNumber offno;
        OffsetNumber maxoffno;
        Buffer      next_buf;
        Page        page;
        OffsetNumber deletable[MaxOffsetNumber];
        int         ndeletable = 0;
        bool        retain_pin = false;

        vacuum_delay_point();

        page = BufferGetPage(buf);
        opaque = (HashPageOpaque) PageGetSpecialPointer(page);

        /* Scan each tuple in page */
        maxoffno = PageGetMaxOffsetNumber(page);
        for (offno = FirstOffsetNumber;
             offno <= maxoffno;
             offno = OffsetNumberNext(offno))
        {
            ItemPointer htup;
            IndexTuple  itup;
            Bucket      bucket;
            bool        kill_tuple = false;

            itup = (IndexTuple) PageGetItem(page,
                                            PageGetItemId(page, offno));
            htup = &(itup->t_tid);

            /*
             * To remove the dead tuples, we strictly want to rely on the
             * results of the callback function.  Refer to btvacuumpage for
             * the detailed reason.
             */
            if (callback && callback(htup, callback_state))
            {
                kill_tuple = true;
                if (tuples_removed)
                    *tuples_removed += 1;
            }
            else if (split_cleanup)
            {
                /* delete the tuples that are moved by split. */
                bucket = _hash_hashkey2bucket(_hash_get_indextuple_hashkey(itup),
                                              maxbucket,
                                              highmask,
                                              lowmask);
                /* mark the item for deletion */
                if (bucket != cur_bucket)
                {
                    /*
                     * We expect tuples to belong either to the current
                     * bucket or to new_bucket.  This is ensured because we
                     * don't allow further splits from a bucket that contains
                     * garbage.  See comments in _hash_expandtable.
                     */
                    Assert(bucket == new_bucket);
                    kill_tuple = true;
                }
            }

            if (kill_tuple)
            {
                /* mark the item for deletion */
                deletable[ndeletable++] = offno;
            }
            else
            {
                /* we're keeping it, so count it */
                if (num_index_tuples)
                    *num_index_tuples += 1;
            }
        }

        /* retain the pin on primary bucket page till end of bucket scan */
        if (blkno == bucket_blkno)
            retain_pin = true;
        else
            retain_pin = false;

        blkno = opaque->hasho_nextblkno;

        /*
         * Apply deletions, advance to next page and write page if needed.
         */
        if (ndeletable > 0)
        {
            /* No ereport(ERROR) until changes are logged */
            START_CRIT_SECTION();

            PageIndexMultiDelete(page, deletable, ndeletable);
            bucket_dirty = true;
            MarkBufferDirty(buf);

            /* XLOG stuff */
            if (RelationNeedsWAL(rel))
            {
                xl_hash_delete xlrec;
                XLogRecPtr  recptr;

                xlrec.is_primary_bucket_page = (buf == bucket_buf) ? true : false;

                XLogBeginInsert();
                XLogRegisterData((char *) &xlrec, SizeOfHashDelete);

                /*
                 * bucket buffer needs to be registered to ensure that we can
                 * acquire a cleanup lock on it during replay.
                 */
                if (!xlrec.is_primary_bucket_page)
                    XLogRegisterBuffer(0, bucket_buf, REGBUF_STANDARD | REGBUF_NO_IMAGE);

                XLogRegisterBuffer(1, buf, REGBUF_STANDARD);
                XLogRegisterBufData(1, (char *) deletable,
                                    ndeletable * sizeof(OffsetNumber));

                recptr = XLogInsert(RM_HASH_ID, XLOG_HASH_DELETE);
                PageSetLSN(BufferGetPage(buf), recptr);
            }

            END_CRIT_SECTION();
        }

        /* bail out if there are no more pages to scan. */
        if (!BlockNumberIsValid(blkno))
            break;

        next_buf = _hash_getbuf_with_strategy(rel, blkno, HASH_WRITE,
                                              LH_OVERFLOW_PAGE,
                                              bstrategy);

        /*
         * release the lock on previous page after acquiring the lock on next
         * page
         */
        if (retain_pin)
            LockBuffer(buf, BUFFER_LOCK_UNLOCK);
        else
            _hash_relbuf(rel, buf);

        buf = next_buf;
    }

    /*
     * Lock the bucket page to clear the garbage flag and squeeze the bucket.
     * If the current buffer is the same as the bucket buffer, then we already
     * have a lock on the bucket page.
     */
    if (buf != bucket_buf)
    {
        _hash_relbuf(rel, buf);
        LockBuffer(bucket_buf, BUFFER_LOCK_EXCLUSIVE);
    }

    /*
     * Clear the garbage flag from the bucket after deleting the tuples that
     * were moved by the split.  We purposely clear the flag before squeezing
     * the bucket, so that after a restart, vacuum doesn't again try to delete
     * the tuples that were moved by the split.
     */
    if (split_cleanup)
    {
        HashPageOpaque bucket_opaque;
        Page        page;

        page = BufferGetPage(bucket_buf);
        bucket_opaque = (HashPageOpaque) PageGetSpecialPointer(page);

        /* No ereport(ERROR) until changes are logged */
        START_CRIT_SECTION();

        bucket_opaque->hasho_flag &= ~LH_BUCKET_NEEDS_SPLIT_CLEANUP;
        MarkBufferDirty(bucket_buf);

        /* XLOG stuff */
        if (RelationNeedsWAL(rel))
        {
            XLogRecPtr  recptr;

            XLogBeginInsert();
            XLogRegisterBuffer(0, bucket_buf, REGBUF_STANDARD);

            recptr = XLogInsert(RM_HASH_ID, XLOG_HASH_SPLIT_CLEANUP);
            PageSetLSN(page, recptr);
        }

        END_CRIT_SECTION();
    }

    /*
     * If we have deleted anything, try to compact free space.  For squeezing
     * the bucket, we must have a cleanup lock; otherwise it could disturb the
     * tuple ordering seen by a scan that started before the squeeze.
     */
    if (bucket_dirty && IsBufferCleanupOK(bucket_buf))
        _hash_squeezebucket(rel, cur_bucket, bucket_blkno, bucket_buf,
                            bstrategy);
    else
        LockBuffer(bucket_buf, BUFFER_LOCK_UNLOCK);
}