]> granicus.if.org Git - postgresql/blob - src/backend/access/hash/hashpage.c
Add:
[postgresql] / src / backend / access / hash / hashpage.c
1 /*-------------------------------------------------------------------------
2  *
3  * hashpage.c
4  *        Hash table page management code for the Postgres hash access method
5  *
6  * Portions Copyright (c) 1996-2000, PostgreSQL, Inc
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *        $Header: /cvsroot/pgsql/src/backend/access/hash/hashpage.c,v 1.26 2000/01/26 05:55:55 momjian Exp $
12  *
13  * NOTES
14  *        Postgres hash pages look like ordinary relation pages.  The opaque
15  *        data at high addresses includes information about the page including
16  *        whether a page is an overflow page or a true bucket, the block
17  *        numbers of the preceding and following pages, and the overflow
18  *        address of the page if it is an overflow page.
19  *
20  *        The first page in a hash relation, page zero, is special -- it stores
21  *        information describing the hash table; it is referred to as the
22  *        "meta page." Pages one and higher store the actual data.
23  *
24  *-------------------------------------------------------------------------
25  */
26
27 #include "postgres.h"
28
29 #include "access/genam.h"
30 #include "access/hash.h"
31 #include "miscadmin.h"
32
33
34 static void _hash_setpagelock(Relation rel, BlockNumber blkno, int access);
35 static void _hash_unsetpagelock(Relation rel, BlockNumber blkno, int access);
36 static void _hash_splitpage(Relation rel, Buffer metabuf, Bucket obucket, Bucket nbucket);
37
38 /*
39  *      We use high-concurrency locking on hash indices.  There are two cases in
40  *      which we don't do locking.  One is when we're building the index.
41  *      Since the creating transaction has not committed, no one can see
42  *      the index, and there's no reason to share locks.  The second case
43  *      is when we're just starting up the database system.  We use some
44  *      special-purpose initialization code in the relation cache manager
45  *      (see utils/cache/relcache.c) to allow us to do indexed scans on
46  *      the system catalogs before we'd normally be able to.  This happens
47  *      before the lock table is fully initialized, so we can't use it.
48  *      Strictly speaking, this violates 2pl, but we don't do 2pl on the
49  *      system catalogs anyway.
50  */
51
52
53 #define USELOCKING              (!BuildingHash && !IsInitProcessingMode())
54
55
/*
 *	_hash_metapinit() -- Initialize the metadata page of a hash index,
 *				the two buckets that we begin with and the initial
 *				bitmap page.
 *
 *		The relation must be completely empty.  On exit, block 0 holds the
 *		meta page, blocks for buckets 0 and 1 are initialized, and the first
 *		overflow-space bitmap page has been created.
 */
