/*-------------------------------------------------------------------------
 *
 * hashpage.c
 *    Hash table page management code for the Postgres hash access method
 *
 * Portions Copyright (c) 1996-2003, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *    $Header: /cvsroot/pgsql/src/backend/access/hash/hashpage.c,v 1.38 2003/08/04 02:39:57 momjian Exp $
 *
 * NOTES
 *    Postgres hash pages look like ordinary relation pages.  The opaque
 *    data at high addresses includes information about the page including
 *    whether a page is an overflow page or a true bucket, the block
 *    numbers of the preceding and following pages, and the overflow
 *    address of the page if it is an overflow page.
 *
 *    The first page in a hash relation, page zero, is special -- it stores
 *    information describing the hash table; it is referred to as the
 *    "meta page."  Pages one and higher store the actual data.
 *
 *-------------------------------------------------------------------------
 */
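
/*
 * Illustrative sketch (not in the original file): the block layout that
 * the NOTES above and _hash_metapinit() below imply for a freshly
 * initialized index:
 *
 *      block 0     meta page (HashMetaPageData)
 *      block 1     bucket 0 (primary bucket page)
 *      block 2     bucket 1 (primary bucket page)
 *      block 3     first bitmap page, tracking overflow-page use
 *      block 4+    later buckets, overflow pages, and bitmap pages
 *
 * Every page keeps a HashPageOpaqueData in its special space: the
 * prev/next block numbers of its bucket chain, its flag bits
 * (LH_META_PAGE, LH_BUCKET_PAGE, LH_OVERFLOW_PAGE), its bucket number,
 * and its overflow address if it is an overflow page.
 */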

#include "postgres.h"

#include "access/genam.h"
#include "access/hash.h"
#include "miscadmin.h"
#include "storage/lmgr.h"


static void _hash_setpagelock(Relation rel, BlockNumber blkno, int access);
static void _hash_unsetpagelock(Relation rel, BlockNumber blkno, int access);
static void _hash_splitpage(Relation rel, Buffer metabuf, Bucket obucket, Bucket nbucket);

/*
 *  We use high-concurrency locking on hash indices.  There are two cases in
 *  which we don't do locking.  One is when we're building the index.
 *  Since the creating transaction has not committed, no one can see
 *  the index, and there's no reason to share locks.  The second case
 *  is when we're just starting up the database system.  We use some
 *  special-purpose initialization code in the relation cache manager
 *  (see utils/cache/relcache.c) to allow us to do indexed scans on
 *  the system catalogs before we'd normally be able to.  This happens
 *  before the lock table is fully initialized, so we can't use it.
 *  Strictly speaking, this violates 2PL, but we don't do 2PL on the
 *  system catalogs anyway.
 *
 *  Note that our page locks are actual lock-manager locks, not buffer
 *  locks (as are used by btree, for example).  This is a good idea because
 *  the algorithms are not deadlock-free, and we'd better be able to detect
 *  and recover from deadlocks.
 *
 *  Another important difference from btree is that a hash indexscan
 *  retains both a lock and a buffer pin on the current index page
 *  between hashgettuple() calls (btree keeps only a buffer pin).
 *  Because of this, it's safe to do item deletions with only a regular
 *  write lock on a hash page --- there cannot be an indexscan stopped on
 *  the page being deleted, other than an indexscan of our own backend,
 *  which will be taken care of by _hash_adjscans.
 */
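
/*
 * Illustrative sketch (not in the original file): the lock/pin life
 * cycle implied by the routines below.  A reader typically does
 *
 *      buf = _hash_getbuf(rel, blkno, HASH_READ);  -- lmgr lock + pin
 *      page = BufferGetPage(buf);
 *      ... inspect the page ...
 *      _hash_relbuf(rel, buf, HASH_READ);          -- unlock + unpin
 *
 * A writer does the same with HASH_WRITE, finishing with _hash_wrtbuf()
 * (write + unlock + unpin) or _hash_wrtnorelbuf() (write, keep lock and
 * pin).  Because these are lock-manager locks, a deadlock between two
 * backends is detected and reported rather than hanging forever.
 */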


#define USELOCKING      (!BuildingHash && !IsInitProcessingMode())


/*
 *  _hash_metapinit() -- Initialize the metadata page of a hash index,
 *              the two buckets that we begin with and the initial
 *              bitmap page.
 */
void
_hash_metapinit(Relation rel)
{
    HashMetaPage metap;
    HashPageOpaque pageopaque;
    Buffer      metabuf;
    Buffer      buf;
    Page        pg;
    int         nbuckets;
    uint32      nelem;          /* number of elements */
    uint32      lg2nelem;       /* _hash_log2(nelem) */
    uint16      i;

    /* can't be sharing this with anyone, now... */
    if (USELOCKING)
        LockRelation(rel, AccessExclusiveLock);

    if (RelationGetNumberOfBlocks(rel) != 0)
        elog(ERROR, "cannot initialize non-empty hash index \"%s\"",
             RelationGetRelationName(rel));

    metabuf = _hash_getbuf(rel, HASH_METAPAGE, HASH_WRITE);
    pg = BufferGetPage(metabuf);
    metap = (HashMetaPage) pg;
    _hash_pageinit(pg, BufferGetPageSize(metabuf));

    metap->hashm_magic = HASH_MAGIC;
    metap->hashm_version = HASH_VERSION;
    metap->hashm_nkeys = 0;
    metap->hashm_nmaps = 0;
    metap->hashm_ffactor = DEFAULT_FFACTOR;
    metap->hashm_bsize = BufferGetPageSize(metabuf);
    metap->hashm_bshift = _hash_log2(metap->hashm_bsize);

    /* find the largest power of two that fits in the page's free space */
    for (i = metap->hashm_bshift; i > 0; --i)
    {
        if ((1 << i) < (metap->hashm_bsize -
                        (MAXALIGN(sizeof(PageHeaderData)) +
                         MAXALIGN(sizeof(HashPageOpaqueData)))))
            break;
    }
    Assert(i);
    metap->hashm_bmsize = 1 << i;
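
    /*
     * Worked example (illustrative, not in the original): with a BLCKSZ
     * of 8192, hashm_bshift is 13.  The loop above counts i down until
     * 1 << i is strictly less than the page size minus the MAXALIGN'd
     * page header and special-space overhead; 1 << 13 = 8192 fails, but
     * 1 << 12 = 4096 fits, so hashm_bmsize becomes 4096 -- the largest
     * power-of-two bitmap that still leaves room for the page header and
     * the HashPageOpaqueData.
     */
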
    metap->hashm_procid = index_getprocid(rel, 1, HASHPROC);

    /*
     * Make nelem = 2 rather than 0 so that we end up allocating space for
     * the next greater power of two number of buckets.
     */
    nelem = 2;
    lg2nelem = 1;               /* _hash_log2(MAX(nelem, 2)) */
    nbuckets = 2;               /* 1 << lg2nelem */

    MemSet((char *) metap->hashm_spares, 0, sizeof(metap->hashm_spares));
    MemSet((char *) metap->hashm_mapp, 0, sizeof(metap->hashm_mapp));

    metap->hashm_spares[lg2nelem] = 2;      /* lg2nelem + 1 */
    metap->hashm_spares[lg2nelem + 1] = 2;  /* lg2nelem + 1 */
    metap->hashm_ovflpoint = 1;             /* lg2nelem */
    metap->hashm_lastfreed = 2;

    metap->hashm_maxbucket = metap->hashm_lowmask = 1;  /* nbuckets - 1 */
    metap->hashm_highmask = 3;  /* (nbuckets << 1) - 1 */
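
    /*
     * Worked example (illustrative, not in the original): with
     * maxbucket = 1, lowmask = 1 (binary 01), and highmask = 3 (binary
     * 11), a key's bucket is its hash value masked with highmask; if the
     * result exceeds maxbucket, that bucket hasn't been split off yet
     * and the key falls back to hash & lowmask.  So hashes ending in 00
     * and 10 go to bucket 0, and those ending in 01 and 11 go to bucket
     * 1, until _hash_expandtable() raises maxbucket.
     */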

    pageopaque = (HashPageOpaque) PageGetSpecialPointer(pg);
    pageopaque->hasho_oaddr = InvalidOvflAddress;
    pageopaque->hasho_prevblkno = InvalidBlockNumber;
    pageopaque->hasho_nextblkno = InvalidBlockNumber;
    pageopaque->hasho_flag = LH_META_PAGE;
    pageopaque->hasho_bucket = -1;

    /*
     * The first bitmap page is at split point lg2nelem, page offset 1,
     * which turns out to be page 3.  We couldn't initialize page 3 until
     * we had accounted for the first two buckets above.
     */
    if (_hash_initbitmap(rel, metap, OADDR_OF(lg2nelem, 1), lg2nelem + 1, 0))
        elog(ERROR, "_hash_initbitmap failed");
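
    /*
     * Worked example (illustrative, not in the original): block 0 is the
     * meta page and blocks 1 and 2 are the two primary bucket pages, so
     * the overflow address at split point lg2nelem (= 1), page offset 1
     * -- OADDR_OF(1, 1) -- resolves to block 3, which is where this
     * first bitmap page lives.
     */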

    /* all done */
    _hash_wrtnorelbuf(metabuf);

    /*
     * initialize the first two buckets
     */
    for (i = 0; i <= 1; i++)
    {
        buf = _hash_getbuf(rel, BUCKET_TO_BLKNO(i), HASH_WRITE);
        pg = BufferGetPage(buf);
        _hash_pageinit(pg, BufferGetPageSize(buf));
        pageopaque = (HashPageOpaque) PageGetSpecialPointer(pg);
        pageopaque->hasho_oaddr = InvalidOvflAddress;
        pageopaque->hasho_prevblkno = InvalidBlockNumber;
        pageopaque->hasho_nextblkno = InvalidBlockNumber;
        pageopaque->hasho_flag = LH_BUCKET_PAGE;
        pageopaque->hasho_bucket = i;
        _hash_wrtbuf(rel, buf);
    }

    _hash_relbuf(rel, metabuf, HASH_WRITE);

    if (USELOCKING)
        UnlockRelation(rel, AccessExclusiveLock);
}

/*
 *  _hash_getbuf() -- Get a buffer by block number for read or write.
 *
 *      When this routine returns, the appropriate lock is set on the
 *      requested buffer and its reference count is correct.
 *
 *      XXX P_NEW is not used because, unlike the tree structures, we
 *      need the bucket blocks to be at certain block numbers.  We must
 *      depend on the caller to call _hash_pageinit on the block if it
 *      knows that this is a new block.
 */
Buffer
_hash_getbuf(Relation rel, BlockNumber blkno, int access)
{
    Buffer      buf;

    if (blkno == P_NEW)
        elog(ERROR, "hash AM does not use P_NEW");
    switch (access)
    {
        case HASH_WRITE:
        case HASH_READ:
            _hash_setpagelock(rel, blkno, access);
            break;
        default:
            elog(ERROR, "unrecognized hash access code: %d", access);
            break;
    }
    buf = ReadBuffer(rel, blkno);

    /* ref count and lock type are correct */
    return buf;
}

/*
 *  _hash_relbuf() -- release a locked buffer.
 */
void
_hash_relbuf(Relation rel, Buffer buf, int access)
{
    BlockNumber blkno;

    blkno = BufferGetBlockNumber(buf);

    switch (access)
    {
        case HASH_WRITE:
        case HASH_READ:
            _hash_unsetpagelock(rel, blkno, access);
            break;
        default:
            elog(ERROR, "unrecognized hash access code: %d", access);
            break;
    }

    ReleaseBuffer(buf);
}

/*
 *  _hash_wrtbuf() -- write a hash page to disk.
 *
 *      This routine releases the lock held on the buffer and our reference
 *      to it.  It is an error to call _hash_wrtbuf() without a write lock
 *      or a reference to the buffer.
 */
void
_hash_wrtbuf(Relation rel, Buffer buf)
{
    BlockNumber blkno;

    blkno = BufferGetBlockNumber(buf);
    WriteBuffer(buf);
    _hash_unsetpagelock(rel, blkno, HASH_WRITE);
}

/*
 *  _hash_wrtnorelbuf() -- write a hash page to disk, but do not release
 *                         our reference or lock.
 *
 *      It is an error to call _hash_wrtnorelbuf() without a write lock
 *      or a reference to the buffer.
 */
void
_hash_wrtnorelbuf(Buffer buf)
{
    WriteNoReleaseBuffer(buf);
}

/*
 *  _hash_chgbufaccess() -- Exchange the lock held on a buffer for one of
 *      a different type, by releasing the buffer (writing it out first if
 *      we held a write lock) and then re-acquiring it with the new lock.
 *      Returns the page under the new lock.
 */
Page
_hash_chgbufaccess(Relation rel,
                   Buffer *bufp,
                   int from_access,
                   int to_access)
{
    BlockNumber blkno;

    blkno = BufferGetBlockNumber(*bufp);

    switch (from_access)
    {
        case HASH_WRITE:
            _hash_wrtbuf(rel, *bufp);
            break;
        case HASH_READ:
            _hash_relbuf(rel, *bufp, from_access);
            break;
        default:
            elog(ERROR, "unrecognized hash access code: %d", from_access);
            break;
    }
    *bufp = _hash_getbuf(rel, blkno, to_access);
    return BufferGetPage(*bufp);
}
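
/*
 * Illustrative usage (not in the original file): _hash_expandtable()
 * below bounces its hold on the meta page between read and write mode:
 *
 *      metap = (HashMetaPage) _hash_chgbufaccess(rel, &metabuf,
 *                                                HASH_READ, HASH_WRITE);
 *      new_bucket = ++metap->hashm_maxbucket;
 *      metap = (HashMetaPage) _hash_chgbufaccess(rel, &metabuf,
 *                                                HASH_WRITE, HASH_READ);
 *
 * The lock is fully released between the release and the re-acquire, so
 * the page contents may change in between; callers must use the returned
 * page pointer (and the updated *bufp) rather than a stale one.
 */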

/*
 *  _hash_pageinit() -- Initialize a new page.
 */
void
_hash_pageinit(Page page, Size size)
{
    Assert(PageIsNew(page));
    PageInit(page, size, sizeof(HashPageOpaqueData));
}

static void
_hash_setpagelock(Relation rel,
                  BlockNumber blkno,
                  int access)
{
    if (USELOCKING)
    {
        switch (access)
        {
            case HASH_WRITE:
                LockPage(rel, blkno, ExclusiveLock);
                break;
            case HASH_READ:
                LockPage(rel, blkno, ShareLock);
                break;
            default:
                elog(ERROR, "unrecognized hash access code: %d", access);
                break;
        }
    }
}

static void
_hash_unsetpagelock(Relation rel,
                    BlockNumber blkno,
                    int access)
{
    if (USELOCKING)
    {
        switch (access)
        {
            case HASH_WRITE:
                UnlockPage(rel, blkno, ExclusiveLock);
                break;
            case HASH_READ:
                UnlockPage(rel, blkno, ShareLock);
                break;
            default:
                elog(ERROR, "unrecognized hash access code: %d", access);
                break;
        }
    }
}

/*
 * Delete a hash index item.
 *
 * It is safe to delete an item after acquiring a regular WRITE lock on
 * the page, because no other backend can hold a READ lock on the page,
 * and that means no other backend currently has an indexscan stopped on
 * any item of the page being deleted.  Our own backend might have such
 * an indexscan (in fact *will*, since that's how VACUUM found the item
 * in the first place), but _hash_adjscans will fix the scan position.
 */
void
_hash_pagedel(Relation rel, ItemPointer tid)
{
    Buffer      buf;
    Buffer      metabuf;
    Page        page;
    BlockNumber blkno;
    OffsetNumber offno;
    HashMetaPage metap;
    HashPageOpaque opaque;

    blkno = ItemPointerGetBlockNumber(tid);
    offno = ItemPointerGetOffsetNumber(tid);

    buf = _hash_getbuf(rel, blkno, HASH_WRITE);
    page = BufferGetPage(buf);
    _hash_checkpage(page, LH_BUCKET_PAGE | LH_OVERFLOW_PAGE);
    opaque = (HashPageOpaque) PageGetSpecialPointer(page);

    PageIndexTupleDelete(page, offno);
    _hash_wrtnorelbuf(buf);

    /* if this emptied an overflow page, give the page back */
    if (PageIsEmpty(page) && (opaque->hasho_flag & LH_OVERFLOW_PAGE))
    {
        buf = _hash_freeovflpage(rel, buf);
        if (BufferIsValid(buf))
            _hash_relbuf(rel, buf, HASH_WRITE);
    }
    else
        _hash_relbuf(rel, buf, HASH_WRITE);

    /* keep the meta page's tuple count current */
    metabuf = _hash_getbuf(rel, HASH_METAPAGE, HASH_WRITE);
    metap = (HashMetaPage) BufferGetPage(metabuf);
    _hash_checkpage((Page) metap, LH_META_PAGE);
    metap->hashm_nkeys--;
    _hash_wrtbuf(rel, metabuf);
}

/*
 *  _hash_expandtable() -- Add a new bucket to the index, splitting the
 *      tuples of an existing bucket between it and the new one.
 */
void
_hash_expandtable(Relation rel, Buffer metabuf)
{
    HashMetaPage metap;
    Bucket      old_bucket;
    Bucket      new_bucket;
    uint32      spare_ndx;

    metap = (HashMetaPage) BufferGetPage(metabuf);
    _hash_checkpage((Page) metap, LH_META_PAGE);

    metap = (HashMetaPage) _hash_chgbufaccess(rel, &metabuf, HASH_READ, HASH_WRITE);
    new_bucket = ++metap->hashm_maxbucket;
    metap = (HashMetaPage) _hash_chgbufaccess(rel, &metabuf, HASH_WRITE, HASH_READ);
    old_bucket = (metap->hashm_maxbucket & metap->hashm_lowmask);

    /*
     * If the split point is increasing (hashm_maxbucket's log base 2
     * increases), we need to copy the current contents of the spare
     * split bucket to the next bucket.
     */
    spare_ndx = _hash_log2(metap->hashm_maxbucket + 1);
    if (spare_ndx > metap->hashm_ovflpoint)
    {
        metap = (HashMetaPage) _hash_chgbufaccess(rel, &metabuf, HASH_READ, HASH_WRITE);
        metap->hashm_spares[spare_ndx] = metap->hashm_spares[metap->hashm_ovflpoint];
        metap->hashm_ovflpoint = spare_ndx;
        metap = (HashMetaPage) _hash_chgbufaccess(rel, &metabuf, HASH_WRITE, HASH_READ);
    }
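
    /*
     * Worked example (illustrative, not in the original): expanding from
     * 2 to 3 buckets makes hashm_maxbucket 2, so spare_ndx =
     * _hash_log2(3) = 2, which exceeds the old ovflpoint of 1.  The
     * spares count is copied from slot 1 up to slot 2 and ovflpoint
     * advances, so block-number arithmetic for buckets past the new
     * split point still accounts for the same number of interposed
     * overflow and bitmap pages.
     */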

    if (new_bucket > metap->hashm_highmask)
    {
        /* Starting a new doubling */
        metap = (HashMetaPage) _hash_chgbufaccess(rel, &metabuf, HASH_READ, HASH_WRITE);
        metap->hashm_lowmask = metap->hashm_highmask;
        metap->hashm_highmask = new_bucket | metap->hashm_lowmask;
        metap = (HashMetaPage) _hash_chgbufaccess(rel, &metabuf, HASH_WRITE, HASH_READ);
    }
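
    /*
     * Worked example (illustrative, not in the original): with lowmask =
     * 1 and highmask = 3, new_bucket = 2 does not exceed highmask, so
     * the masks stay put and bucket 0 (= 2 & lowmask) splits into
     * buckets 0 and 2.  When new_bucket reaches 4 (> highmask = 3), a
     * new doubling starts: lowmask becomes 3 and highmask becomes
     * 4 | 3 = 7.
     */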

    /* Relocate records to the new bucket */
    _hash_splitpage(rel, metabuf, old_bucket, new_bucket);
}


/*
 * _hash_splitpage -- split 'obucket' into 'obucket' and 'nbucket'
 *
 * This routine is actually misnamed -- we are splitting a bucket that
 * consists of a base bucket page and zero or more overflow (bucket
 * chain) pages.
 */
static void
_hash_splitpage(Relation rel,
                Buffer metabuf,
                Bucket obucket,
                Bucket nbucket)
{
    Bucket      bucket;
    Buffer      obuf;
    Buffer      nbuf;
    Buffer      ovflbuf;
    BlockNumber oblkno;
    BlockNumber nblkno;
    bool        null;
    Datum       datum;
    HashItem    hitem;
    HashPageOpaque oopaque;
    HashPageOpaque nopaque;
    HashMetaPage metap;
    IndexTuple  itup;
    Size        itemsz;
    OffsetNumber ooffnum;
    OffsetNumber noffnum;
    OffsetNumber omaxoffnum;
    Page        opage;
    Page        npage;
    TupleDesc   itupdesc;

    metap = (HashMetaPage) BufferGetPage(metabuf);
    _hash_checkpage((Page) metap, LH_META_PAGE);

    /* get the buffers & pages */
    oblkno = BUCKET_TO_BLKNO(obucket);
    nblkno = BUCKET_TO_BLKNO(nbucket);
    obuf = _hash_getbuf(rel, oblkno, HASH_WRITE);
    nbuf = _hash_getbuf(rel, nblkno, HASH_WRITE);
    opage = BufferGetPage(obuf);
    npage = BufferGetPage(nbuf);

    /* initialize the new bucket */
    _hash_pageinit(npage, BufferGetPageSize(nbuf));
    nopaque = (HashPageOpaque) PageGetSpecialPointer(npage);
    nopaque->hasho_prevblkno = InvalidBlockNumber;
    nopaque->hasho_nextblkno = InvalidBlockNumber;
    nopaque->hasho_flag = LH_BUCKET_PAGE;
    nopaque->hasho_oaddr = InvalidOvflAddress;
    nopaque->hasho_bucket = nbucket;
    _hash_wrtnorelbuf(nbuf);

    /*
     * make sure the old bucket isn't empty.  advance 'opage' and friends
     * through the overflow bucket chain until we find a non-empty page.
     *
     * XXX we should only need this once, if we are careful to preserve the
     * invariant that overflow pages are never empty.
     */
    _hash_checkpage(opage, LH_BUCKET_PAGE);
    oopaque = (HashPageOpaque) PageGetSpecialPointer(opage);
    if (PageIsEmpty(opage))
    {
        oblkno = oopaque->hasho_nextblkno;
        _hash_relbuf(rel, obuf, HASH_WRITE);
        if (!BlockNumberIsValid(oblkno))
        {
            /*
             * the old bucket is completely empty; of course, the new
             * bucket will be as well, but since it's a base bucket page
             * we don't care.
             */
            _hash_relbuf(rel, nbuf, HASH_WRITE);
            return;
        }
        obuf = _hash_getbuf(rel, oblkno, HASH_WRITE);
        opage = BufferGetPage(obuf);
        _hash_checkpage(opage, LH_OVERFLOW_PAGE);
        if (PageIsEmpty(opage))
            elog(ERROR, "empty hash overflow page %u", oblkno);
        oopaque = (HashPageOpaque) PageGetSpecialPointer(opage);
    }

    /*
     * we are now guaranteed that 'opage' is not empty.  partition the
     * tuples in the old bucket between the old bucket and the new bucket,
     * advancing along their respective overflow bucket chains and adding
     * overflow pages as needed.
     */
    ooffnum = FirstOffsetNumber;
    omaxoffnum = PageGetMaxOffsetNumber(opage);
    for (;;)
    {
        /*
         * at each iteration through this loop, each of these variables
         * should be up-to-date: obuf opage oopaque ooffnum omaxoffnum
         */

        /* check if we're at the end of the page */
        if (ooffnum > omaxoffnum)
        {
            /* at end of page, but check for overflow page */
            oblkno = oopaque->hasho_nextblkno;
            if (BlockNumberIsValid(oblkno))
            {
                /*
                 * we ran out of tuples on this particular page, but we
                 * have more overflow pages; re-init values.
                 */
                _hash_wrtbuf(rel, obuf);
                obuf = _hash_getbuf(rel, oblkno, HASH_WRITE);
                opage = BufferGetPage(obuf);
                _hash_checkpage(opage, LH_OVERFLOW_PAGE);
                oopaque = (HashPageOpaque) PageGetSpecialPointer(opage);
                /* we're guaranteed that an ovfl page has at least 1 tuple */
                if (PageIsEmpty(opage))
                    elog(ERROR, "empty hash overflow page %u", oblkno);
                ooffnum = FirstOffsetNumber;
                omaxoffnum = PageGetMaxOffsetNumber(opage);
            }
            else
            {
                /*
                 * we're at the end of the bucket chain, so now we're
                 * really done with everything.  before quitting, call
                 * _hash_squeezebucket to ensure the tuples in the bucket
                 * (including the overflow pages) are packed as tightly as
                 * possible.
                 */
                _hash_wrtbuf(rel, obuf);
                _hash_wrtbuf(rel, nbuf);
                _hash_squeezebucket(rel, metap, obucket);
                return;
            }
        }

        /* hash on the tuple */
        hitem = (HashItem) PageGetItem(opage, PageGetItemId(opage, ooffnum));
        itup = &(hitem->hash_itup);
        itupdesc = RelationGetDescr(rel);
        datum = index_getattr(itup, 1, itupdesc, &null);
        bucket = _hash_call(rel, metap, datum);
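
        /*
         * Illustrative note (not in the original): under the mask scheme
         * set up in _hash_metapinit() -- bucket = hash & highmask,
         * falling back to hash & lowmask when that exceeds maxbucket --
         * every tuple hashed here can only map to obucket or nbucket.
         */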

        if (bucket == nbucket)
        {
            /*
             * insert the tuple into the new bucket.  if it doesn't fit on
             * the current page in the new bucket, we must allocate a new
             * overflow page and place the tuple on that page instead.
             */
            itemsz = IndexTupleDSize(hitem->hash_itup)
                + (sizeof(HashItemData) - sizeof(IndexTupleData));

            itemsz = MAXALIGN(itemsz);

            if (PageGetFreeSpace(npage) < itemsz)
            {
                ovflbuf = _hash_addovflpage(rel, &metabuf, nbuf);
                _hash_wrtbuf(rel, nbuf);
                nbuf = ovflbuf;
                npage = BufferGetPage(nbuf);
                _hash_checkpage(npage, LH_BUCKET_PAGE | LH_OVERFLOW_PAGE);
            }

            noffnum = OffsetNumberNext(PageGetMaxOffsetNumber(npage));
            if (PageAddItem(npage, (Item) hitem, itemsz, noffnum, LP_USED)
                == InvalidOffsetNumber)
                elog(ERROR, "failed to add index item to \"%s\"",
                     RelationGetRelationName(rel));
            _hash_wrtnorelbuf(nbuf);

            /*
             * now delete the tuple from the old bucket.  after this
             * section of code, 'ooffnum' will actually point to the
             * ItemId to which we would point if we had advanced it before
             * the deletion (PageIndexTupleDelete repacks the ItemId
             * array).  this also means that 'omaxoffnum' is exactly one
             * less than it used to be, so we really can just decrement it
             * instead of calling PageGetMaxOffsetNumber.
             */
            PageIndexTupleDelete(opage, ooffnum);
            _hash_wrtnorelbuf(obuf);
            omaxoffnum = OffsetNumberPrev(omaxoffnum);
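
            /*
             * Worked example (illustrative, not in the original): if the
             * old page held items at offsets 1..5 and we just deleted
             * offset 2, items 3..5 are repacked down to offsets 2..4.
             * 'ooffnum' (still 2) now names the next unexamined tuple
             * without being advanced, and the page maximum drops from 5
             * to 4, matching the decrement above.
             */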

            /*
             * tidy up.  if the old page was an overflow page and it is
             * now empty, we must free it (we want to preserve the
             * invariant that overflow pages cannot be empty).
             */
            if (PageIsEmpty(opage) &&
                (oopaque->hasho_flag & LH_OVERFLOW_PAGE))
            {
                obuf = _hash_freeovflpage(rel, obuf);

                /* check that we're not through the bucket chain */
                if (BufferIsInvalid(obuf))
                {
                    _hash_wrtbuf(rel, nbuf);
                    _hash_squeezebucket(rel, metap, obucket);
                    return;
                }

                /*
                 * re-init. again, we're guaranteed that an ovfl page has
                 * at least one tuple.
                 */
                opage = BufferGetPage(obuf);
                _hash_checkpage(opage, LH_OVERFLOW_PAGE);
                oblkno = BufferGetBlockNumber(obuf);
                oopaque = (HashPageOpaque) PageGetSpecialPointer(opage);
                if (PageIsEmpty(opage))
                    elog(ERROR, "empty hash overflow page %u", oblkno);
                ooffnum = FirstOffsetNumber;
                omaxoffnum = PageGetMaxOffsetNumber(opage);
            }
        }
        else
        {
            /*
             * the tuple stays on this page.  we didn't move anything, so
             * we didn't delete anything and therefore we don't have to
             * change 'omaxoffnum'.
             *
             * XXX any hash value from [0, nbucket-1] will map to this
             * bucket, which doesn't make sense to me.
             */
            ooffnum = OffsetNumberNext(ooffnum);
        }
    }
    /* NOTREACHED */
}