/*-------------------------------------------------------------------------
 *
 * hash.c
 *    Implementation of Margo Seltzer's Hashing package for postgres.
 *
 * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *    src/backend/access/hash/hash.c
 *
 * NOTES
 *    This file contains only the public interface routines.
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "access/hash.h"
#include "access/hash_xlog.h"
#include "access/relscan.h"
#include "catalog/index.h"
#include "commands/vacuum.h"
#include "miscadmin.h"
#include "optimizer/plancat.h"
#include "utils/index_selfuncs.h"
#include "utils/rel.h"


/* Working state for hashbuild and its callback */
typedef struct
{
    HSpool     *spool;          /* NULL if not using spooling */
    double      indtuples;      /* # tuples accepted into index */
} HashBuildState;

static void hashbuildCallback(Relation index,
                  HeapTuple htup,
                  Datum *values,
                  bool *isnull,
                  bool tupleIsAlive,
                  void *state);


/*
 * Hash handler function: return IndexAmRoutine with access method parameters
 * and callbacks.
 */
Datum
hashhandler(PG_FUNCTION_ARGS)
{
    IndexAmRoutine *amroutine = makeNode(IndexAmRoutine);

    amroutine->amstrategies = HTMaxStrategyNumber;
    amroutine->amsupport = HASHNProcs;
    amroutine->amcanorder = false;
    amroutine->amcanorderbyop = false;
    amroutine->amcanbackward = true;
    amroutine->amcanunique = false;
    amroutine->amcanmulticol = false;
    amroutine->amoptionalkey = false;
    amroutine->amsearcharray = false;
    amroutine->amsearchnulls = false;
    amroutine->amstorage = false;
    amroutine->amclusterable = false;
    amroutine->ampredlocks = false;
    amroutine->amkeytype = INT4OID;

    amroutine->ambuild = hashbuild;
    amroutine->ambuildempty = hashbuildempty;
    amroutine->aminsert = hashinsert;
    amroutine->ambulkdelete = hashbulkdelete;
    amroutine->amvacuumcleanup = hashvacuumcleanup;
    amroutine->amcanreturn = NULL;
    amroutine->amcostestimate = hashcostestimate;
    amroutine->amoptions = hashoptions;
    amroutine->amproperty = NULL;
    amroutine->amvalidate = hashvalidate;
    amroutine->ambeginscan = hashbeginscan;
    amroutine->amrescan = hashrescan;
    amroutine->amgettuple = hashgettuple;
    amroutine->amgetbitmap = hashgetbitmap;
    amroutine->amendscan = hashendscan;
    amroutine->ammarkpos = NULL;
    amroutine->amrestrpos = NULL;

    PG_RETURN_POINTER(amroutine);
}
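
/*
 * Illustrative note: a handler of this shape is what an index access method
 * exposes through pg_am.amhandler.  For an out-of-core AM, the hookup would
 * look roughly like the following (hypothetical names, shown only as a
 * sketch):
 *
 *      CREATE ACCESS METHOD myhash TYPE INDEX HANDLER myhash_handler;
 *
 * The built-in hash AM is instead registered directly in the pg_am catalog.
 */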

/*
 *  hashbuild() -- build a new hash index.
 */
IndexBuildResult *
hashbuild(Relation heap, Relation index, IndexInfo *indexInfo)
{
    IndexBuildResult *result;
    BlockNumber relpages;
    double      reltuples;
    double      allvisfrac;
    uint32      num_buckets;
    long        sort_threshold;
    HashBuildState buildstate;

    /*
     * We expect to be called exactly once for any index relation. If that's
     * not the case, big trouble's what we have.
     */
    if (RelationGetNumberOfBlocks(index) != 0)
        elog(ERROR, "index \"%s\" already contains data",
             RelationGetRelationName(index));

    /* Estimate the number of rows currently present in the table */
    estimate_rel_size(heap, NULL, &relpages, &reltuples, &allvisfrac);

    /* Initialize the hash index metadata page and initial buckets */
    num_buckets = _hash_metapinit(index, reltuples, MAIN_FORKNUM);

    /*
     * If we just insert the tuples into the index in scan order, then
     * (assuming their hash codes are pretty random) there will be no locality
     * of access to the index, and if the index is bigger than available RAM
     * then we'll thrash horribly.  To prevent that scenario, we can sort the
     * tuples by (expected) bucket number.  However, such a sort is useless
     * overhead when the index does fit in RAM.  We choose to sort if the
     * initial index size exceeds maintenance_work_mem, or the number of
     * buffers usable for the index, whichever is less.  (Limiting by the
     * number of buffers should reduce thrashing between PG buffers and kernel
     * buffers, which seems useful even if no physical I/O results.  Limiting
     * by maintenance_work_mem is useful to allow easy testing of the sort
     * code path, and may be useful to DBAs as an additional control knob.)
     *
     * NOTE: this test will need adjustment if a bucket is ever different from
     * one page.  Also, "initial index size" accounting does not include the
     * metapage, nor the first bitmap page.
     */
    sort_threshold = (maintenance_work_mem * 1024L) / BLCKSZ;
    if (index->rd_rel->relpersistence != RELPERSISTENCE_TEMP)
        sort_threshold = Min(sort_threshold, NBuffers);
    else
        sort_threshold = Min(sort_threshold, NLocBuffer);
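
    /*
     * For example, with typical default settings (maintenance_work_mem =
     * 64MB, BLCKSZ = 8kB, shared_buffers = 128MB and hence NBuffers = 16384)
     * the threshold works out to Min(65536kB / 8kB, 16384) = 8192, so a
     * build expected to need 8192 or more buckets takes the sorted
     * (spooled) path below.
     */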

    if (num_buckets >= (uint32) sort_threshold)
        buildstate.spool = _h_spoolinit(heap, index, num_buckets);
    else
        buildstate.spool = NULL;

    /* prepare to build the index */
    buildstate.indtuples = 0;

    /* do the heap scan */
    reltuples = IndexBuildHeapScan(heap, index, indexInfo, true,
                                   hashbuildCallback, (void *) &buildstate);

    if (buildstate.spool)
    {
        /* sort the tuples and insert them into the index */
        _h_indexbuild(buildstate.spool);
        _h_spooldestroy(buildstate.spool);
    }

    /*
     * Return statistics
     */
    result = (IndexBuildResult *) palloc(sizeof(IndexBuildResult));

    result->heap_tuples = reltuples;
    result->index_tuples = buildstate.indtuples;

    return result;
}

/*
 *  hashbuildempty() -- build an empty hash index in the initialization fork
 */
void
hashbuildempty(Relation index)
{
    _hash_metapinit(index, 0, INIT_FORKNUM);
}

/*
 * Per-tuple callback from IndexBuildHeapScan
 */
static void
hashbuildCallback(Relation index,
                  HeapTuple htup,
                  Datum *values,
                  bool *isnull,
                  bool tupleIsAlive,
                  void *state)
{
    HashBuildState *buildstate = (HashBuildState *) state;
    Datum       index_values[1];
    bool        index_isnull[1];
    IndexTuple  itup;

    /* convert data to a hash key; on failure, do not insert anything */
    if (!_hash_convert_tuple(index,
                             values, isnull,
                             index_values, index_isnull))
        return;

    /* Either spool the tuple for sorting, or just put it into the index */
    if (buildstate->spool)
        _h_spool(buildstate->spool, &htup->t_self,
                 index_values, index_isnull);
    else
    {
        /* form an index tuple and point it at the heap tuple */
        itup = index_form_tuple(RelationGetDescr(index),
                                index_values, index_isnull);
        itup->t_tid = htup->t_self;
        _hash_doinsert(index, itup);
        pfree(itup);
    }

    buildstate->indtuples += 1;
}

/*
 *  hashinsert() -- insert an index tuple into a hash table.
 *
 *  Hash on the heap tuple's key, form an index tuple with hash code.
 *  Find the appropriate location for the new tuple, and put it there.
 */
bool
hashinsert(Relation rel, Datum *values, bool *isnull,
           ItemPointer ht_ctid, Relation heapRel,
           IndexUniqueCheck checkUnique)
{
    Datum       index_values[1];
    bool        index_isnull[1];
    IndexTuple  itup;

    /* convert data to a hash key; on failure, do not insert anything */
    if (!_hash_convert_tuple(rel,
                             values, isnull,
                             index_values, index_isnull))
        return false;

    /* form an index tuple and point it at the heap tuple */
    itup = index_form_tuple(RelationGetDescr(rel), index_values, index_isnull);
    itup->t_tid = *ht_ctid;

    _hash_doinsert(rel, itup);

    pfree(itup);

    return false;
}


/*
 *  hashgettuple() -- Get the next tuple in the scan.
 */
bool
hashgettuple(IndexScanDesc scan, ScanDirection dir)
{
    HashScanOpaque so = (HashScanOpaque) scan->opaque;
    Relation    rel = scan->indexRelation;
    Buffer      buf;
    Page        page;
    OffsetNumber offnum;
    ItemPointer current;
    bool        res;

    /* Hash indexes are always lossy since we store only the hash code */
    scan->xs_recheck = true;

    /*
     * We hold pin but not lock on current buffer while outside the hash AM.
     * Reacquire the read lock here.
     */
    if (BufferIsValid(so->hashso_curbuf))
        _hash_chgbufaccess(rel, so->hashso_curbuf, HASH_NOLOCK, HASH_READ);

    /*
     * If we've already initialized this scan, we can just advance it in the
     * appropriate direction.  If we haven't done so yet, we call a routine to
     * get the first item in the scan.
     */
    current = &(so->hashso_curpos);
    if (ItemPointerIsValid(current))
    {
        /*
         * An insertion into the current index page could have happened while
         * we didn't have read lock on it.  Re-find our position by looking
         * for the TID we previously returned.  (Because we hold a pin on the
         * primary bucket page, no deletions or splits could have occurred;
         * therefore we can expect that the TID still exists in the current
         * index page, at an offset >= where we were.)
         */
        OffsetNumber maxoffnum;

        buf = so->hashso_curbuf;
        Assert(BufferIsValid(buf));
        page = BufferGetPage(buf);
        maxoffnum = PageGetMaxOffsetNumber(page);
        for (offnum = ItemPointerGetOffsetNumber(current);
             offnum <= maxoffnum;
             offnum = OffsetNumberNext(offnum))
        {
            IndexTuple  itup;

            itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, offnum));
            if (ItemPointerEquals(&(so->hashso_heappos), &(itup->t_tid)))
                break;
        }
        if (offnum > maxoffnum)
            elog(ERROR, "failed to re-find scan position within index \"%s\"",
                 RelationGetRelationName(rel));
        ItemPointerSetOffsetNumber(current, offnum);

        /*
         * Check to see if we should kill the previously-fetched tuple.
         */
        if (scan->kill_prior_tuple)
        {
            /*
             * Yes, so mark it by setting the LP_DEAD state in the item flags.
             */
            ItemIdMarkDead(PageGetItemId(page, offnum));

            /*
             * Since this can be redone later if needed, mark as a hint.
             */
            MarkBufferDirtyHint(buf, true);
        }

        /*
         * Now continue the scan.
         */
        res = _hash_next(scan, dir);
    }
    else
        res = _hash_first(scan, dir);

    /*
     * Skip killed tuples if asked to.
     */
    if (scan->ignore_killed_tuples)
    {
        while (res)
        {
            offnum = ItemPointerGetOffsetNumber(current);
            page = BufferGetPage(so->hashso_curbuf);
            if (!ItemIdIsDead(PageGetItemId(page, offnum)))
                break;
            res = _hash_next(scan, dir);
        }
    }

    /* Release read lock on current buffer, but keep it pinned */
    if (BufferIsValid(so->hashso_curbuf))
        _hash_chgbufaccess(rel, so->hashso_curbuf, HASH_READ, HASH_NOLOCK);

    /* Return current heap TID on success */
    scan->xs_ctup.t_self = so->hashso_heappos;

    return res;
}


/*
 *  hashgetbitmap() -- get all tuples at once
 */
int64
hashgetbitmap(IndexScanDesc scan, TIDBitmap *tbm)
{
    HashScanOpaque so = (HashScanOpaque) scan->opaque;
    bool        res;
    int64       ntids = 0;

    res = _hash_first(scan, ForwardScanDirection);

    while (res)
    {
        bool        add_tuple;

        /*
         * Skip killed tuples if asked to.
         */
        if (scan->ignore_killed_tuples)
        {
            Page        page;
            OffsetNumber offnum;

            offnum = ItemPointerGetOffsetNumber(&(so->hashso_curpos));
            page = BufferGetPage(so->hashso_curbuf);
            add_tuple = !ItemIdIsDead(PageGetItemId(page, offnum));
        }
        else
            add_tuple = true;

        /* Save tuple ID, and continue scanning */
        if (add_tuple)
        {
            /* Note we mark the tuple ID as requiring recheck */
            tbm_add_tuples(tbm, &(so->hashso_heappos), 1, true);
            ntids++;
        }

        res = _hash_next(scan, ForwardScanDirection);
    }

    return ntids;
}


/*
 *  hashbeginscan() -- start a scan on a hash index
 */
IndexScanDesc
hashbeginscan(Relation rel, int nkeys, int norderbys)
{
    IndexScanDesc scan;
    HashScanOpaque so;

    /* no order by operators allowed */
    Assert(norderbys == 0);

    scan = RelationGetIndexScan(rel, nkeys, norderbys);

    so = (HashScanOpaque) palloc(sizeof(HashScanOpaqueData));
    so->hashso_curbuf = InvalidBuffer;
    so->hashso_bucket_buf = InvalidBuffer;
    so->hashso_split_bucket_buf = InvalidBuffer;
    /* set position invalid (this will cause _hash_first call) */
    ItemPointerSetInvalid(&(so->hashso_curpos));
    ItemPointerSetInvalid(&(so->hashso_heappos));

    so->hashso_buc_populated = false;
    so->hashso_buc_split = false;

    scan->opaque = so;

    return scan;
}

/*
 *  hashrescan() -- rescan an index relation
 */
void
hashrescan(IndexScanDesc scan, ScanKey scankey, int nscankeys,
           ScanKey orderbys, int norderbys)
{
    HashScanOpaque so = (HashScanOpaque) scan->opaque;
    Relation    rel = scan->indexRelation;

    _hash_dropscanbuf(rel, so);

    /* set position invalid (this will cause _hash_first call) */
    ItemPointerSetInvalid(&(so->hashso_curpos));
    ItemPointerSetInvalid(&(so->hashso_heappos));

    /* Update scan key, if a new one is given */
    if (scankey && scan->numberOfKeys > 0)
    {
        memmove(scan->keyData,
                scankey,
                scan->numberOfKeys * sizeof(ScanKeyData));
    }

    so->hashso_buc_populated = false;
    so->hashso_buc_split = false;
}

/*
 *  hashendscan() -- close down a scan
 */
void
hashendscan(IndexScanDesc scan)
{
    HashScanOpaque so = (HashScanOpaque) scan->opaque;
    Relation    rel = scan->indexRelation;

    _hash_dropscanbuf(rel, so);

    pfree(so);
    scan->opaque = NULL;
}

/*
 * Bulk deletion of all index entries pointing to a set of heap tuples.
 * The set of target tuples is specified via a callback routine that tells
 * whether any given heap tuple (identified by ItemPointer) is being deleted.
 *
 * This function also deletes the tuples that were moved by a split to
 * another bucket.
 *
 * Result: a palloc'd struct containing statistical info for VACUUM displays.
 */
IndexBulkDeleteResult *
hashbulkdelete(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
               IndexBulkDeleteCallback callback, void *callback_state)
{
    Relation    rel = info->index;
    double      tuples_removed;
    double      num_index_tuples;
    double      orig_ntuples;
    Bucket      orig_maxbucket;
    Bucket      cur_maxbucket;
    Bucket      cur_bucket;
    Buffer      metabuf;
    HashMetaPage metap;
    HashMetaPageData local_metapage;

    tuples_removed = 0;
    num_index_tuples = 0;

    /*
     * Read the metapage to fetch original bucket and tuple counts.  Also, we
     * keep a copy of the last-seen metapage so that we can use its
     * hashm_spares[] values to compute bucket page addresses.  This is a bit
     * hokey but perfectly safe, since the interesting entries in the spares
     * array cannot change under us; and it beats rereading the metapage for
     * each bucket.
     */
    metabuf = _hash_getbuf(rel, HASH_METAPAGE, HASH_READ, LH_META_PAGE);
    metap = HashPageGetMeta(BufferGetPage(metabuf));
    orig_maxbucket = metap->hashm_maxbucket;
    orig_ntuples = metap->hashm_ntuples;
    memcpy(&local_metapage, metap, sizeof(local_metapage));
    /* release the lock, but keep pin */
    _hash_chgbufaccess(rel, metabuf, HASH_READ, HASH_NOLOCK);

    /* Scan the buckets that we know exist */
    cur_bucket = 0;
    cur_maxbucket = orig_maxbucket;

loop_top:
    while (cur_bucket <= cur_maxbucket)
    {
        BlockNumber bucket_blkno;
        BlockNumber blkno;
        Buffer      bucket_buf;
        Buffer      buf;
        HashPageOpaque bucket_opaque;
        Page        page;
        bool        split_cleanup = false;

        /* Get address of bucket's start page */
        bucket_blkno = BUCKET_TO_BLKNO(&local_metapage, cur_bucket);

        blkno = bucket_blkno;
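
        /*
         * (For illustration: bucket 0 always maps to block 1, since block 0
         * is the metapage; higher-numbered buckets are further offset by the
         * overflow-page counts recorded in hashm_spares[].)
         */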

        /*
         * We need to acquire a cleanup lock on the primary bucket page to
         * wait out concurrent scans before deleting the dead tuples.
         */
        buf = ReadBufferExtended(rel, MAIN_FORKNUM, blkno, RBM_NORMAL, info->strategy);
        LockBufferForCleanup(buf);
        _hash_checkpage(rel, buf, LH_BUCKET_PAGE);

        page = BufferGetPage(buf);
        bucket_opaque = (HashPageOpaque) PageGetSpecialPointer(page);

        /*
         * If the bucket contains tuples that were moved by a split, then we
         * need to delete such tuples.  We can't delete them while the split
         * operation on the bucket is unfinished, as they are still needed by
         * scans.
         */
        if (!H_BUCKET_BEING_SPLIT(bucket_opaque) &&
            H_NEEDS_SPLIT_CLEANUP(bucket_opaque))
        {
            split_cleanup = true;

            /*
             * This bucket might have been split since we last held a lock on
             * the metapage.  If so, hashm_maxbucket, hashm_highmask and
             * hashm_lowmask might be old enough to cause us to fail to remove
             * tuples left behind by the most recent split.  To prevent that,
             * now that the primary page of the target bucket has been locked
             * (and thus can't be further split), update our cached metapage
             * data.
             */
            _hash_chgbufaccess(rel, metabuf, HASH_NOLOCK, HASH_READ);
            memcpy(&local_metapage, metap, sizeof(local_metapage));
            _hash_chgbufaccess(rel, metabuf, HASH_READ, HASH_NOLOCK);
        }

        bucket_buf = buf;

        hashbucketcleanup(rel, cur_bucket, bucket_buf, blkno, info->strategy,
                          local_metapage.hashm_maxbucket,
                          local_metapage.hashm_highmask,
                          local_metapage.hashm_lowmask, &tuples_removed,
                          &num_index_tuples, split_cleanup,
                          callback, callback_state);

        _hash_dropbuf(rel, bucket_buf);

        /* Advance to next bucket */
        cur_bucket++;
    }

    /* Write-lock metapage and check for split since we started */
    _hash_chgbufaccess(rel, metabuf, HASH_NOLOCK, HASH_WRITE);
    metap = HashPageGetMeta(BufferGetPage(metabuf));

    if (cur_maxbucket != metap->hashm_maxbucket)
    {
        /* There's been a split, so process the additional bucket(s) */
        cur_maxbucket = metap->hashm_maxbucket;
        memcpy(&local_metapage, metap, sizeof(local_metapage));
        _hash_chgbufaccess(rel, metabuf, HASH_READ, HASH_NOLOCK);
        goto loop_top;
    }

    /* Okay, we're really done.  Update tuple count in metapage. */

    if (orig_maxbucket == metap->hashm_maxbucket &&
        orig_ntuples == metap->hashm_ntuples)
    {
        /*
         * No one has split or inserted anything since start of scan, so
         * believe our count as gospel.
         */
        metap->hashm_ntuples = num_index_tuples;
    }
    else
    {
        /*
         * Otherwise, our count is untrustworthy since we may have
         * double-scanned tuples in split buckets.  Proceed by dead-reckoning.
         * (Note: we still return estimated_count = false, because using this
         * count is better than not updating reltuples at all.)
         */
        if (metap->hashm_ntuples > tuples_removed)
            metap->hashm_ntuples -= tuples_removed;
        else
            metap->hashm_ntuples = 0;
        num_index_tuples = metap->hashm_ntuples;
    }

    MarkBufferDirty(metabuf);
    _hash_relbuf(rel, metabuf);

    /* return statistics */
    if (stats == NULL)
        stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
    stats->estimated_count = false;
    stats->num_index_tuples = num_index_tuples;
    stats->tuples_removed += tuples_removed;
    /* hashvacuumcleanup will fill in num_pages */

    return stats;
}

/*
 * Post-VACUUM cleanup.
 *
 * Result: a palloc'd struct containing statistical info for VACUUM displays.
 */
IndexBulkDeleteResult *
hashvacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats)
{
    Relation    rel = info->index;
    BlockNumber num_pages;

    /* If hashbulkdelete wasn't called, return NULL signifying no change */
    /* Note: this covers the analyze_only case too */
    if (stats == NULL)
        return NULL;

    /* update statistics */
    num_pages = RelationGetNumberOfBlocks(rel);
    stats->num_pages = num_pages;

    return stats;
}

/*
 * Helper function to perform deletion of index entries from a bucket.
 *
 * This function expects that the caller has acquired a cleanup lock on the
 * primary bucket page, and will return with a write lock again held on the
 * primary bucket page.  The lock won't necessarily be held continuously,
 * though, because we'll release it when visiting overflow pages.
 *
 * It would be very bad if this function cleaned a page while some other
 * backend was in the midst of scanning it, because hashgettuple assumes
 * that the next valid TID will be greater than or equal to the current
 * valid TID.  There can't be any concurrent scans in progress when we first
 * enter this function because of the cleanup lock we hold on the primary
 * bucket page, but as soon as we release that lock, there might be.  We
 * handle that by conspiring to prevent those scans from passing our cleanup
 * scan.  To do that, we lock the next page in the bucket chain before
 * releasing the lock on the previous page.  (This type of lock chaining is
 * not ideal, so we might want to look for a better solution at some point.)
 *
 * We need to retain a pin on the primary bucket to ensure that no concurrent
 * split can start.
 */
void
hashbucketcleanup(Relation rel, Bucket cur_bucket, Buffer bucket_buf,
                  BlockNumber bucket_blkno, BufferAccessStrategy bstrategy,
                  uint32 maxbucket, uint32 highmask, uint32 lowmask,
                  double *tuples_removed, double *num_index_tuples,
                  bool split_cleanup,
                  IndexBulkDeleteCallback callback, void *callback_state)
{
    BlockNumber blkno;
    Buffer      buf;
    Bucket      new_bucket PG_USED_FOR_ASSERTS_ONLY = InvalidBucket;
    bool        bucket_dirty = false;

    blkno = bucket_blkno;
    buf = bucket_buf;

    if (split_cleanup)
        new_bucket = _hash_get_newbucket_from_oldbucket(rel, cur_bucket,
                                                        lowmask, maxbucket);

    /* Scan each page in bucket */
    for (;;)
    {
        HashPageOpaque opaque;
        OffsetNumber offno;
        OffsetNumber maxoffno;
        Buffer      next_buf;
        Page        page;
        OffsetNumber deletable[MaxOffsetNumber];
        int         ndeletable = 0;
        bool        retain_pin = false;

        vacuum_delay_point();

        page = BufferGetPage(buf);
        opaque = (HashPageOpaque) PageGetSpecialPointer(page);

        /* Scan each tuple in page */
        maxoffno = PageGetMaxOffsetNumber(page);
        for (offno = FirstOffsetNumber;
             offno <= maxoffno;
             offno = OffsetNumberNext(offno))
        {
            ItemPointer htup;
            IndexTuple  itup;
            Bucket      bucket;
            bool        kill_tuple = false;

            itup = (IndexTuple) PageGetItem(page,
                                            PageGetItemId(page, offno));
            htup = &(itup->t_tid);

            /*
             * To remove the dead tuples, we strictly want to rely on the
             * results of the callback function.  Refer to btvacuumpage for
             * the detailed reason.
             */
            if (callback && callback(htup, callback_state))
            {
                kill_tuple = true;
                if (tuples_removed)
                    *tuples_removed += 1;
            }
            else if (split_cleanup)
            {
                /* delete the tuples that were moved by a split */
                bucket = _hash_hashkey2bucket(_hash_get_indextuple_hashkey(itup),
                                              maxbucket,
                                              highmask,
                                              lowmask);
                /* mark the item for deletion */
                if (bucket != cur_bucket)
                {
                    /*
                     * We expect tuples to belong either to the current bucket
                     * or to new_bucket.  This is ensured because we don't
                     * allow further splits from a bucket that contains
                     * garbage.  See comments in _hash_expandtable.
                     */
                    Assert(bucket == new_bucket);
                    kill_tuple = true;
                }
            }

            if (kill_tuple)
            {
                /* mark the item for deletion */
                deletable[ndeletable++] = offno;
            }
            else
            {
                /* we're keeping it, so count it */
                if (num_index_tuples)
                    *num_index_tuples += 1;
            }
        }

        /* retain the pin on primary bucket page till end of bucket scan */
        if (blkno == bucket_blkno)
            retain_pin = true;
        else
            retain_pin = false;

        blkno = opaque->hasho_nextblkno;

        /*
         * Apply deletions, advance to next page and write page if needed.
         */
        if (ndeletable > 0)
        {
            PageIndexMultiDelete(page, deletable, ndeletable);
            bucket_dirty = true;
            MarkBufferDirty(buf);
        }

        /* bail out if there are no more pages to scan. */
        if (!BlockNumberIsValid(blkno))
            break;

        next_buf = _hash_getbuf_with_strategy(rel, blkno, HASH_WRITE,
                                              LH_OVERFLOW_PAGE,
                                              bstrategy);

        /*
         * release the lock on previous page after acquiring the lock on next
         * page
         */
        if (retain_pin)
            _hash_chgbufaccess(rel, buf, HASH_READ, HASH_NOLOCK);
        else
            _hash_relbuf(rel, buf);

        buf = next_buf;
    }

    /*
     * Lock the bucket page to clear the garbage flag and squeeze the bucket.
     * If the current buffer is the same as the bucket buffer, then we
     * already have a lock on the bucket page.
     */
    if (buf != bucket_buf)
    {
        _hash_relbuf(rel, buf);
        _hash_chgbufaccess(rel, bucket_buf, HASH_NOLOCK, HASH_WRITE);
    }

    /*
     * Clear the garbage flag from the bucket after deleting the tuples that
     * were moved by a split.  We purposely clear the flag before squeezing
     * the bucket, so that after a restart, vacuum won't again try to delete
     * the moved-by-split tuples.
     */
    if (split_cleanup)
    {
        HashPageOpaque bucket_opaque;
        Page        page;

        page = BufferGetPage(bucket_buf);
        bucket_opaque = (HashPageOpaque) PageGetSpecialPointer(page);

        bucket_opaque->hasho_flag &= ~LH_BUCKET_NEEDS_SPLIT_CLEANUP;
        MarkBufferDirty(bucket_buf);
    }

    /*
     * If we have deleted anything, try to compact free space.  To squeeze
     * the bucket we must hold a cleanup lock; otherwise the squeeze could
     * disturb the ordering of tuples for a scan that started before it.
     */
    if (bucket_dirty && IsBufferCleanupOK(bucket_buf))
        _hash_squeezebucket(rel, cur_bucket, bucket_blkno, bucket_buf,
                            bstrategy);
    else
        _hash_chgbufaccess(rel, bucket_buf, HASH_READ, HASH_NOLOCK);
}

void
hash_redo(XLogReaderState *record)
{
    elog(PANIC, "hash_redo: unimplemented");
}