void
_hash_metapinit(Relation rel)
{
	HashMetaPage metap;
	HashPageOpaque pageopaque;
	Buffer		metabuf;
	Buffer		buf;
	Page		pg;
	int			nbuckets;
	uint32		nelem;			/* number elements */
	uint32		lg2nelem;		/* _hash_log2(nelem)   */
	uint32		nblocks;
	uint16		i;

	/* can't be sharing this with anyone, now... */
	if (USELOCKING)
		LockRelation(rel, AccessExclusiveLock);

	/* refuse to clobber an index that already has pages */
	if ((nblocks = RelationGetNumberOfBlocks(rel)) != 0)
	{
		elog(ERROR, "Cannot initialize non-empty hash table %s",
			 RelationGetRelationName(rel));
	}

	/* grab and zero out what will become the meta page (block 0) */
	metabuf = _hash_getbuf(rel, HASH_METAPAGE, HASH_WRITE);
	pg = BufferGetPage(metabuf);
	metap = (HashMetaPage) pg;
	_hash_pageinit(pg, BufferGetPageSize(metabuf));

	metap->hashm_magic = HASH_MAGIC;
	metap->hashm_version = HASH_VERSION;
	metap->hashm_nkeys = 0;
	metap->hashm_nmaps = 0;
	metap->hashm_ffactor = DEFAULT_FFACTOR;
	metap->hashm_bsize = BufferGetPageSize(metabuf);
	metap->hashm_bshift = _hash_log2(metap->hashm_bsize);

	/*
	 * Find the largest power of 2 (1 << i) that fits in the page's usable
	 * area, i.e. the page size minus the page header and the hash opaque
	 * data.  That becomes the bitmap-page size.
	 */
	for (i = metap->hashm_bshift; i > 0; --i)
	{
		if ((1 << i) < (metap->hashm_bsize -
						(MAXALIGN(sizeof(PageHeaderData)) +
						 MAXALIGN(sizeof(HashPageOpaqueData)))))
			break;
	}
	Assert(i);
	metap->hashm_bmsize = 1 << i;
	metap->hashm_procid = index_getprocid(rel, 1, HASHPROC);

	/*
	 * Make nelem = 2 rather than 0 so that we end up allocating space for
	 * the next greater power of two number of buckets.
	 */
	nelem = 2;
	lg2nelem = 1;				/* _hash_log2(MAX(nelem, 2)) */
	nbuckets = 2;				/* 1 << lg2nelem */

	MemSet((char *) metap->hashm_spares, 0, sizeof(metap->hashm_spares));
	MemSet((char *) metap->hashm_mapp, 0, sizeof(metap->hashm_mapp));

	/*
	 * The comments on the right give the general formulas; the constants
	 * are their values for the initial nelem = 2 configuration.
	 */
	metap->hashm_spares[lg2nelem] = 2;	/* lg2nelem + 1 */
	metap->hashm_spares[lg2nelem + 1] = 2;		/* lg2nelem + 1 */
	metap->hashm_ovflpoint = 1; /* lg2nelem */
	metap->hashm_lastfreed = 2;

	metap->hashm_maxbucket = metap->hashm_lowmask = 1;	/* nbuckets - 1 */
	metap->hashm_highmask = 3;	/* (nbuckets << 1) - 1 */

	/* fill in the meta page's own special space */
	pageopaque = (HashPageOpaque) PageGetSpecialPointer(pg);
	pageopaque->hasho_oaddr = InvalidOvflAddress;
	pageopaque->hasho_prevblkno = InvalidBlockNumber;
	pageopaque->hasho_nextblkno = InvalidBlockNumber;
	pageopaque->hasho_flag = LH_META_PAGE;
	pageopaque->hasho_bucket = -1;

	/*
	 * First bitmap page is at: splitpoint lg2nelem page offset 1 which
	 * turns out to be page 3. Couldn't initialize page 3  until we
	 * created the first two buckets above.
	 */
	if (_hash_initbitmap(rel, metap, OADDR_OF(lg2nelem, 1), lg2nelem + 1, 0))
		elog(ERROR, "Problem with _hash_initbitmap.");

	/* all done -- flush the meta page but keep our lock/pin on it */
	_hash_wrtnorelbuf(rel, metabuf);

	/*
	 * initialize the first two buckets
	 */
	for (i = 0; i <= 1; i++)
	{
		buf = _hash_getbuf(rel, BUCKET_TO_BLKNO(i), HASH_WRITE);
		pg = BufferGetPage(buf);
		_hash_pageinit(pg, BufferGetPageSize(buf));
		pageopaque = (HashPageOpaque) PageGetSpecialPointer(pg);
		pageopaque->hasho_oaddr = InvalidOvflAddress;
		pageopaque->hasho_prevblkno = InvalidBlockNumber;
		pageopaque->hasho_nextblkno = InvalidBlockNumber;
		pageopaque->hasho_flag = LH_BUCKET_PAGE;
		pageopaque->hasho_bucket = i;
		_hash_wrtbuf(rel, buf);
	}

	/* now release the meta page's lock and pin */
	_hash_relbuf(rel, metabuf, HASH_WRITE);

	if (USELOCKING)
		UnlockRelation(rel, AccessExclusiveLock);
}
167
168 /*
169  *      _hash_getbuf() -- Get a buffer by block number for read or write.
170  *
171  *              When this routine returns, the appropriate lock is set on the
172  *              requested buffer its reference count is correct.
173  *
174  *              XXX P_NEW is not used because, unlike the tree structures, we
175  *              need the bucket blocks to be at certain block numbers.  we must
176  *              depend on the caller to call _hash_pageinit on the block if it
177  *              knows that this is a new block.
178  */
179 Buffer
180 _hash_getbuf(Relation rel, BlockNumber blkno, int access)
181 {
182         Buffer          buf;
183
184         if (blkno == P_NEW)
185                 elog(ERROR, "_hash_getbuf: internal error: hash AM does not use P_NEW");
186         switch (access)
187         {
188                 case HASH_WRITE:
189                 case HASH_READ:
190                         _hash_setpagelock(rel, blkno, access);
191                         break;
192                 default:
193                         elog(ERROR, "_hash_getbuf: invalid access (%d) on new blk: %s",
194                                  access, RelationGetRelationName(rel));
195                         break;
196         }
197         buf = ReadBuffer(rel, blkno);
198
199         /* ref count and lock type are correct */
200         return buf;
201 }
202
203 /*
204  *      _hash_relbuf() -- release a locked buffer.
205  */
206 void
207 _hash_relbuf(Relation rel, Buffer buf, int access)
208 {
209         BlockNumber blkno;
210
211         blkno = BufferGetBlockNumber(buf);
212
213         switch (access)
214         {
215                 case HASH_WRITE:
216                 case HASH_READ:
217                         _hash_unsetpagelock(rel, blkno, access);
218                         break;
219                 default:
220                         elog(ERROR, "_hash_relbuf: invalid access (%d) on blk %x: %s",
221                                  access, blkno, RelationGetRelationName(rel));
222         }
223
224         ReleaseBuffer(buf);
225 }
226
227 /*
228  *      _hash_wrtbuf() -- write a hash page to disk.
229  *
230  *              This routine releases the lock held on the buffer and our reference
231  *              to it.  It is an error to call _hash_wrtbuf() without a write lock
232  *              or a reference to the buffer.
233  */
234 void
235 _hash_wrtbuf(Relation rel, Buffer buf)
236 {
237         BlockNumber blkno;
238
239         blkno = BufferGetBlockNumber(buf);
240         WriteBuffer(buf);
241         _hash_unsetpagelock(rel, blkno, HASH_WRITE);
242 }
243
244 /*
245  *      _hash_wrtnorelbuf() -- write a hash page to disk, but do not release
246  *                                               our reference or lock.
247  *
248  *              It is an error to call _hash_wrtnorelbuf() without a write lock
249  *              or a reference to the buffer.
250  */
251 void
252 _hash_wrtnorelbuf(Relation rel, Buffer buf)
253 {
254         BlockNumber blkno;
255
256         blkno = BufferGetBlockNumber(buf);
257         WriteNoReleaseBuffer(buf);
258 }
259
260 Page
261 _hash_chgbufaccess(Relation rel,
262                                    Buffer *bufp,
263                                    int from_access,
264                                    int to_access)
265 {
266         BlockNumber blkno;
267
268         blkno = BufferGetBlockNumber(*bufp);
269
270         switch (from_access)
271         {
272                 case HASH_WRITE:
273                         _hash_wrtbuf(rel, *bufp);
274                         break;
275                 case HASH_READ:
276                         _hash_relbuf(rel, *bufp, from_access);
277                         break;
278                 default:
279                         elog(ERROR, "_hash_chgbufaccess: invalid access (%d) on blk %x: %s",
280                                  from_access, blkno, RelationGetRelationName(rel));
281                         break;
282         }
283         *bufp = _hash_getbuf(rel, blkno, to_access);
284         return BufferGetPage(*bufp);
285 }
286
287 /*
288  *      _hash_pageinit() -- Initialize a new page.
289  */
290 void
291 _hash_pageinit(Page page, Size size)
292 {
293         Assert(((PageHeader) page)->pd_lower == 0);
294         Assert(((PageHeader) page)->pd_upper == 0);
295         Assert(((PageHeader) page)->pd_special == 0);
296
297         /*
298          * Cargo-cult programming -- don't really need this to be zero, but
299          * creating new pages is an infrequent occurrence and it makes me feel
300          * good when I know they're empty.
301          */
302         MemSet(page, 0, size);
303
304         PageInit(page, size, sizeof(HashPageOpaqueData));
305 }
306
307 static void
308 _hash_setpagelock(Relation rel,
309                                   BlockNumber blkno,
310                                   int access)
311 {
312
313         if (USELOCKING)
314         {
315                 switch (access)
316                 {
317                                 case HASH_WRITE:
318                                 LockPage(rel, blkno, ExclusiveLock);
319                                 break;
320                         case HASH_READ:
321                                 LockPage(rel, blkno, ShareLock);
322                                 break;
323                         default:
324                                 elog(ERROR, "_hash_setpagelock: invalid access (%d) on blk %x: %s",
325                                          access, blkno, RelationGetRelationName(rel));
326                                 break;
327                 }
328         }
329 }
330
331 static void
332 _hash_unsetpagelock(Relation rel,
333                                         BlockNumber blkno,
334                                         int access)
335 {
336
337         if (USELOCKING)
338         {
339                 switch (access)
340                 {
341                                 case HASH_WRITE:
342                                 UnlockPage(rel, blkno, ExclusiveLock);
343                                 break;
344                         case HASH_READ:
345                                 UnlockPage(rel, blkno, ShareLock);
346                                 break;
347                         default:
348                                 elog(ERROR, "_hash_unsetpagelock: invalid access (%d) on blk %x: %s",
349                                          access, blkno, RelationGetRelationName(rel));
350                                 break;
351                 }
352         }
353 }
354
355 void
356 _hash_pagedel(Relation rel, ItemPointer tid)
357 {
358         Buffer          buf;
359         Buffer          metabuf;
360         Page            page;
361         BlockNumber blkno;
362         OffsetNumber offno;
363         HashMetaPage metap;
364         HashPageOpaque opaque;
365
366         blkno = ItemPointerGetBlockNumber(tid);
367         offno = ItemPointerGetOffsetNumber(tid);
368
369         buf = _hash_getbuf(rel, blkno, HASH_WRITE);
370         page = BufferGetPage(buf);
371         _hash_checkpage(page, LH_BUCKET_PAGE | LH_OVERFLOW_PAGE);
372         opaque = (HashPageOpaque) PageGetSpecialPointer(page);
373
374         PageIndexTupleDelete(page, offno);
375         _hash_wrtnorelbuf(rel, buf);
376
377         if (PageIsEmpty(page) && (opaque->hasho_flag & LH_OVERFLOW_PAGE))
378         {
379                 buf = _hash_freeovflpage(rel, buf);
380                 if (BufferIsValid(buf))
381                         _hash_relbuf(rel, buf, HASH_WRITE);
382         }
383         else
384                 _hash_relbuf(rel, buf, HASH_WRITE);
385
386         metabuf = _hash_getbuf(rel, HASH_METAPAGE, HASH_WRITE);
387         metap = (HashMetaPage) BufferGetPage(metabuf);
388         _hash_checkpage((Page) metap, LH_META_PAGE);
389         ++metap->hashm_nkeys;
390         _hash_wrtbuf(rel, metabuf);
391 }
392
/*
 *	_hash_expandtable() -- add a new bucket to the hash table and split
 *		the tuples of its "parent" bucket into it.
 *
 *		'metabuf' arrives read-locked; the meta page is briefly upgraded
 *		to a write lock (via _hash_chgbufaccess) around each field update
 *		and downgraded back to read afterwards.
 *
 *		NOTE(review): MAX_BUCKET, LOW_MASK, HIGH_MASK, OVFL_POINT and
 *		SPARES appear to be macro aliases for the corresponding
 *		metap->hashm_* fields (presumably defined in access/hash.h) --
 *		confirm against that header.
 */
