/*-------------------------------------------------------------------------
 *
 * hashovfl.c
 *    Overflow page management code for the Postgres hash access method
 *
 * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *    src/backend/access/hash/hashovfl.c
 *
 * NOTES
 *    Overflow pages look like ordinary relation pages.
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include "access/hash.h"
#include "utils/rel.h"


static Buffer _hash_getovflpage(Relation rel, Buffer metabuf);
static uint32 _hash_firstfreebit(uint32 map);


/*
 * Convert overflow page bit number (its index in the free-page bitmaps)
 * to block number within the index.
 */
static BlockNumber
bitno_to_blkno(HashMetaPage metap, uint32 ovflbitnum)
{
    uint32      splitnum = metap->hashm_ovflpoint;
    uint32      i;

    /* Convert zero-based bitnumber to 1-based page number */
    ovflbitnum += 1;

    /* Determine the split number for this page (must be >= 1) */
    for (i = 1;
         i < splitnum && ovflbitnum > metap->hashm_spares[i];
         i++)
         /* loop */ ;

    /*
     * Convert to absolute page number by adding the number of bucket pages
     * that exist before this split point.
     */
    return (BlockNumber) ((1 << i) + ovflbitnum);
}

/*
 * Convert overflow page block number to bit number for free-page bitmap.
 */
static uint32
blkno_to_bitno(HashMetaPage metap, BlockNumber ovflblkno)
{
    uint32      splitnum = metap->hashm_ovflpoint;
    uint32      i;
    uint32      bitnum;

    /* Determine the split number containing this page */
    for (i = 1; i <= splitnum; i++)
    {
        if (ovflblkno <= (BlockNumber) (1 << i))
            break;              /* oops */
        bitnum = ovflblkno - (1 << i);
        if (bitnum <= metap->hashm_spares[i])
            return bitnum - 1;  /* -1 to convert 1-based to 0-based */
    }

    elog(ERROR, "invalid overflow block number %u", ovflblkno);
    return 0;                   /* keep compiler quiet */
}
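
/*
 * Worked example (illustrative values only, not taken from a real index):
 * suppose hashm_ovflpoint = 3 and hashm_spares[] = {0, 0, 1, 3, ...}.
 * Overflow bit number 0 becomes 1-based page number 1; the loop in
 * bitno_to_blkno stops at split point i = 2 (because 1 <= spares[2]),
 * giving block number (1 << 2) + 1 = 5.  Conversely, blkno_to_bitno(5)
 * finds 5 - (1 << 2) = 1 <= spares[2] and returns 1 - 1 = 0 again.
 */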

/*
 *  _hash_addovflpage
 *
 *  Add an overflow page to the bucket whose last page is pointed to by 'buf'.
 *
 *  On entry, the caller must hold a pin but no lock on 'buf'.  The pin is
 *  dropped before exiting (we assume the caller is not interested in 'buf'
 *  anymore) if not asked to retain.  The pin will be retained only for the
 *  primary bucket.  The returned overflow page will be pinned and
 *  write-locked; it is guaranteed to be empty.
 *
 *  The caller must hold a pin, but no lock, on the metapage buffer.
 *  That buffer is returned in the same state.
 *
 * NB: since this could be executed concurrently by multiple processes,
 * one should not assume that the returned overflow page will be the
 * immediate successor of the originally passed 'buf'.  Additional overflow
 * pages might have been added to the bucket chain in between.
 */
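/*
 * Illustrative call pattern (hypothetical caller, shown only as a sketch of
 * the locking protocol described above): the caller arrives holding pins but
 * no locks on the bucket page 'buf' and on 'metabuf', and then does something
 * like
 *
 *      ovflbuf = _hash_addovflpage(rel, metabuf, buf, true);
 *      (void) _hash_pgaddtup(rel, ovflbuf, itemsz, itup);
 *      MarkBufferDirty(ovflbuf);
 *      _hash_relbuf(rel, ovflbuf);
 *
 * i.e. the returned page comes back pinned and write-locked, and it is the
 * caller's job to dirty and release it.
 */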
Buffer
_hash_addovflpage(Relation rel, Buffer metabuf, Buffer buf, bool retain_pin)
{
    Buffer      ovflbuf;
    Page        page;
    Page        ovflpage;
    HashPageOpaque pageopaque;
    HashPageOpaque ovflopaque;

    /* allocate and lock an empty overflow page */
    ovflbuf = _hash_getovflpage(rel, metabuf);

    /*
     * Write-lock the tail page.  It is okay to hold two buffer locks here
     * since there cannot be anyone else contending for access to ovflbuf.
     */
    _hash_chgbufaccess(rel, buf, HASH_NOLOCK, HASH_WRITE);

    /* probably redundant... */
    _hash_checkpage(rel, buf, LH_BUCKET_PAGE | LH_OVERFLOW_PAGE);

    /* loop to find current tail page, in case someone else inserted too */
    for (;;)
    {
        BlockNumber nextblkno;

        page = BufferGetPage(buf);
        pageopaque = (HashPageOpaque) PageGetSpecialPointer(page);
        nextblkno = pageopaque->hasho_nextblkno;

        if (!BlockNumberIsValid(nextblkno))
            break;

        /* we assume we do not need to write the unmodified page */
        if ((pageopaque->hasho_flag & LH_BUCKET_PAGE) && retain_pin)
            _hash_chgbufaccess(rel, buf, HASH_READ, HASH_NOLOCK);
        else
            _hash_relbuf(rel, buf);

        buf = _hash_getbuf(rel, nextblkno, HASH_WRITE, LH_OVERFLOW_PAGE);
    }

    /* now that we have correct backlink, initialize new overflow page */
    ovflpage = BufferGetPage(ovflbuf);
    ovflopaque = (HashPageOpaque) PageGetSpecialPointer(ovflpage);
    ovflopaque->hasho_prevblkno = BufferGetBlockNumber(buf);
    ovflopaque->hasho_nextblkno = InvalidBlockNumber;
    ovflopaque->hasho_bucket = pageopaque->hasho_bucket;
    ovflopaque->hasho_flag = LH_OVERFLOW_PAGE;
    ovflopaque->hasho_page_id = HASHO_PAGE_ID;

    MarkBufferDirty(ovflbuf);

    /* logically chain overflow page to previous page */
    pageopaque->hasho_nextblkno = BufferGetBlockNumber(ovflbuf);
    MarkBufferDirty(buf);
    if ((pageopaque->hasho_flag & LH_BUCKET_PAGE) && retain_pin)
        _hash_chgbufaccess(rel, buf, HASH_READ, HASH_NOLOCK);
    else
        _hash_relbuf(rel, buf);

    return ovflbuf;
}

/*
 *  _hash_getovflpage()
 *
 *  Find an available overflow page and return it.  The returned buffer
 *  is pinned and write-locked, and has had _hash_pageinit() applied,
 *  but it is caller's responsibility to fill the special space.
 *
 * The caller must hold a pin, but no lock, on the metapage buffer.
 * That buffer is left in the same state at exit.
 */
static Buffer
_hash_getovflpage(Relation rel, Buffer metabuf)
{
    HashMetaPage metap;
    Buffer      mapbuf = 0;
    Buffer      newbuf;
    BlockNumber blkno;
    uint32      orig_firstfree;
    uint32      splitnum;
    uint32     *freep = NULL;
    uint32      max_ovflpg;
    uint32      bit;
    uint32      first_page;
    uint32      last_bit;
    uint32      last_page;
    uint32      i,
                j;

    /* Get exclusive lock on the meta page */
    _hash_chgbufaccess(rel, metabuf, HASH_NOLOCK, HASH_WRITE);

    _hash_checkpage(rel, metabuf, LH_META_PAGE);
    metap = HashPageGetMeta(BufferGetPage(metabuf));

    /* start search at hashm_firstfree */
    orig_firstfree = metap->hashm_firstfree;
    first_page = orig_firstfree >> BMPG_SHIFT(metap);
    bit = orig_firstfree & BMPG_MASK(metap);
    i = first_page;
    j = bit / BITS_PER_MAP;
    bit &= ~(BITS_PER_MAP - 1);
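
    /*
     * Worked example with made-up numbers (purely illustrative): if a bitmap
     * page held 2^15 bits (BMPG_SHIFT = 15) and orig_firstfree were 40010,
     * then first_page = 40010 >> 15 = 1, the in-page bit = 40010 & 32767 =
     * 7242, j = 7242 / 32 = 226 (the uint32 word holding that bit), and bit
     * is rounded down to 7232, the first bit of word 226.  The search below
     * would therefore resume at that word of bitmap page 1.
     */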

    /* outer loop iterates once per bitmap page */
    for (;;)
    {
        BlockNumber mapblkno;
        Page        mappage;
        uint32      last_inpage;

        /* want to end search with the last existing overflow page */
        splitnum = metap->hashm_ovflpoint;
        max_ovflpg = metap->hashm_spares[splitnum] - 1;
        last_page = max_ovflpg >> BMPG_SHIFT(metap);
        last_bit = max_ovflpg & BMPG_MASK(metap);

        if (i > last_page)
            break;

        Assert(i < metap->hashm_nmaps);
        mapblkno = metap->hashm_mapp[i];

        if (i == last_page)
            last_inpage = last_bit;
        else
            last_inpage = BMPGSZ_BIT(metap) - 1;

        /* Release exclusive lock on metapage while reading bitmap page */
        _hash_chgbufaccess(rel, metabuf, HASH_READ, HASH_NOLOCK);

        mapbuf = _hash_getbuf(rel, mapblkno, HASH_WRITE, LH_BITMAP_PAGE);
        mappage = BufferGetPage(mapbuf);
        freep = HashPageGetBitmap(mappage);

        for (; bit <= last_inpage; j++, bit += BITS_PER_MAP)
        {
            if (freep[j] != ALL_SET)
                goto found;
        }

        /* No free space here, try to advance to next map page */
        _hash_relbuf(rel, mapbuf);
        i++;
        j = 0;                  /* scan from start of next map page */
        bit = 0;

        /* Reacquire exclusive lock on the meta page */
        _hash_chgbufaccess(rel, metabuf, HASH_NOLOCK, HASH_WRITE);
    }

    /*
     * No free pages --- have to extend the relation to add an overflow page.
     * First, check to see if we have to add a new bitmap page too.
     */
    if (last_bit == (uint32) (BMPGSZ_BIT(metap) - 1))
    {
        /*
         * We create the new bitmap page with all pages marked "in use".
         * Actually two pages in the new bitmap's range will exist
         * immediately: the bitmap page itself, and the following page which
         * is the one we return to the caller.  Both of these are correctly
         * marked "in use".  Subsequent pages do not exist yet, but it is
         * convenient to pre-mark them as "in use" too.
         */
        bit = metap->hashm_spares[splitnum];
        _hash_initbitmap(rel, metap, bitno_to_blkno(metap, bit), MAIN_FORKNUM);
        metap->hashm_spares[splitnum]++;
    }
    else
    {
        /*
         * Nothing to do here; since the page will be past the last used page,
         * we know its bitmap bit was preinitialized to "in use".
         */
    }

    /* Calculate address of the new overflow page */
    bit = metap->hashm_spares[splitnum];
    blkno = bitno_to_blkno(metap, bit);

    /*
     * Fetch the page with _hash_getnewbuf to ensure smgr's idea of the
     * relation length stays in sync with ours.  XXX It's annoying to do this
     * with metapage write lock held; would be better to use a lock that
     * doesn't block incoming searches.
     */
    newbuf = _hash_getnewbuf(rel, blkno, MAIN_FORKNUM);

    metap->hashm_spares[splitnum]++;

    /*
     * Adjust hashm_firstfree to avoid redundant searches.  But don't risk
     * changing it if someone moved it while we were searching bitmap pages.
     */
    if (metap->hashm_firstfree == orig_firstfree)
        metap->hashm_firstfree = bit + 1;

    /* Write updated metapage and release lock, but not pin */
    _hash_chgbufaccess(rel, metabuf, HASH_WRITE, HASH_NOLOCK);

    return newbuf;

found:
    /* convert bit to bit number within page */
    bit += _hash_firstfreebit(freep[j]);

    /* mark page "in use" in the bitmap */
    SETBIT(freep, bit);
    MarkBufferDirty(mapbuf);
    _hash_relbuf(rel, mapbuf);

    /* Reacquire exclusive lock on the meta page */
    _hash_chgbufaccess(rel, metabuf, HASH_NOLOCK, HASH_WRITE);

    /* convert bit to absolute bit number */
    bit += (i << BMPG_SHIFT(metap));

    /* Calculate address of the recycled overflow page */
    blkno = bitno_to_blkno(metap, bit);

    /*
     * Adjust hashm_firstfree to avoid redundant searches.  But don't risk
     * changing it if someone moved it while we were searching bitmap pages.
     */
    if (metap->hashm_firstfree == orig_firstfree)
    {
        metap->hashm_firstfree = bit + 1;

        /* Write updated metapage and release lock, but not pin */
        _hash_chgbufaccess(rel, metabuf, HASH_WRITE, HASH_NOLOCK);
    }
    else
    {
        /* We didn't change the metapage, so no need to write */
        _hash_chgbufaccess(rel, metabuf, HASH_READ, HASH_NOLOCK);
    }

    /* Fetch, init, and return the recycled page */
    return _hash_getinitbuf(rel, blkno);
}

/*
 *  _hash_firstfreebit()
 *
 *  Return the number of the first bit that is not set in the word 'map'.
 */
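/*
 * For example, map = 0x0000000B (binary ...1011) has bits 0, 1 and 3 set,
 * so the first clear bit is bit 2 and the function returns 2.
 */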
static uint32
_hash_firstfreebit(uint32 map)
{
    uint32      i,
                mask;

    mask = 0x1;
    for (i = 0; i < BITS_PER_MAP; i++)
    {
        if (!(mask & map))
            return i;
        mask <<= 1;
    }

    elog(ERROR, "firstfreebit found no free bit");

    return 0;                   /* keep compiler quiet */
}

/*
 *  _hash_freeovflpage() -
 *
 *  Remove this overflow page from its bucket's chain, and mark the page as
 *  free.  On entry, ovflbuf is write-locked; it is released before exiting.
 *
 *  Since this function is invoked in VACUUM, we provide an access strategy
 *  parameter that controls fetches of the bucket pages.
 *
 *  Returns the block number of the page that followed the given page
 *  in the bucket, or InvalidBlockNumber if no following page.
 *
 *  NB: the caller must not hold a lock on the metapage, nor on the page
 *  that is next to ovflbuf in the bucket chain.  We do not acquire a lock
 *  on the page prior to ovflbuf in the chain if it is the same as wbuf,
 *  because the caller already holds a lock on it.  This function releases
 *  the lock on wbuf; the caller is responsible for releasing its pin.
 */
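/*
 * In other words, for a chain fragment [prev] -> [ovfl] -> [next] this
 * routine relinks it as [prev] -> [next] and clears ovfl's bit in the
 * free-page bitmap so the page can later be reused by _hash_getovflpage.
 */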
BlockNumber
_hash_freeovflpage(Relation rel, Buffer ovflbuf, Buffer wbuf,
                   bool wbuf_dirty, BufferAccessStrategy bstrategy)
{
    HashMetaPage metap;
    Buffer      metabuf;
    Buffer      mapbuf;
    Buffer      prevbuf = InvalidBuffer;
    BlockNumber ovflblkno;
    BlockNumber prevblkno;
    BlockNumber blkno;
    BlockNumber nextblkno;
    BlockNumber writeblkno;
    HashPageOpaque ovflopaque;
    Page        ovflpage;
    Page        mappage;
    uint32     *freep;
    uint32      ovflbitno;
    int32       bitmappage,
                bitmapbit;
    Bucket bucket PG_USED_FOR_ASSERTS_ONLY;

    /* Get information from the doomed page */
    _hash_checkpage(rel, ovflbuf, LH_OVERFLOW_PAGE);
    ovflblkno = BufferGetBlockNumber(ovflbuf);
    ovflpage = BufferGetPage(ovflbuf);
    ovflopaque = (HashPageOpaque) PageGetSpecialPointer(ovflpage);
    nextblkno = ovflopaque->hasho_nextblkno;
    prevblkno = ovflopaque->hasho_prevblkno;
    writeblkno = BufferGetBlockNumber(wbuf);
    bucket = ovflopaque->hasho_bucket;

    /*
     * Zero the page for debugging's sake; then mark it dirty and release it.
     * (Note: if we failed to zero the page here, we'd have problems with the
     * Assert in _hash_pageinit() when the page is reused.)
     */
    MemSet(ovflpage, 0, BufferGetPageSize(ovflbuf));
    MarkBufferDirty(ovflbuf);
    _hash_relbuf(rel, ovflbuf);

    /*
     * Fix up the bucket chain.  This is a doubly-linked list, so we must fix
     * up the bucket chain members behind and ahead of the overflow page being
     * deleted.  Concurrency issues are avoided by using lock chaining as
     * described atop hashbucketcleanup.
     */
    if (BlockNumberIsValid(prevblkno))
    {
        Page        prevpage;
        HashPageOpaque prevopaque;

        if (prevblkno == writeblkno)
            prevbuf = wbuf;
        else
            prevbuf = _hash_getbuf_with_strategy(rel,
                                                 prevblkno,
                                                 HASH_WRITE,
                                                 LH_BUCKET_PAGE | LH_OVERFLOW_PAGE,
                                                 bstrategy);

        prevpage = BufferGetPage(prevbuf);
        prevopaque = (HashPageOpaque) PageGetSpecialPointer(prevpage);

        Assert(prevopaque->hasho_bucket == bucket);
        prevopaque->hasho_nextblkno = nextblkno;

        if (prevblkno != writeblkno)
        {
            MarkBufferDirty(prevbuf);
            _hash_relbuf(rel, prevbuf);
        }
    }

    /* write and unlock the write buffer */
    if (wbuf_dirty)
        _hash_chgbufaccess(rel, wbuf, HASH_WRITE, HASH_NOLOCK);
    else
        _hash_chgbufaccess(rel, wbuf, HASH_READ, HASH_NOLOCK);

    if (BlockNumberIsValid(nextblkno))
    {
        Buffer      nextbuf = _hash_getbuf_with_strategy(rel,
                                                         nextblkno,
                                                         HASH_WRITE,
                                                         LH_OVERFLOW_PAGE,
                                                         bstrategy);
        Page        nextpage = BufferGetPage(nextbuf);
        HashPageOpaque nextopaque = (HashPageOpaque) PageGetSpecialPointer(nextpage);

        Assert(nextopaque->hasho_bucket == bucket);
        nextopaque->hasho_prevblkno = prevblkno;
        MarkBufferDirty(nextbuf);
        _hash_relbuf(rel, nextbuf);
    }

    /* Note: bstrategy is intentionally not used for metapage and bitmap */

    /* Read the metapage so we can determine which bitmap page to use */
    metabuf = _hash_getbuf(rel, HASH_METAPAGE, HASH_READ, LH_META_PAGE);
    metap = HashPageGetMeta(BufferGetPage(metabuf));

    /* Identify which bit to set */
    ovflbitno = blkno_to_bitno(metap, ovflblkno);

    bitmappage = ovflbitno >> BMPG_SHIFT(metap);
    bitmapbit = ovflbitno & BMPG_MASK(metap);

    if (bitmappage >= metap->hashm_nmaps)
        elog(ERROR, "invalid overflow bit number %u", ovflbitno);
    blkno = metap->hashm_mapp[bitmappage];

    /* Release metapage lock while we access the bitmap page */
    _hash_chgbufaccess(rel, metabuf, HASH_READ, HASH_NOLOCK);

    /* Clear the bitmap bit to indicate that this overflow page is free */
    mapbuf = _hash_getbuf(rel, blkno, HASH_WRITE, LH_BITMAP_PAGE);
    mappage = BufferGetPage(mapbuf);
    freep = HashPageGetBitmap(mappage);
    Assert(ISSET(freep, bitmapbit));
    CLRBIT(freep, bitmapbit);
    MarkBufferDirty(mapbuf);
    _hash_relbuf(rel, mapbuf);

    /* Get write-lock on metapage to update firstfree */
    _hash_chgbufaccess(rel, metabuf, HASH_NOLOCK, HASH_WRITE);

    /* if this is now the first free page, update hashm_firstfree */
    if (ovflbitno < metap->hashm_firstfree)
    {
        metap->hashm_firstfree = ovflbitno;
        MarkBufferDirty(metabuf);
    }
    _hash_relbuf(rel, metabuf);

    return nextblkno;
}


/*
 *  _hash_initbitmap()
 *
 *   Initialize a new bitmap page.  The metapage has a write-lock upon
 *   entering the function, and must be written by caller after return.
 *
 * 'blkno' is the block number of the new bitmap page.
 *
 * All bits in the new bitmap page are set to "1", indicating "in use".
 */
void
_hash_initbitmap(Relation rel, HashMetaPage metap, BlockNumber blkno,
                 ForkNumber forkNum)
{
    Buffer      buf;
    Page        pg;
    HashPageOpaque op;
    uint32     *freep;

    /*
     * It is okay to write-lock the new bitmap page while holding metapage
     * write lock, because no one else could be contending for the new page.
     * Also, the metapage lock makes it safe to extend the index using
     * _hash_getnewbuf.
     *
     * There is some loss of concurrency in possibly doing I/O for the new
     * page while holding the metapage lock, but this path is taken so seldom
     * that it's not worth worrying about.
     */
    buf = _hash_getnewbuf(rel, blkno, forkNum);
    pg = BufferGetPage(buf);

    /* initialize the page's special space */
    op = (HashPageOpaque) PageGetSpecialPointer(pg);
    op->hasho_prevblkno = InvalidBlockNumber;
    op->hasho_nextblkno = InvalidBlockNumber;
    op->hasho_bucket = -1;
    op->hasho_flag = LH_BITMAP_PAGE;
    op->hasho_page_id = HASHO_PAGE_ID;

    /* set all of the bits to 1 */
    freep = HashPageGetBitmap(pg);
    MemSet(freep, 0xFF, BMPGSZ_BYTE(metap));

    /* dirty the new bitmap page, and release write lock and pin */
    MarkBufferDirty(buf);
    _hash_relbuf(rel, buf);

    /* add the new bitmap page to the metapage's list of bitmaps */
    /* metapage already has a write lock */
    if (metap->hashm_nmaps >= HASH_MAX_BITMAPS)
        ereport(ERROR,
                (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
                 errmsg("out of overflow pages in hash index \"%s\"",
                        RelationGetRelationName(rel))));

    metap->hashm_mapp[metap->hashm_nmaps] = blkno;

    metap->hashm_nmaps++;
}


/*
 *  _hash_squeezebucket(rel, bucket)
 *
 *  Try to squeeze the tuples onto pages occurring earlier in the
 *  bucket chain in an attempt to free overflow pages. When we start
 *  the "squeezing", the page from which we start taking tuples (the
 *  "read" page) is the last page in the bucket chain and the page
 *  onto which we start squeezing tuples (the "write" page) is the
 *  first page in the bucket chain.  The read page works backward and
 *  the write page works forward; the procedure terminates when the
 *  read page and write page are the same page.
 *
 *  At completion of this procedure, it is guaranteed that all pages in
 *  the bucket are nonempty, unless the bucket is totally empty (in
 *  which case all overflow pages will be freed).  The original implementation
 *  required that to be true on entry as well, but it's a lot easier for
 *  callers to leave empty overflow pages and let this routine clean them up.
 *
 *  Caller must acquire cleanup lock on the primary page of the target
 *  bucket to exclude any scans that are in progress, which could easily
 *  be confused into returning the same tuple more than once or some tuples
 *  not at all by the rearrangement we are performing here.  To prevent
 *  any concurrent scan from crossing the squeeze scan we use lock chaining
 *  similar to hashbucketcleanup; see the comments atop hashbucketcleanup.
 *
 *  We need to retain a pin on the primary bucket to ensure that no concurrent
 *  split can start.
 *
 *  Since this function is invoked in VACUUM, we provide an access strategy
 *  parameter that controls fetches of the bucket pages.
 */
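/*
 * Illustrative sketch (made-up chain, for orientation only): given a chain
 *
 *      [primary] -> [ovfl1] -> [ovfl2] -> [ovfl3]
 *
 * the "write" pointer starts at [primary] and moves forward while the "read"
 * pointer starts at [ovfl3] and moves backward; tuples are copied from the
 * read page into the first write page with room, each emptied read page is
 * freed via _hash_freeovflpage, and the loop stops when the two pointers
 * meet.
 */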
void
_hash_squeezebucket(Relation rel,
                    Bucket bucket,
                    BlockNumber bucket_blkno,
                    Buffer bucket_buf,
                    BufferAccessStrategy bstrategy)
{
    BlockNumber wblkno;
    BlockNumber rblkno;
    Buffer      wbuf;
    Buffer      rbuf;
    Page        wpage;
    Page        rpage;
    HashPageOpaque wopaque;
    HashPageOpaque ropaque;
    bool        wbuf_dirty;

    /*
     * start squeezing into the primary bucket page.
     */
    wblkno = bucket_blkno;
    wbuf = bucket_buf;
    wpage = BufferGetPage(wbuf);
    wopaque = (HashPageOpaque) PageGetSpecialPointer(wpage);

    /*
     * if there aren't any overflow pages, there's nothing to squeeze. caller
     * is responsible for releasing the pin on primary bucket page.
     */
    if (!BlockNumberIsValid(wopaque->hasho_nextblkno))
    {
        _hash_chgbufaccess(rel, wbuf, HASH_WRITE, HASH_NOLOCK);
        return;
    }

    /*
     * Find the last page in the bucket chain by starting at the base bucket
     * page and working forward.  Note: we assume that a hash bucket chain is
     * usually smaller than the buffer ring being used by VACUUM, else using
     * the access strategy here would be counterproductive.
     */
    rbuf = InvalidBuffer;
    ropaque = wopaque;
    do
    {
        rblkno = ropaque->hasho_nextblkno;
        if (rbuf != InvalidBuffer)
            _hash_relbuf(rel, rbuf);
        rbuf = _hash_getbuf_with_strategy(rel,
                                          rblkno,
                                          HASH_WRITE,
                                          LH_OVERFLOW_PAGE,
                                          bstrategy);
        rpage = BufferGetPage(rbuf);
        ropaque = (HashPageOpaque) PageGetSpecialPointer(rpage);
        Assert(ropaque->hasho_bucket == bucket);
    } while (BlockNumberIsValid(ropaque->hasho_nextblkno));

    /*
     * squeeze the tuples.
     */
    wbuf_dirty = false;
    for (;;)
    {
        OffsetNumber roffnum;
        OffsetNumber maxroffnum;
        OffsetNumber deletable[MaxOffsetNumber];
        int         ndeletable = 0;
        bool        retain_pin = false;

        /* Scan each tuple in "read" page */
        maxroffnum = PageGetMaxOffsetNumber(rpage);
        for (roffnum = FirstOffsetNumber;
             roffnum <= maxroffnum;
             roffnum = OffsetNumberNext(roffnum))
        {
            IndexTuple  itup;
            Size        itemsz;

            /* skip dead tuples */
            if (ItemIdIsDead(PageGetItemId(rpage, roffnum)))
                continue;

            itup = (IndexTuple) PageGetItem(rpage,
                                            PageGetItemId(rpage, roffnum));
            itemsz = IndexTupleDSize(*itup);
            itemsz = MAXALIGN(itemsz);

            /*
             * Walk up the bucket chain, looking for a page big enough for
             * this item.  Exit if we reach the read page.
             */
            while (PageGetFreeSpace(wpage) < itemsz)
            {
                Buffer      next_wbuf = InvalidBuffer;

                Assert(!PageIsEmpty(wpage));

                if (wblkno == bucket_blkno)
                    retain_pin = true;

                wblkno = wopaque->hasho_nextblkno;
                Assert(BlockNumberIsValid(wblkno));

                /* don't need to move to next page if we reached the read page */
                if (wblkno != rblkno)
                    next_wbuf = _hash_getbuf_with_strategy(rel,
                                                           wblkno,
                                                           HASH_WRITE,
                                                           LH_OVERFLOW_PAGE,
                                                           bstrategy);

                /*
                 * release the lock on previous page after acquiring the lock
                 * on next page
                 */
                if (wbuf_dirty)
                    MarkBufferDirty(wbuf);
                if (retain_pin)
                    _hash_chgbufaccess(rel, wbuf, HASH_READ, HASH_NOLOCK);
                else
                    _hash_relbuf(rel, wbuf);

                /* nothing more to do if we reached the read page */
                if (rblkno == wblkno)
                {
                    if (ndeletable > 0)
                    {
                        /* Delete tuples we already moved off read page */
                        PageIndexMultiDelete(rpage, deletable, ndeletable);
                        MarkBufferDirty(rbuf);
                    }
                    _hash_relbuf(rel, rbuf);
                    return;
                }

                wbuf = next_wbuf;
                wpage = BufferGetPage(wbuf);
                wopaque = (HashPageOpaque) PageGetSpecialPointer(wpage);
                Assert(wopaque->hasho_bucket == bucket);
                wbuf_dirty = false;
                retain_pin = false;
            }

            /*
             * we have found room so insert on the "write" page, being careful
             * to preserve hashkey ordering.  (If we insert many tuples into
             * the same "write" page it would be worth qsort'ing instead of
             * doing repeated _hash_pgaddtup.)
             */
            (void) _hash_pgaddtup(rel, wbuf, itemsz, itup);
            wbuf_dirty = true;

            /* remember tuple for deletion from "read" page */
            deletable[ndeletable++] = roffnum;
        }

        /*
         * If we reach here, there are no live tuples on the "read" page ---
         * it was empty when we got to it, or we moved them all.  So we can
         * just free the page without bothering with deleting tuples
         * individually.  Then advance to the previous "read" page.
         *
         * Tricky point here: if our read and write pages are adjacent in the
         * bucket chain, our write lock on wbuf will conflict with
         * _hash_freeovflpage's attempt to update the sibling links of the
         * removed page.  In that case we don't lock it again: we always
         * release the lock on wbuf in _hash_freeovflpage and then retake it
         * here.  This not only simplifies the code, but will also be needed
         * to log the changes atomically once we write WAL for hash indexes.
         */
        rblkno = ropaque->hasho_prevblkno;
        Assert(BlockNumberIsValid(rblkno));

        /* free this overflow page (releases rbuf) */
        _hash_freeovflpage(rel, rbuf, wbuf, wbuf_dirty, bstrategy);

        /* are we freeing the page adjacent to wbuf? */
        if (rblkno == wblkno)
        {
            /* retain the pin on primary bucket page till end of bucket scan */
            if (wblkno != bucket_blkno)
                _hash_dropbuf(rel, wbuf);
            return;
        }

        /* lock the overflow page being written, then get the previous one */
        _hash_chgbufaccess(rel, wbuf, HASH_NOLOCK, HASH_WRITE);

        rbuf = _hash_getbuf_with_strategy(rel,
                                          rblkno,
                                          HASH_WRITE,
                                          LH_OVERFLOW_PAGE,
                                          bstrategy);
        rpage = BufferGetPage(rbuf);
        ropaque = (HashPageOpaque) PageGetSpecialPointer(rpage);
        Assert(ropaque->hasho_bucket == bucket);
    }

    /* NOTREACHED */
}