/*-------------------------------------------------------------------------
 *
 * hashovfl.c
 *	  Overflow page management code for the Postgres hash access method
 *
 * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  src/backend/access/hash/hashovfl.c
 *
 * NOTES
 *	  Overflow pages look like ordinary relation pages.
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include "access/hash.h"
#include "utils/rel.h"


static Buffer _hash_getovflpage(Relation rel, Buffer metabuf);
static uint32 _hash_firstfreebit(uint32 map);


/*
 * Convert overflow page bit number (its index in the free-page bitmaps)
 * to block number within the index.
 */
static BlockNumber
bitno_to_blkno(HashMetaPage metap, uint32 ovflbitnum)
{
    uint32      splitnum = metap->hashm_ovflpoint;
    uint32      i;

    /* Convert zero-based bitnumber to 1-based page number */
    ovflbitnum += 1;

    /* Determine the split number for this page (must be >= 1) */
    for (i = 1;
         i < splitnum && ovflbitnum > metap->hashm_spares[i];
         i++)
         /* loop */ ;

    /*
     * Convert to absolute page number by adding the number of bucket pages
     * that exist before this split point.
     */
    return (BlockNumber) ((1 << i) + ovflbitnum);
}

/*
 * Convert overflow page block number to bit number for free-page bitmap.
 */
static uint32
blkno_to_bitno(HashMetaPage metap, BlockNumber ovflblkno)
{
    uint32      splitnum = metap->hashm_ovflpoint;
    uint32      i;
    uint32      bitnum;

    /* Determine the split number containing this page */
    for (i = 1; i <= splitnum; i++)
    {
        if (ovflblkno <= (BlockNumber) (1 << i))
            break;              /* oops */
        bitnum = ovflblkno - (1 << i);
        if (bitnum <= metap->hashm_spares[i])
            return bitnum - 1;  /* -1 to convert 1-based to 0-based */
    }

    elog(ERROR, "invalid overflow block number %u", ovflblkno);
    return 0;                   /* keep compiler quiet */
}
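
/*
 * Illustrative round trip through the two conversions above, using purely
 * hypothetical metapage contents (not taken from a real index): suppose
 * hashm_ovflpoint = 3 and hashm_spares[] = {0, 1, 1, 3}.  Overflow bit
 * number 1 (0-based) becomes 1-based page number 2; bitno_to_blkno advances
 * past i = 1 and i = 2 because 2 > spares[1] and 2 > spares[2], stopping at
 * i = 3 (the current split point), and returns block (1 << 3) + 2 = 10.
 * Running blkno_to_bitno on block 10 then finds, at i = 3, bitnum =
 * 10 - 8 = 2 <= spares[3], and returns 2 - 1 = 1 again.
 */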

/*
 *	_hash_addovflpage
 *
 *	Add an overflow page to the bucket whose last page is pointed to by 'buf'.
 *
 *	On entry, the caller must hold a pin but no lock on 'buf'.  The pin is
 *	dropped before exiting (we assume the caller is not interested in 'buf'
 *	anymore).  The returned overflow page will be pinned and write-locked;
 *	it is guaranteed to be empty.
 *
 *	The caller must hold a pin, but no lock, on the metapage buffer.
 *	That buffer is returned in the same state.
 *
 *	The caller must hold at least share lock on the bucket, to ensure that
 *	no one else tries to compact the bucket meanwhile.  This guarantees that
 *	'buf' won't stop being part of the bucket while it's unlocked.
 *
 * NB: since this could be executed concurrently by multiple processes,
 * one should not assume that the returned overflow page will be the
 * immediate successor of the originally passed 'buf'.  Additional overflow
 * pages might have been added to the bucket chain in between.
 */
Buffer
_hash_addovflpage(Relation rel, Buffer metabuf, Buffer buf)
{
    Buffer      ovflbuf;
    Page        page;
    Page        ovflpage;
    HashPageOpaque pageopaque;
    HashPageOpaque ovflopaque;

    /* allocate and lock an empty overflow page */
    ovflbuf = _hash_getovflpage(rel, metabuf);

    /*
     * Write-lock the tail page.  It is okay to hold two buffer locks here
     * since there cannot be anyone else contending for access to ovflbuf.
     */
    _hash_chgbufaccess(rel, buf, HASH_NOLOCK, HASH_WRITE);

    /* probably redundant... */
    _hash_checkpage(rel, buf, LH_BUCKET_PAGE | LH_OVERFLOW_PAGE);

    /* loop to find current tail page, in case someone else inserted too */
    for (;;)
    {
        BlockNumber nextblkno;

        page = BufferGetPage(buf);
        pageopaque = (HashPageOpaque) PageGetSpecialPointer(page);
        nextblkno = pageopaque->hasho_nextblkno;

        if (!BlockNumberIsValid(nextblkno))
            break;

        /* we assume we do not need to write the unmodified page */
        _hash_relbuf(rel, buf);

        buf = _hash_getbuf(rel, nextblkno, HASH_WRITE, LH_OVERFLOW_PAGE);
    }

    /* now that we have correct backlink, initialize new overflow page */
    ovflpage = BufferGetPage(ovflbuf);
    ovflopaque = (HashPageOpaque) PageGetSpecialPointer(ovflpage);
    ovflopaque->hasho_prevblkno = BufferGetBlockNumber(buf);
    ovflopaque->hasho_nextblkno = InvalidBlockNumber;
    ovflopaque->hasho_bucket = pageopaque->hasho_bucket;
    ovflopaque->hasho_flag = LH_OVERFLOW_PAGE;
    ovflopaque->hasho_page_id = HASHO_PAGE_ID;

    MarkBufferDirty(ovflbuf);

    /* logically chain overflow page to previous page */
    pageopaque->hasho_nextblkno = BufferGetBlockNumber(ovflbuf);
    _hash_wrtbuf(rel, buf);

    return ovflbuf;
}
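
/*
 * Hypothetical caller sketch (a simplification, loosely modeled on how
 * hashinsert.c extends a bucket chain when no page in the chain has room
 * for a new tuple; see that file for the real logic):
 *
 *		(release the write lock on the full tail page, keeping the pin)
 *		_hash_chgbufaccess(rel, buf, HASH_READ, HASH_NOLOCK);
 *
 *		(chain a new, empty, write-locked overflow page onto the bucket)
 *		buf = _hash_addovflpage(rel, metabuf, buf);
 *		page = BufferGetPage(buf);
 *
 *		(the new page is empty, so the tuple must fit)
 *		Assert(PageGetFreeSpace(page) >= itemsz);
 */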

/*
 *	_hash_getovflpage()
 *
 *	Find an available overflow page and return it.  The returned buffer
 *	is pinned and write-locked, and has had _hash_pageinit() applied,
 *	but it is caller's responsibility to fill the special space.
 *
 * The caller must hold a pin, but no lock, on the metapage buffer.
 * That buffer is left in the same state at exit.
 */
static Buffer
_hash_getovflpage(Relation rel, Buffer metabuf)
{
    HashMetaPage metap;
    Buffer      mapbuf = 0;
    Buffer      newbuf;
    BlockNumber blkno;
    uint32      orig_firstfree;
    uint32      splitnum;
    uint32     *freep = NULL;
    uint32      max_ovflpg;
    uint32      bit;
    uint32      first_page;
    uint32      last_bit;
    uint32      last_page;
    uint32      i,
                j;

    /* Get exclusive lock on the meta page */
    _hash_chgbufaccess(rel, metabuf, HASH_NOLOCK, HASH_WRITE);

    _hash_checkpage(rel, metabuf, LH_META_PAGE);
    metap = HashPageGetMeta(BufferGetPage(metabuf));

    /* start search at hashm_firstfree */
    orig_firstfree = metap->hashm_firstfree;
    first_page = orig_firstfree >> BMPG_SHIFT(metap);
    bit = orig_firstfree & BMPG_MASK(metap);
    i = first_page;
    j = bit / BITS_PER_MAP;
    bit &= ~(BITS_PER_MAP - 1);
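
    /*
     * Illustrative decomposition (hypothetical numbers): with the default
     * 8 kB block size the bitmap portion of a bitmap page is typically
     * 4096 bytes, i.e. 2^15 bits, so BMPG_SHIFT is 15 and BMPG_MASK is
     * 32767.  If hashm_firstfree were 40000, the search would start at
     * bitmap page i = 40000 >> 15 = 1, bit 40000 & 32767 = 7232 within
     * that page, which lies in word j = 7232 / 32 = 226 of the bitmap;
     * 'bit' is then rounded down to the start of that 32-bit word (here
     * it already is a multiple of 32).
     */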

    /* outer loop iterates once per bitmap page */
    for (;;)
    {
        BlockNumber mapblkno;
        Page        mappage;
        uint32      last_inpage;

        /* want to end search with the last existing overflow page */
        splitnum = metap->hashm_ovflpoint;
        max_ovflpg = metap->hashm_spares[splitnum] - 1;
        last_page = max_ovflpg >> BMPG_SHIFT(metap);
        last_bit = max_ovflpg & BMPG_MASK(metap);

        if (i > last_page)
            break;

        Assert(i < metap->hashm_nmaps);
        mapblkno = metap->hashm_mapp[i];

        if (i == last_page)
            last_inpage = last_bit;
        else
            last_inpage = BMPGSZ_BIT(metap) - 1;

        /* Release exclusive lock on metapage while reading bitmap page */
        _hash_chgbufaccess(rel, metabuf, HASH_READ, HASH_NOLOCK);

        mapbuf = _hash_getbuf(rel, mapblkno, HASH_WRITE, LH_BITMAP_PAGE);
        mappage = BufferGetPage(mapbuf);
        freep = HashPageGetBitmap(mappage);

        for (; bit <= last_inpage; j++, bit += BITS_PER_MAP)
        {
            if (freep[j] != ALL_SET)
                goto found;
        }

        /* No free space here, try to advance to next map page */
        _hash_relbuf(rel, mapbuf);
        i++;
        j = 0;                  /* scan from start of next map page */
        bit = 0;

        /* Reacquire exclusive lock on the meta page */
        _hash_chgbufaccess(rel, metabuf, HASH_NOLOCK, HASH_WRITE);
    }

    /*
     * No free pages --- have to extend the relation to add an overflow page.
     * First, check to see if we have to add a new bitmap page too.
     */
    if (last_bit == (uint32) (BMPGSZ_BIT(metap) - 1))
    {
        /*
         * We create the new bitmap page with all pages marked "in use".
         * Actually two pages in the new bitmap's range will exist
         * immediately: the bitmap page itself, and the following page which
         * is the one we return to the caller.  Both of these are correctly
         * marked "in use".  Subsequent pages do not exist yet, but it is
         * convenient to pre-mark them as "in use" too.
         */
        bit = metap->hashm_spares[splitnum];
        _hash_initbitmap(rel, metap, bitno_to_blkno(metap, bit), MAIN_FORKNUM);
        metap->hashm_spares[splitnum]++;
    }
    else
    {
        /*
         * Nothing to do here; since the page will be past the last used page,
         * we know its bitmap bit was preinitialized to "in use".
         */
    }

    /* Calculate address of the new overflow page */
    bit = metap->hashm_spares[splitnum];
    blkno = bitno_to_blkno(metap, bit);

    /*
     * Fetch the page with _hash_getnewbuf to ensure smgr's idea of the
     * relation length stays in sync with ours.  XXX It's annoying to do this
     * with metapage write lock held; would be better to use a lock that
     * doesn't block incoming searches.
     */
    newbuf = _hash_getnewbuf(rel, blkno, MAIN_FORKNUM);

    metap->hashm_spares[splitnum]++;

    /*
     * Adjust hashm_firstfree to avoid redundant searches.  But don't risk
     * changing it if someone moved it while we were searching bitmap pages.
     */
    if (metap->hashm_firstfree == orig_firstfree)
        metap->hashm_firstfree = bit + 1;

    /* Write updated metapage and release lock, but not pin */
    _hash_chgbufaccess(rel, metabuf, HASH_WRITE, HASH_NOLOCK);

    return newbuf;

found:
    /* convert bit to bit number within page */
    bit += _hash_firstfreebit(freep[j]);

    /* mark page "in use" in the bitmap */
    SETBIT(freep, bit);
    _hash_wrtbuf(rel, mapbuf);

    /* Reacquire exclusive lock on the meta page */
    _hash_chgbufaccess(rel, metabuf, HASH_NOLOCK, HASH_WRITE);

    /* convert bit to absolute bit number */
    bit += (i << BMPG_SHIFT(metap));

    /* Calculate address of the recycled overflow page */
    blkno = bitno_to_blkno(metap, bit);

    /*
     * Adjust hashm_firstfree to avoid redundant searches.  But don't risk
     * changing it if someone moved it while we were searching bitmap pages.
     */
    if (metap->hashm_firstfree == orig_firstfree)
    {
        metap->hashm_firstfree = bit + 1;

        /* Write updated metapage and release lock, but not pin */
        _hash_chgbufaccess(rel, metabuf, HASH_WRITE, HASH_NOLOCK);
    }
    else
    {
        /* We didn't change the metapage, so no need to write */
        _hash_chgbufaccess(rel, metabuf, HASH_READ, HASH_NOLOCK);
    }

    /* Fetch, init, and return the recycled page */
    return _hash_getinitbuf(rel, blkno);
}

/*
 *	_hash_firstfreebit()
 *
 *	Return the number of the first bit that is not set in the word 'map'.
 */
static uint32
_hash_firstfreebit(uint32 map)
{
    uint32      i,
                mask;

    mask = 0x1;
    for (i = 0; i < BITS_PER_MAP; i++)
    {
        if (!(mask & map))
            return i;
        mask <<= 1;
    }

    elog(ERROR, "firstfreebit found no free bit");

    return 0;                   /* keep compiler quiet */
}
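
/*
 * For example (hypothetical input), _hash_firstfreebit(0x00000017) returns
 * 3: bits 0, 1, 2, and 4 are set in 0x17 (binary 10111), so bit 3 is the
 * first clear one.
 */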

/*
 *	_hash_freeovflpage() -
 *
 *	Remove this overflow page from its bucket's chain, and mark the page as
 *	free.  On entry, ovflbuf is write-locked; it is released before exiting.
 *
 *	Since this function is invoked in VACUUM, we provide an access strategy
 *	parameter that controls fetches of the bucket pages.
 *
 *	Returns the block number of the page that followed the given page
 *	in the bucket, or InvalidBlockNumber if no following page.
 *
 *	NB: caller must not hold lock on metapage, nor on either page that's
 *	adjacent in the bucket chain.  The caller had better hold exclusive lock
 *	on the bucket, too.
 */
BlockNumber
_hash_freeovflpage(Relation rel, Buffer ovflbuf,
				   BufferAccessStrategy bstrategy)
{
    HashMetaPage metap;
    Buffer      metabuf;
    Buffer      mapbuf;
    BlockNumber ovflblkno;
    BlockNumber prevblkno;
    BlockNumber blkno;
    BlockNumber nextblkno;
    HashPageOpaque ovflopaque;
    Page        ovflpage;
    Page        mappage;
    uint32     *freep;
    uint32      ovflbitno;
    int32       bitmappage,
                bitmapbit;
    Bucket      bucket PG_USED_FOR_ASSERTS_ONLY;

    /* Get information from the doomed page */
    _hash_checkpage(rel, ovflbuf, LH_OVERFLOW_PAGE);
    ovflblkno = BufferGetBlockNumber(ovflbuf);
    ovflpage = BufferGetPage(ovflbuf);
    ovflopaque = (HashPageOpaque) PageGetSpecialPointer(ovflpage);
    nextblkno = ovflopaque->hasho_nextblkno;
    prevblkno = ovflopaque->hasho_prevblkno;
    bucket = ovflopaque->hasho_bucket;

    /*
     * Zero the page for debugging's sake; then write and release it. (Note:
     * if we failed to zero the page here, we'd have problems with the Assert
     * in _hash_pageinit() when the page is reused.)
     */
    MemSet(ovflpage, 0, BufferGetPageSize(ovflbuf));
    _hash_wrtbuf(rel, ovflbuf);

    /*
     * Fix up the bucket chain.  This is a doubly-linked list, so we must fix
     * up the bucket chain members behind and ahead of the overflow page being
     * deleted.  No concurrency issues since we hold exclusive lock on the
     * entire bucket.
     */
    if (BlockNumberIsValid(prevblkno))
    {
        Buffer      prevbuf = _hash_getbuf_with_strategy(rel,
                                                         prevblkno,
                                                         HASH_WRITE,
                                           LH_BUCKET_PAGE | LH_OVERFLOW_PAGE,
                                                         bstrategy);
        Page        prevpage = BufferGetPage(prevbuf);
        HashPageOpaque prevopaque = (HashPageOpaque) PageGetSpecialPointer(prevpage);

        Assert(prevopaque->hasho_bucket == bucket);
        prevopaque->hasho_nextblkno = nextblkno;
        _hash_wrtbuf(rel, prevbuf);
    }
    if (BlockNumberIsValid(nextblkno))
    {
        Buffer      nextbuf = _hash_getbuf_with_strategy(rel,
                                                         nextblkno,
                                                         HASH_WRITE,
                                                         LH_OVERFLOW_PAGE,
                                                         bstrategy);
        Page        nextpage = BufferGetPage(nextbuf);
        HashPageOpaque nextopaque = (HashPageOpaque) PageGetSpecialPointer(nextpage);

        Assert(nextopaque->hasho_bucket == bucket);
        nextopaque->hasho_prevblkno = prevblkno;
        _hash_wrtbuf(rel, nextbuf);
    }

    /* Note: bstrategy is intentionally not used for metapage and bitmap */

    /* Read the metapage so we can determine which bitmap page to use */
    metabuf = _hash_getbuf(rel, HASH_METAPAGE, HASH_READ, LH_META_PAGE);
    metap = HashPageGetMeta(BufferGetPage(metabuf));

    /* Identify which bit to clear */
    ovflbitno = blkno_to_bitno(metap, ovflblkno);

    bitmappage = ovflbitno >> BMPG_SHIFT(metap);
    bitmapbit = ovflbitno & BMPG_MASK(metap);
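
    /*
     * Hypothetical example: with a 4096-byte bitmap per bitmap page (the
     * typical value for 8 kB blocks, so BMPG_SHIFT = 15), ovflbitno 33000
     * would map to bitmappage 1, bitmapbit 232.
     */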

    if (bitmappage >= metap->hashm_nmaps)
        elog(ERROR, "invalid overflow bit number %u", ovflbitno);
    blkno = metap->hashm_mapp[bitmappage];

    /* Release metapage lock while we access the bitmap page */
    _hash_chgbufaccess(rel, metabuf, HASH_READ, HASH_NOLOCK);

    /* Clear the bitmap bit to indicate that this overflow page is free */
    mapbuf = _hash_getbuf(rel, blkno, HASH_WRITE, LH_BITMAP_PAGE);
    mappage = BufferGetPage(mapbuf);
    freep = HashPageGetBitmap(mappage);
    Assert(ISSET(freep, bitmapbit));
    CLRBIT(freep, bitmapbit);
    _hash_wrtbuf(rel, mapbuf);

    /* Get write-lock on metapage to update firstfree */
    _hash_chgbufaccess(rel, metabuf, HASH_NOLOCK, HASH_WRITE);

    /* if this is now the first free page, update hashm_firstfree */
    if (ovflbitno < metap->hashm_firstfree)
    {
        metap->hashm_firstfree = ovflbitno;
        _hash_wrtbuf(rel, metabuf);
    }
    else
    {
        /* no need to change metapage */
        _hash_relbuf(rel, metabuf);
    }

    return nextblkno;
}


/*
 *	_hash_initbitmap()
 *
 *	 Initialize a new bitmap page.  The metapage has a write-lock upon
 *	 entering the function, and must be written by caller after return.
 *
 * 'blkno' is the block number of the new bitmap page.
 *
 * All bits in the new bitmap page are set to "1", indicating "in use".
 */
void
_hash_initbitmap(Relation rel, HashMetaPage metap, BlockNumber blkno,
				 ForkNumber forkNum)
{
    Buffer      buf;
    Page        pg;
    HashPageOpaque op;
    uint32     *freep;

    /*
     * It is okay to write-lock the new bitmap page while holding metapage
     * write lock, because no one else could be contending for the new page.
     * Also, the metapage lock makes it safe to extend the index using
     * _hash_getnewbuf.
     *
     * There is some loss of concurrency in possibly doing I/O for the new
     * page while holding the metapage lock, but this path is taken so seldom
     * that it's not worth worrying about.
     */
    buf = _hash_getnewbuf(rel, blkno, forkNum);
    pg = BufferGetPage(buf);

    /* initialize the page's special space */
    op = (HashPageOpaque) PageGetSpecialPointer(pg);
    op->hasho_prevblkno = InvalidBlockNumber;
    op->hasho_nextblkno = InvalidBlockNumber;
    op->hasho_bucket = -1;
    op->hasho_flag = LH_BITMAP_PAGE;
    op->hasho_page_id = HASHO_PAGE_ID;

    /* set all of the bits to 1 */
    freep = HashPageGetBitmap(pg);
    MemSet(freep, 0xFF, BMPGSZ_BYTE(metap));

    /* write out the new bitmap page (releasing write lock and pin) */
    _hash_wrtbuf(rel, buf);

    /* add the new bitmap page to the metapage's list of bitmaps */
    /* metapage already has a write lock */
    if (metap->hashm_nmaps >= HASH_MAX_BITMAPS)
        ereport(ERROR,
                (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
                 errmsg("out of overflow pages in hash index \"%s\"",
                        RelationGetRelationName(rel))));

    metap->hashm_mapp[metap->hashm_nmaps] = blkno;

    metap->hashm_nmaps++;
}
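
/*
 * A rough capacity illustration (assuming the default 8 kB block size,
 * where the bitmap portion of a bitmap page is typically 4096 bytes): each
 * bitmap page then tracks 32768 overflow pages, so the index can never use
 * more than HASH_MAX_BITMAPS * 32768 overflow pages; once HASH_MAX_BITMAPS
 * bitmap pages exist, the next attempt to add one raises the "out of
 * overflow pages" error above.
 */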


/*
 *	_hash_squeezebucket(rel, bucket)
 *
 *	Try to squeeze the tuples onto pages occurring earlier in the
 *	bucket chain in an attempt to free overflow pages. When we start
 *	the "squeezing", the page from which we start taking tuples (the
 *	"read" page) is the last page in the bucket chain and the page
 *	onto which we start squeezing tuples (the "write" page) is the
 *	first page in the bucket chain.  The read page works backward and
 *	the write page works forward; the procedure terminates when the
 *	read page and write page are the same page.
 *
 *	At completion of this procedure, it is guaranteed that all pages in
 *	the bucket are nonempty, unless the bucket is totally empty (in
 *	which case all overflow pages will be freed).  The original implementation
 *	required that to be true on entry as well, but it's a lot easier for
 *	callers to leave empty overflow pages and let this guy clean it up.
 *
 *	Caller must hold exclusive lock on the target bucket.  This allows
 *	us to safely lock multiple pages in the bucket.
 *
 *	Since this function is invoked in VACUUM, we provide an access strategy
 *	parameter that controls fetches of the bucket pages.
 */
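
/*
 * Illustrative sketch (hypothetical chain of five pages): the "write"
 * pointer starts at the bucket page and moves right, the "read" pointer
 * starts at the chain tail and moves left; tuples flow from read to write
 * until the two meet:
 *
 *		bucket -> ovfl1 -> ovfl2 -> ovfl3 -> ovfl4
 *		write -->                      <-- read
 */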
void
_hash_squeezebucket(Relation rel,
					Bucket bucket,
					BlockNumber bucket_blkno,
					BufferAccessStrategy bstrategy)
{
    BlockNumber wblkno;
    BlockNumber rblkno;
    Buffer      wbuf;
    Buffer      rbuf;
    Page        wpage;
    Page        rpage;
    HashPageOpaque wopaque;
    HashPageOpaque ropaque;
    bool        wbuf_dirty;

    /*
     * start squeezing into the base bucket page.
     */
    wblkno = bucket_blkno;
    wbuf = _hash_getbuf_with_strategy(rel,
                                      wblkno,
                                      HASH_WRITE,
                                      LH_BUCKET_PAGE,
                                      bstrategy);
    wpage = BufferGetPage(wbuf);
    wopaque = (HashPageOpaque) PageGetSpecialPointer(wpage);

    /*
     * if there aren't any overflow pages, there's nothing to squeeze.
     */
    if (!BlockNumberIsValid(wopaque->hasho_nextblkno))
    {
        _hash_relbuf(rel, wbuf);
        return;
    }

    /*
     * Find the last page in the bucket chain by starting at the base bucket
     * page and working forward.  Note: we assume that a hash bucket chain is
     * usually smaller than the buffer ring being used by VACUUM, else using
     * the access strategy here would be counterproductive.
     */
    rbuf = InvalidBuffer;
    ropaque = wopaque;
    do
    {
        rblkno = ropaque->hasho_nextblkno;
        if (rbuf != InvalidBuffer)
            _hash_relbuf(rel, rbuf);
        rbuf = _hash_getbuf_with_strategy(rel,
                                          rblkno,
                                          HASH_WRITE,
                                          LH_OVERFLOW_PAGE,
                                          bstrategy);
        rpage = BufferGetPage(rbuf);
        ropaque = (HashPageOpaque) PageGetSpecialPointer(rpage);
        Assert(ropaque->hasho_bucket == bucket);
    } while (BlockNumberIsValid(ropaque->hasho_nextblkno));

    /*
     * squeeze the tuples.
     */
    wbuf_dirty = false;
    for (;;)
    {
        OffsetNumber roffnum;
        OffsetNumber maxroffnum;
        OffsetNumber deletable[MaxOffsetNumber];
        int         ndeletable = 0;

        /* Scan each tuple in "read" page */
        maxroffnum = PageGetMaxOffsetNumber(rpage);
        for (roffnum = FirstOffsetNumber;
             roffnum <= maxroffnum;
             roffnum = OffsetNumberNext(roffnum))
        {
            IndexTuple  itup;
            Size        itemsz;

            itup = (IndexTuple) PageGetItem(rpage,
                                            PageGetItemId(rpage, roffnum));
            itemsz = IndexTupleDSize(*itup);
            itemsz = MAXALIGN(itemsz);

            /*
             * Walk up the bucket chain, looking for a page big enough for
             * this item.  Exit if we reach the read page.
             */
            while (PageGetFreeSpace(wpage) < itemsz)
            {
                Assert(!PageIsEmpty(wpage));

                wblkno = wopaque->hasho_nextblkno;
                Assert(BlockNumberIsValid(wblkno));

                if (wbuf_dirty)
                    _hash_wrtbuf(rel, wbuf);
                else
                    _hash_relbuf(rel, wbuf);

                /* nothing more to do if we reached the read page */
                if (rblkno == wblkno)
                {
                    if (ndeletable > 0)
                    {
                        /* Delete tuples we already moved off read page */
                        PageIndexMultiDelete(rpage, deletable, ndeletable);
                        _hash_wrtbuf(rel, rbuf);
                    }
                    else
                        _hash_relbuf(rel, rbuf);
                    return;
                }

                wbuf = _hash_getbuf_with_strategy(rel,
                                                  wblkno,
                                                  HASH_WRITE,
                                                  LH_OVERFLOW_PAGE,
                                                  bstrategy);
                wpage = BufferGetPage(wbuf);
                wopaque = (HashPageOpaque) PageGetSpecialPointer(wpage);
                Assert(wopaque->hasho_bucket == bucket);
                wbuf_dirty = false;
            }

            /*
             * we have found room so insert on the "write" page, being careful
             * to preserve hashkey ordering.  (If we insert many tuples into
             * the same "write" page it would be worth qsort'ing instead of
             * doing repeated _hash_pgaddtup.)
             */
            (void) _hash_pgaddtup(rel, wbuf, itemsz, itup);
            wbuf_dirty = true;

            /* remember tuple for deletion from "read" page */
            deletable[ndeletable++] = roffnum;
        }

        /*
         * If we reach here, there are no live tuples on the "read" page ---
         * it was empty when we got to it, or we moved them all.  So we can
         * just free the page without bothering with deleting tuples
         * individually.  Then advance to the previous "read" page.
         *
         * Tricky point here: if our read and write pages are adjacent in the
         * bucket chain, our write lock on wbuf will conflict with
         * _hash_freeovflpage's attempt to update the sibling links of the
         * removed page.  However, in that case we are done anyway, so we can
         * simply drop the write lock before calling _hash_freeovflpage.
         */
        rblkno = ropaque->hasho_prevblkno;
        Assert(BlockNumberIsValid(rblkno));

        /* are we freeing the page adjacent to wbuf? */
        if (rblkno == wblkno)
        {
            /* yes, so release wbuf lock first */
            if (wbuf_dirty)
                _hash_wrtbuf(rel, wbuf);
            else
                _hash_relbuf(rel, wbuf);
            /* free this overflow page (releases rbuf) */
            _hash_freeovflpage(rel, rbuf, bstrategy);
            /* done */
            return;
        }

        /* free this overflow page, then get the previous one */
        _hash_freeovflpage(rel, rbuf, bstrategy);

        rbuf = _hash_getbuf_with_strategy(rel,
                                          rblkno,
                                          HASH_WRITE,
                                          LH_OVERFLOW_PAGE,
                                          bstrategy);
        rpage = BufferGetPage(rbuf);
        ropaque = (HashPageOpaque) PageGetSpecialPointer(rpage);
        Assert(ropaque->hasho_bucket == bucket);
    }

    /* NOTREACHED */
}