void
_hash_expandtable(Relation rel, Buffer metabuf)
{
	HashMetaPage metap;
	Bucket		old_bucket;
	Bucket		new_bucket;
	uint32		spare_ndx;

/*        elog(DEBUG, "_hash_expandtable: expanding..."); */

	metap = (HashMetaPage) BufferGetPage(metabuf);
	_hash_checkpage((Page) metap, LH_META_PAGE);

	/* bump the bucket count under a write lock on the meta page */
	metap = (HashMetaPage) _hash_chgbufaccess(rel, &metabuf, HASH_READ, HASH_WRITE);
	new_bucket = ++metap->MAX_BUCKET;
	metap = (HashMetaPage) _hash_chgbufaccess(rel, &metabuf, HASH_WRITE, HASH_READ);
	/* the bucket being split is the new bucket's low-mask "image" */
	old_bucket = (metap->MAX_BUCKET & metap->LOW_MASK);

	/*
	 * If the split point is increasing (MAX_BUCKET's log base 2 *
	 * increases), we need to copy the current contents of the spare split
	 * bucket to the next bucket.
	 */
	spare_ndx = _hash_log2(metap->MAX_BUCKET + 1);
	if (spare_ndx > metap->OVFL_POINT)
	{

		metap = (HashMetaPage) _hash_chgbufaccess(rel, &metabuf, HASH_READ, HASH_WRITE);
		metap->SPARES[spare_ndx] = metap->SPARES[metap->OVFL_POINT];
		metap->OVFL_POINT = spare_ndx;
		metap = (HashMetaPage) _hash_chgbufaccess(rel, &metabuf, HASH_WRITE, HASH_READ);
	}

	if (new_bucket > metap->HIGH_MASK)
	{

		/* Starting a new doubling */
		metap = (HashMetaPage) _hash_chgbufaccess(rel, &metabuf, HASH_READ, HASH_WRITE);
		metap->LOW_MASK = metap->HIGH_MASK;
		metap->HIGH_MASK = new_bucket | metap->LOW_MASK;
		metap = (HashMetaPage) _hash_chgbufaccess(rel, &metabuf, HASH_WRITE, HASH_READ);

	}
	/* Relocate records to the new bucket */
	_hash_splitpage(rel, metabuf, old_bucket, new_bucket);
}
439
440
/*
 * _hash_splitpage -- split 'obucket' into 'obucket' and 'nbucket'
 *
 * this routine is actually misnamed -- we are splitting a bucket that
 * consists of a base bucket page and zero or more overflow (bucket
 * chain) pages.
 *
 * Each tuple in the old bucket is re-hashed; those that now map to
 * 'nbucket' are moved there (allocating overflow pages for the new
 * bucket as needed) and deleted from the old bucket.  Empty overflow
 * pages left behind in the old chain are freed as we go, and
 * _hash_squeezebucket is run on the old bucket before returning.
 */
static void
_hash_splitpage(Relation rel,
				Buffer metabuf,
				Bucket obucket,
				Bucket nbucket)
{
	Bucket		bucket;
	Buffer		obuf;
	Buffer		nbuf;
	Buffer		ovflbuf;
	BlockNumber oblkno;
	BlockNumber nblkno;
	bool		null;
	Datum		datum;
	HashItem	hitem;
	HashPageOpaque oopaque;
	HashPageOpaque nopaque;
	HashMetaPage metap;
	IndexTuple	itup;
	int			itemsz;
	OffsetNumber ooffnum;
	OffsetNumber noffnum;
	OffsetNumber omaxoffnum;
	Page		opage;
	Page		npage;
	TupleDesc	itupdesc;

/*        elog(DEBUG, "_hash_splitpage: splitting %d into %d,%d",
		 obucket, obucket, nbucket);
*/
	metap = (HashMetaPage) BufferGetPage(metabuf);
	_hash_checkpage((Page) metap, LH_META_PAGE);

	/* get the buffers & pages */
	oblkno = BUCKET_TO_BLKNO(obucket);
	nblkno = BUCKET_TO_BLKNO(nbucket);
	obuf = _hash_getbuf(rel, oblkno, HASH_WRITE);
	nbuf = _hash_getbuf(rel, nblkno, HASH_WRITE);
	opage = BufferGetPage(obuf);
	npage = BufferGetPage(nbuf);

	/* initialize the new bucket */
	_hash_pageinit(npage, BufferGetPageSize(nbuf));
	nopaque = (HashPageOpaque) PageGetSpecialPointer(npage);
	nopaque->hasho_prevblkno = InvalidBlockNumber;
	nopaque->hasho_nextblkno = InvalidBlockNumber;
	nopaque->hasho_flag = LH_BUCKET_PAGE;
	nopaque->hasho_oaddr = InvalidOvflAddress;
	nopaque->hasho_bucket = nbucket;
	_hash_wrtnorelbuf(rel, nbuf);

	/*
	 * make sure the old bucket isn't empty.  advance 'opage' and friends
	 * through the overflow bucket chain until we find a non-empty page.
	 *
	 * XXX we should only need this once, if we are careful to preserve the
	 * invariant that overflow pages are never empty.
	 */
	_hash_checkpage(opage, LH_BUCKET_PAGE);
	oopaque = (HashPageOpaque) PageGetSpecialPointer(opage);
	if (PageIsEmpty(opage))
	{
		oblkno = oopaque->hasho_nextblkno;
		_hash_relbuf(rel, obuf, HASH_WRITE);
		if (!BlockNumberIsValid(oblkno))
		{

			/*
			 * the old bucket is completely empty; of course, the new
			 * bucket will be as well, but since it's a base bucket page
			 * we don't care.
			 */
			_hash_relbuf(rel, nbuf, HASH_WRITE);
			return;
		}
		obuf = _hash_getbuf(rel, oblkno, HASH_WRITE);
		opage = BufferGetPage(obuf);
		_hash_checkpage(opage, LH_OVERFLOW_PAGE);
		if (PageIsEmpty(opage))
			elog(ERROR, "_hash_splitpage: empty overflow page %d", oblkno);
		oopaque = (HashPageOpaque) PageGetSpecialPointer(opage);
	}

	/*
	 * we are now guaranteed that 'opage' is not empty.  partition the
	 * tuples in the old bucket between the old bucket and the new bucket,
	 * advancing along their respective overflow bucket chains and adding
	 * overflow pages as needed.
	 */
	ooffnum = FirstOffsetNumber;
	omaxoffnum = PageGetMaxOffsetNumber(opage);
	for (;;)
	{

		/*
		 * at each iteration through this loop, each of these variables
		 * should be up-to-date: obuf opage oopaque ooffnum omaxoffnum
		 */

		/* check if we're at the end of the page */
		if (ooffnum > omaxoffnum)
		{
			/* at end of page, but check for overflow page */
			oblkno = oopaque->hasho_nextblkno;
			if (BlockNumberIsValid(oblkno))
			{

				/*
				 * we ran out of tuples on this particular page, but we
				 * have more overflow pages; re-init values.
				 */
				_hash_wrtbuf(rel, obuf);
				obuf = _hash_getbuf(rel, oblkno, HASH_WRITE);
				opage = BufferGetPage(obuf);
				_hash_checkpage(opage, LH_OVERFLOW_PAGE);
				oopaque = (HashPageOpaque) PageGetSpecialPointer(opage);

				/* we're guaranteed that an ovfl page has at least 1 tuple */
				if (PageIsEmpty(opage))
				{
					elog(ERROR, "_hash_splitpage: empty ovfl page %d!",
						 oblkno);
				}
				ooffnum = FirstOffsetNumber;
				omaxoffnum = PageGetMaxOffsetNumber(opage);
			}
			else
			{

				/*
				 * we're at the end of the bucket chain, so now we're
				 * really done with everything.  before quitting, call
				 * _hash_squeezebucket to ensure the tuples in the bucket
				 * (including the overflow pages) are packed as tightly as
				 * possible.
				 */
				_hash_wrtbuf(rel, obuf);
				_hash_wrtbuf(rel, nbuf);
				_hash_squeezebucket(rel, metap, obucket);
				return;
			}
		}

		/* hash on the tuple: recompute which bucket it now belongs in */
		hitem = (HashItem) PageGetItem(opage, PageGetItemId(opage, ooffnum));
		itup = &(hitem->hash_itup);
		itupdesc = RelationGetDescr(rel);
		datum = index_getattr(itup, 1, itupdesc, &null);
		bucket = _hash_call(rel, metap, datum);

		if (bucket == nbucket)
		{

			/*
			 * insert the tuple into the new bucket.  if it doesn't fit on
			 * the current page in the new bucket, we must allocate a new
			 * overflow page and place the tuple on that page instead.
			 */
			itemsz = IndexTupleDSize(hitem->hash_itup)
				+ (sizeof(HashItemData) - sizeof(IndexTupleData));

			itemsz = MAXALIGN(itemsz);

			if (PageGetFreeSpace(npage) < itemsz)
			{
				/* _hash_addovflpage may modify the meta page via metabuf */
				ovflbuf = _hash_addovflpage(rel, &metabuf, nbuf);
				_hash_wrtbuf(rel, nbuf);
				nbuf = ovflbuf;
				npage = BufferGetPage(nbuf);
				_hash_checkpage(npage, LH_BUCKET_PAGE | LH_OVERFLOW_PAGE);
			}

			/* append the tuple at the end of the new page */
			noffnum = OffsetNumberNext(PageGetMaxOffsetNumber(npage));
			PageAddItem(npage, (Item) hitem, itemsz, noffnum, LP_USED);
			_hash_wrtnorelbuf(rel, nbuf);

			/*
			 * now delete the tuple from the old bucket.  after this
			 * section of code, 'ooffnum' will actually point to the
			 * ItemId to which we would point if we had advanced it before
			 * the deletion (PageIndexTupleDelete repacks the ItemId
			 * array).      this also means that 'omaxoffnum' is exactly one
			 * less than it used to be, so we really can just decrement it
			 * instead of calling PageGetMaxOffsetNumber.
			 */
			PageIndexTupleDelete(opage, ooffnum);
			_hash_wrtnorelbuf(rel, obuf);
			omaxoffnum = OffsetNumberPrev(omaxoffnum);

			/*
			 * tidy up.  if the old page was an overflow page and it is
			 * now empty, we must free it (we want to preserve the
			 * invariant that overflow pages cannot be empty).
			 */
			if (PageIsEmpty(opage) &&
				(oopaque->hasho_flag & LH_OVERFLOW_PAGE))
			{
				obuf = _hash_freeovflpage(rel, obuf);

				/* check that we're not through the bucket chain */
				if (BufferIsInvalid(obuf))
				{
					/* old chain exhausted; pack the bucket and quit */
					_hash_wrtbuf(rel, nbuf);
					_hash_squeezebucket(rel, metap, obucket);
					return;
				}

				/*
				 * re-init. again, we're guaranteed that an ovfl page has
				 * at least one tuple.
				 */
				opage = BufferGetPage(obuf);
				_hash_checkpage(opage, LH_OVERFLOW_PAGE);
				oblkno = BufferGetBlockNumber(obuf);
				oopaque = (HashPageOpaque) PageGetSpecialPointer(opage);
				if (PageIsEmpty(opage))
				{
					elog(ERROR, "_hash_splitpage: empty overflow page %d",
						 oblkno);
				}
				ooffnum = FirstOffsetNumber;
				omaxoffnum = PageGetMaxOffsetNumber(opage);
			}
		}
		else
		{

			/*
			 * the tuple stays on this page.  we didn't move anything, so
			 * we didn't delete anything and therefore we don't have to
			 * change 'omaxoffnum'.
			 *
			 * XXX any hash value from [0, nbucket-1] will map to this
			 * bucket, which doesn't make sense to me.
			 */
			ooffnum = OffsetNumberNext(ooffnum);
		}
	}
	/* NOTREACHED */
}