1 /*-------------------------------------------------------------------------
2  *
3  * hashpage.c
4  *        Hash table page management code for the Postgres hash access method
5  *
6  * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *        $Header: /cvsroot/pgsql/src/backend/access/hash/hashpage.c,v 1.35 2002/03/06 20:49:42 momjian Exp $
12  *
13  * NOTES
14  *        Postgres hash pages look like ordinary relation pages.  The opaque
15  *        data at high addresses includes information about the page including
16  *        whether a page is an overflow page or a true bucket, the block
17  *        numbers of the preceding and following pages, and the overflow
18  *        address of the page if it is an overflow page.
19  *
20  *        The first page in a hash relation, page zero, is special -- it stores
21  *        information describing the hash table; it is referred to as the
22  *        "meta page." Pages one and higher store the actual data.
23  *
24  *-------------------------------------------------------------------------
25  */
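
/*
 * A rough sketch of the layout described above (standard page header and
 * line pointers at low addresses, index tuples in the middle, and the
 * hash opaque data in the special space at the high end of the page):
 *
 *	+-----------------+------------------------+
 *	| PageHeaderData  | line pointers -->      |
 *	+-----------------+------------------------+
 *	|             free space                   |
 *	+------------------------------------------+
 *	|          <-- index tuples                |
 *	+----------------------+-------------------+
 *	|                      | HashPageOpaqueData|
 *	+----------------------+-------------------+
 */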

#include "postgres.h"

#include "access/genam.h"
#include "access/hash.h"
#include "miscadmin.h"
#include "storage/lmgr.h"


static void _hash_setpagelock(Relation rel, BlockNumber blkno, int access);
static void _hash_unsetpagelock(Relation rel, BlockNumber blkno, int access);
static void _hash_splitpage(Relation rel, Buffer metabuf, Bucket obucket, Bucket nbucket);

/*
 *	We use high-concurrency locking on hash indices.  There are two cases in
 *	which we don't do locking.  One is when we're building the index.
 *	Since the creating transaction has not committed, no one can see
 *	the index, and there's no reason to share locks.  The second case
 *	is when we're just starting up the database system.  We use some
 *	special-purpose initialization code in the relation cache manager
 *	(see utils/cache/relcache.c) to allow us to do indexed scans on
 *	the system catalogs before we'd normally be able to.  This happens
 *	before the lock table is fully initialized, so we can't use it.
 *	Strictly speaking, this violates 2PL, but we don't do 2PL on the
 *	system catalogs anyway.
 *
 *	Note that our page locks are actual lockmanager locks, not buffer
 *	locks (as are used by btree, for example).  This is a good idea because
 *	the algorithms are not deadlock-free, and we'd better be able to detect
 *	and recover from deadlocks.
 *
 *	Another important difference from btree is that a hash indexscan
 *	retains both a lock and a buffer pin on the current index page
 *	between hashgettuple() calls (btree keeps only a buffer pin).
 *	Because of this, it's safe to do item deletions with only a regular
 *	write lock on a hash page --- there cannot be an indexscan stopped on
 *	the page being deleted, other than an indexscan of our own backend,
 *	which will be taken care of by _hash_adjscans.
 */
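
/*
 * For illustration (a sketch): a HASH_READ access on block 'blkno' becomes
 * LockPage(rel, blkno, ShareLock) through the regular lock manager (see
 * _hash_setpagelock below), where btree would instead take a LockBuffer()
 * content lock on the buffer itself.  Lock manager locks participate in
 * deadlock detection; buffer content locks do not.
 */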


#define USELOCKING		(!BuildingHash && !IsInitProcessingMode())


/*
 *	_hash_metapinit() -- Initialize the metadata page of a hash index,
 *				the two buckets that we begin with and the initial
 *				bitmap page.
 */
void
_hash_metapinit(Relation rel)
{
	HashMetaPage metap;
	HashPageOpaque pageopaque;
	Buffer		metabuf;
	Buffer		buf;
	Page		pg;
	int			nbuckets;
	uint32		nelem;			/* number of elements */
	uint32		lg2nelem;		/* _hash_log2(nelem) */
	uint16		i;

	/* can't be sharing this with anyone, now... */
	if (USELOCKING)
		LockRelation(rel, AccessExclusiveLock);

	if (RelationGetNumberOfBlocks(rel) != 0)
		elog(ERROR, "Cannot initialize non-empty hash table %s",
			 RelationGetRelationName(rel));

	metabuf = _hash_getbuf(rel, HASH_METAPAGE, HASH_WRITE);
	pg = BufferGetPage(metabuf);
	metap = (HashMetaPage) pg;
	_hash_pageinit(pg, BufferGetPageSize(metabuf));

	metap->hashm_magic = HASH_MAGIC;
	metap->hashm_version = HASH_VERSION;
	metap->hashm_nkeys = 0;
	metap->hashm_nmaps = 0;
	metap->hashm_ffactor = DEFAULT_FFACTOR;
	metap->hashm_bsize = BufferGetPageSize(metabuf);
	metap->hashm_bshift = _hash_log2(metap->hashm_bsize);
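
	/*
	 * Find the largest power of 2 that fits in a page once the page
	 * header and the hash opaque data are accounted for.  A worked
	 * sketch: with 8K pages, hashm_bshift is 13 and the loop below
	 * stops at i = 12, so hashm_bmsize comes out as 4096.
	 */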
	for (i = metap->hashm_bshift; i > 0; --i)
	{
		if ((1 << i) < (metap->hashm_bsize -
						(MAXALIGN(sizeof(PageHeaderData)) +
						 MAXALIGN(sizeof(HashPageOpaqueData)))))
			break;
	}
	Assert(i);
	metap->hashm_bmsize = 1 << i;
	metap->hashm_procid = index_getprocid(rel, 1, HASHPROC);

	/*
	 * Make nelem = 2 rather than 0 so that we end up allocating space for
	 * the next greater power of two number of buckets.
	 */
	nelem = 2;
	lg2nelem = 1;				/* _hash_log2(MAX(nelem, 2)) */
	nbuckets = 2;				/* 1 << lg2nelem */

	MemSet((char *) metap->hashm_spares, 0, sizeof(metap->hashm_spares));
	MemSet((char *) metap->hashm_mapp, 0, sizeof(metap->hashm_mapp));

	metap->hashm_spares[lg2nelem] = 2;		/* lg2nelem + 1 */
	metap->hashm_spares[lg2nelem + 1] = 2;	/* lg2nelem + 1 */
	metap->hashm_ovflpoint = 1; /* lg2nelem */
	metap->hashm_lastfreed = 2;

	metap->hashm_maxbucket = metap->hashm_lowmask = 1;	/* nbuckets - 1 */
	metap->hashm_highmask = 3;	/* (nbuckets << 1) - 1 */
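
	/*
	 * How these masks are used (a sketch of _hash_call in hashutil.c):
	 * bucket = hash & hashm_highmask, and if that exceeds hashm_maxbucket,
	 * bucket &= hashm_lowmask.  With the initial values above, any hash
	 * value therefore maps to bucket 0 or bucket 1.
	 */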

	pageopaque = (HashPageOpaque) PageGetSpecialPointer(pg);
	pageopaque->hasho_oaddr = InvalidOvflAddress;
	pageopaque->hasho_prevblkno = InvalidBlockNumber;
	pageopaque->hasho_nextblkno = InvalidBlockNumber;
	pageopaque->hasho_flag = LH_META_PAGE;
	pageopaque->hasho_bucket = -1;

	/*
	 * First bitmap page is at: splitpoint lg2nelem, page offset 1, which
	 * turns out to be page 3.  Couldn't initialize page 3 until we
	 * created the first two buckets above.
	 */
	if (_hash_initbitmap(rel, metap, OADDR_OF(lg2nelem, 1), lg2nelem + 1, 0))
		elog(ERROR, "Problem with _hash_initbitmap.");

	/* all done */
	_hash_wrtnorelbuf(metabuf);

	/*
	 * initialize the first two buckets
	 */
	for (i = 0; i <= 1; i++)
	{
		buf = _hash_getbuf(rel, BUCKET_TO_BLKNO(i), HASH_WRITE);
		pg = BufferGetPage(buf);
		_hash_pageinit(pg, BufferGetPageSize(buf));
		pageopaque = (HashPageOpaque) PageGetSpecialPointer(pg);
		pageopaque->hasho_oaddr = InvalidOvflAddress;
		pageopaque->hasho_prevblkno = InvalidBlockNumber;
		pageopaque->hasho_nextblkno = InvalidBlockNumber;
		pageopaque->hasho_flag = LH_BUCKET_PAGE;
		pageopaque->hasho_bucket = i;
		_hash_wrtbuf(rel, buf);
	}

	_hash_relbuf(rel, metabuf, HASH_WRITE);

	if (USELOCKING)
		UnlockRelation(rel, AccessExclusiveLock);
}

/*
 *	_hash_getbuf() -- Get a buffer by block number for read or write.
 *
 *		When this routine returns, the appropriate lock is set on the
 *		requested buffer and its reference count is correct.
 *
 *		XXX P_NEW is not used because, unlike the tree structures, we
 *		need the bucket blocks to be at certain block numbers.  We must
 *		depend on the caller to call _hash_pageinit on the block if it
 *		knows that this is a new block.
 */
Buffer
_hash_getbuf(Relation rel, BlockNumber blkno, int access)
{
	Buffer		buf;

	if (blkno == P_NEW)
		elog(ERROR, "_hash_getbuf: internal error: hash AM does not use P_NEW");
	switch (access)
	{
		case HASH_WRITE:
		case HASH_READ:
			_hash_setpagelock(rel, blkno, access);
			break;
		default:
			elog(ERROR, "_hash_getbuf: invalid access (%d) on new blk: %s",
				 access, RelationGetRelationName(rel));
			break;
	}
	buf = ReadBuffer(rel, blkno);

	/* ref count and lock type are correct */
	return buf;
}
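
/*
 * Typical usage (an illustrative sketch): callers pair _hash_getbuf with
 * _hash_relbuf (read) or _hash_wrtbuf (write) so the page lock taken here
 * is always released, e.g.
 *
 *		buf = _hash_getbuf(rel, blkno, HASH_READ);
 *		page = BufferGetPage(buf);
 *		... inspect the page ...
 *		_hash_relbuf(rel, buf, HASH_READ);
 */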

/*
 *	_hash_relbuf() -- release a locked buffer.
 */
void
_hash_relbuf(Relation rel, Buffer buf, int access)
{
	BlockNumber blkno;

	blkno = BufferGetBlockNumber(buf);

	switch (access)
	{
		case HASH_WRITE:
		case HASH_READ:
			_hash_unsetpagelock(rel, blkno, access);
			break;
		default:
			elog(ERROR, "_hash_relbuf: invalid access (%d) on blk %x: %s",
				 access, blkno, RelationGetRelationName(rel));
	}

	ReleaseBuffer(buf);
}

/*
 *	_hash_wrtbuf() -- write a hash page to disk.
 *
 *		This routine releases the lock held on the buffer and our reference
 *		to it.  It is an error to call _hash_wrtbuf() without a write lock
 *		or a reference to the buffer.
 */
void
_hash_wrtbuf(Relation rel, Buffer buf)
{
	BlockNumber blkno;

	blkno = BufferGetBlockNumber(buf);
	WriteBuffer(buf);
	_hash_unsetpagelock(rel, blkno, HASH_WRITE);
}

/*
 *	_hash_wrtnorelbuf() -- write a hash page to disk, but do not release
 *			our reference or lock.
 *
 *		It is an error to call _hash_wrtnorelbuf() without a write lock
 *		or a reference to the buffer.
 */
void
_hash_wrtnorelbuf(Buffer buf)
{
	BlockNumber blkno;

	blkno = BufferGetBlockNumber(buf);
	WriteNoReleaseBuffer(buf);
}

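/*
 *	_hash_chgbufaccess() -- Exchange the lock held on a buffer for a
 *			lock of a different strength.
 *
 *		Releases the buffer under 'from_access' (writing it out first if
 *		the old access was HASH_WRITE), then re-reads and re-locks the
 *		same block with 'to_access'.  Note the page is unlocked in
 *		between, so its contents may change across the call.
 */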
Page
_hash_chgbufaccess(Relation rel,
				   Buffer *bufp,
				   int from_access,
				   int to_access)
{
	BlockNumber blkno;

	blkno = BufferGetBlockNumber(*bufp);

	switch (from_access)
	{
		case HASH_WRITE:
			_hash_wrtbuf(rel, *bufp);
			break;
		case HASH_READ:
			_hash_relbuf(rel, *bufp, from_access);
			break;
		default:
			elog(ERROR, "_hash_chgbufaccess: invalid access (%d) on blk %x: %s",
				 from_access, blkno, RelationGetRelationName(rel));
			break;
	}
	*bufp = _hash_getbuf(rel, blkno, to_access);
	return BufferGetPage(*bufp);
}

/*
 *	_hash_pageinit() -- Initialize a new page.
 */
void
_hash_pageinit(Page page, Size size)
{
	Assert(PageIsNew(page));
	PageInit(page, size, sizeof(HashPageOpaqueData));
}

static void
_hash_setpagelock(Relation rel,
				  BlockNumber blkno,
				  int access)
{
	if (USELOCKING)
	{
		switch (access)
		{
			case HASH_WRITE:
				LockPage(rel, blkno, ExclusiveLock);
				break;
			case HASH_READ:
				LockPage(rel, blkno, ShareLock);
				break;
			default:
				elog(ERROR, "_hash_setpagelock: invalid access (%d) on blk %x: %s",
					 access, blkno, RelationGetRelationName(rel));
				break;
		}
	}
}

static void
_hash_unsetpagelock(Relation rel,
					BlockNumber blkno,
					int access)
{
	if (USELOCKING)
	{
		switch (access)
		{
			case HASH_WRITE:
				UnlockPage(rel, blkno, ExclusiveLock);
				break;
			case HASH_READ:
				UnlockPage(rel, blkno, ShareLock);
				break;
			default:
				elog(ERROR, "_hash_unsetpagelock: invalid access (%d) on blk %x: %s",
					 access, blkno, RelationGetRelationName(rel));
				break;
		}
	}
}

/*
 * Delete a hash index item.
 *
 * It is safe to delete an item after acquiring a regular WRITE lock on
 * the page, because no other backend can hold a READ lock on the page,
 * and that means no other backend currently has an indexscan stopped on
 * any item of the page being deleted.  Our own backend might have such
 * an indexscan (in fact *will*, since that's how VACUUM found the item
 * in the first place), but _hash_adjscans will fix the scan position.
 */
void
_hash_pagedel(Relation rel, ItemPointer tid)
{
	Buffer		buf;
	Buffer		metabuf;
	Page		page;
	BlockNumber blkno;
	OffsetNumber offno;
	HashMetaPage metap;
	HashPageOpaque opaque;

	blkno = ItemPointerGetBlockNumber(tid);
	offno = ItemPointerGetOffsetNumber(tid);

	buf = _hash_getbuf(rel, blkno, HASH_WRITE);
	page = BufferGetPage(buf);
	_hash_checkpage(page, LH_BUCKET_PAGE | LH_OVERFLOW_PAGE);
	opaque = (HashPageOpaque) PageGetSpecialPointer(page);

	PageIndexTupleDelete(page, offno);
	_hash_wrtnorelbuf(buf);

	if (PageIsEmpty(page) && (opaque->hasho_flag & LH_OVERFLOW_PAGE))
	{
		buf = _hash_freeovflpage(rel, buf);
		if (BufferIsValid(buf))
			_hash_relbuf(rel, buf, HASH_WRITE);
	}
	else
		_hash_relbuf(rel, buf, HASH_WRITE);

	metabuf = _hash_getbuf(rel, HASH_METAPAGE, HASH_WRITE);
	metap = (HashMetaPage) BufferGetPage(metabuf);
	_hash_checkpage((Page) metap, LH_META_PAGE);
	metap->hashm_nkeys--;
	_hash_wrtbuf(rel, metabuf);
}

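/*
 *	_hash_expandtable() -- Add a new bucket to the index and split the
 *			matching old bucket's tuples into it.
 *
 *		Descriptive note: 'metabuf' is expected to be read-locked on
 *		entry; the lock is upgraded and downgraded around each metapage
 *		update, and the actual tuple relocation is done by
 *		_hash_splitpage.
 */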
void
_hash_expandtable(Relation rel, Buffer metabuf)
{
	HashMetaPage metap;
	Bucket		old_bucket;
	Bucket		new_bucket;
	uint32		spare_ndx;

/*	  elog(DEBUG, "_hash_expandtable: expanding..."); */

	metap = (HashMetaPage) BufferGetPage(metabuf);
	_hash_checkpage((Page) metap, LH_META_PAGE);

	metap = (HashMetaPage) _hash_chgbufaccess(rel, &metabuf, HASH_READ, HASH_WRITE);
	new_bucket = ++metap->hashm_maxbucket;
	metap = (HashMetaPage) _hash_chgbufaccess(rel, &metabuf, HASH_WRITE, HASH_READ);
	old_bucket = (metap->hashm_maxbucket & metap->hashm_lowmask);

	/*
	 * If the split point is increasing (hashm_maxbucket's log base 2
	 * increases), we need to copy the current contents of the spare split
	 * bucket to the next bucket.
	 */
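	/*
	 * Worked example (a sketch): when hashm_maxbucket advances from 3 to
	 * 4, _hash_log2(5) = 3 exceeds the old hashm_ovflpoint of 2, so
	 * hashm_spares[3] is seeded from hashm_spares[2] and the overflow
	 * point moves up.
	 */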
	spare_ndx = _hash_log2(metap->hashm_maxbucket + 1);
	if (spare_ndx > metap->hashm_ovflpoint)
	{
		metap = (HashMetaPage) _hash_chgbufaccess(rel, &metabuf, HASH_READ, HASH_WRITE);
		metap->hashm_spares[spare_ndx] = metap->hashm_spares[metap->hashm_ovflpoint];
		metap->hashm_ovflpoint = spare_ndx;
		metap = (HashMetaPage) _hash_chgbufaccess(rel, &metabuf, HASH_WRITE, HASH_READ);
	}

	if (new_bucket > metap->hashm_highmask)
	{
		/* Starting a new doubling */
		metap = (HashMetaPage) _hash_chgbufaccess(rel, &metabuf, HASH_READ, HASH_WRITE);
		metap->hashm_lowmask = metap->hashm_highmask;
		metap->hashm_highmask = new_bucket | metap->hashm_lowmask;
		metap = (HashMetaPage) _hash_chgbufaccess(rel, &metabuf, HASH_WRITE, HASH_READ);
	}

	/* Relocate records to the new bucket */
	_hash_splitpage(rel, metabuf, old_bucket, new_bucket);
}


/*
 * _hash_splitpage -- split 'obucket' into 'obucket' and 'nbucket'
 *
 * this routine is actually misnamed -- we are splitting a bucket that
 * consists of a base bucket page and zero or more overflow (bucket
 * chain) pages.
 */
static void
_hash_splitpage(Relation rel,
				Buffer metabuf,
				Bucket obucket,
				Bucket nbucket)
{
	Bucket		bucket;
	Buffer		obuf;
	Buffer		nbuf;
	Buffer		ovflbuf;
	BlockNumber oblkno;
	BlockNumber nblkno;
	bool		null;
	Datum		datum;
	HashItem	hitem;
	HashPageOpaque oopaque;
	HashPageOpaque nopaque;
	HashMetaPage metap;
	IndexTuple	itup;
	Size		itemsz;
	OffsetNumber ooffnum;
	OffsetNumber noffnum;
	OffsetNumber omaxoffnum;
	Page		opage;
	Page		npage;
	TupleDesc	itupdesc;

/*	  elog(DEBUG, "_hash_splitpage: splitting %d into %d,%d",
		 obucket, obucket, nbucket);
*/
	metap = (HashMetaPage) BufferGetPage(metabuf);
	_hash_checkpage((Page) metap, LH_META_PAGE);

	/* get the buffers & pages */
	oblkno = BUCKET_TO_BLKNO(obucket);
	nblkno = BUCKET_TO_BLKNO(nbucket);
	obuf = _hash_getbuf(rel, oblkno, HASH_WRITE);
	nbuf = _hash_getbuf(rel, nblkno, HASH_WRITE);
	opage = BufferGetPage(obuf);
	npage = BufferGetPage(nbuf);

	/* initialize the new bucket */
	_hash_pageinit(npage, BufferGetPageSize(nbuf));
	nopaque = (HashPageOpaque) PageGetSpecialPointer(npage);
	nopaque->hasho_prevblkno = InvalidBlockNumber;
	nopaque->hasho_nextblkno = InvalidBlockNumber;
	nopaque->hasho_flag = LH_BUCKET_PAGE;
	nopaque->hasho_oaddr = InvalidOvflAddress;
	nopaque->hasho_bucket = nbucket;
	_hash_wrtnorelbuf(nbuf);

	/*
	 * make sure the old bucket isn't empty.  advance 'opage' and friends
	 * through the overflow bucket chain until we find a non-empty page.
	 *
	 * XXX we should only need this once, if we are careful to preserve the
	 * invariant that overflow pages are never empty.
	 */
	_hash_checkpage(opage, LH_BUCKET_PAGE);
	oopaque = (HashPageOpaque) PageGetSpecialPointer(opage);
	if (PageIsEmpty(opage))
	{
		oblkno = oopaque->hasho_nextblkno;
		_hash_relbuf(rel, obuf, HASH_WRITE);
		if (!BlockNumberIsValid(oblkno))
		{
			/*
			 * the old bucket is completely empty; of course, the new
			 * bucket will be as well, but since it's a base bucket page
			 * we don't care.
			 */
			_hash_relbuf(rel, nbuf, HASH_WRITE);
			return;
		}
		obuf = _hash_getbuf(rel, oblkno, HASH_WRITE);
		opage = BufferGetPage(obuf);
		_hash_checkpage(opage, LH_OVERFLOW_PAGE);
		if (PageIsEmpty(opage))
			elog(ERROR, "_hash_splitpage: empty overflow page %d", oblkno);
		oopaque = (HashPageOpaque) PageGetSpecialPointer(opage);
	}

	/*
	 * we are now guaranteed that 'opage' is not empty.  partition the
	 * tuples in the old bucket between the old bucket and the new bucket,
	 * advancing along their respective overflow bucket chains and adding
	 * overflow pages as needed.
	 */
	ooffnum = FirstOffsetNumber;
	omaxoffnum = PageGetMaxOffsetNumber(opage);
	for (;;)
	{
		/*
		 * at each iteration through this loop, each of these variables
		 * should be up-to-date: obuf opage oopaque ooffnum omaxoffnum
		 */

		/* check if we're at the end of the page */
		if (ooffnum > omaxoffnum)
		{
			/* at end of page, but check for overflow page */
			oblkno = oopaque->hasho_nextblkno;
			if (BlockNumberIsValid(oblkno))
			{
				/*
				 * we ran out of tuples on this particular page, but we
				 * have more overflow pages; re-init values.
				 */
				_hash_wrtbuf(rel, obuf);
				obuf = _hash_getbuf(rel, oblkno, HASH_WRITE);
				opage = BufferGetPage(obuf);
				_hash_checkpage(opage, LH_OVERFLOW_PAGE);
				oopaque = (HashPageOpaque) PageGetSpecialPointer(opage);

				/* we're guaranteed that an ovfl page has at least 1 tuple */
				if (PageIsEmpty(opage))
				{
					elog(ERROR, "_hash_splitpage: empty ovfl page %d!",
						 oblkno);
				}
				ooffnum = FirstOffsetNumber;
				omaxoffnum = PageGetMaxOffsetNumber(opage);
			}
			else
			{
				/*
				 * we're at the end of the bucket chain, so now we're
				 * really done with everything.  before quitting, call
				 * _hash_squeezebucket to ensure the tuples in the bucket
				 * (including the overflow pages) are packed as tightly as
				 * possible.
				 */
				_hash_wrtbuf(rel, obuf);
				_hash_wrtbuf(rel, nbuf);
				_hash_squeezebucket(rel, metap, obucket);
				return;
			}
		}

		/* hash on the tuple */
		hitem = (HashItem) PageGetItem(opage, PageGetItemId(opage, ooffnum));
		itup = &(hitem->hash_itup);
		itupdesc = RelationGetDescr(rel);
		datum = index_getattr(itup, 1, itupdesc, &null);
		bucket = _hash_call(rel, metap, datum);
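		/*
		 * A sketch of what _hash_call computes (see hashutil.c): the hash
		 * function's result is masked with hashm_highmask, and if the
		 * result exceeds hashm_maxbucket it is masked again with
		 * hashm_lowmask.  Every tuple in the old bucket therefore
		 * rehashes to either obucket or nbucket.
		 */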

		if (bucket == nbucket)
		{
			/*
			 * insert the tuple into the new bucket.  if it doesn't fit on
			 * the current page in the new bucket, we must allocate a new
			 * overflow page and place the tuple on that page instead.
			 */
			itemsz = IndexTupleDSize(hitem->hash_itup)
				+ (sizeof(HashItemData) - sizeof(IndexTupleData));

			itemsz = MAXALIGN(itemsz);

			if (PageGetFreeSpace(npage) < itemsz)
			{
				ovflbuf = _hash_addovflpage(rel, &metabuf, nbuf);
				_hash_wrtbuf(rel, nbuf);
				nbuf = ovflbuf;
				npage = BufferGetPage(nbuf);
				_hash_checkpage(npage, LH_BUCKET_PAGE | LH_OVERFLOW_PAGE);
			}

			noffnum = OffsetNumberNext(PageGetMaxOffsetNumber(npage));
			if (PageAddItem(npage, (Item) hitem, itemsz, noffnum, LP_USED)
				== InvalidOffsetNumber)
				elog(ERROR, "_hash_splitpage: failed to add index item to %s",
					 RelationGetRelationName(rel));
			_hash_wrtnorelbuf(nbuf);

			/*
			 * now delete the tuple from the old bucket.  after this
			 * section of code, 'ooffnum' will actually point to the
			 * ItemId to which we would point if we had advanced it before
			 * the deletion (PageIndexTupleDelete repacks the ItemId
			 * array).  this also means that 'omaxoffnum' is exactly one
			 * less than it used to be, so we really can just decrement it
			 * instead of calling PageGetMaxOffsetNumber.
			 */
			PageIndexTupleDelete(opage, ooffnum);
			_hash_wrtnorelbuf(obuf);
			omaxoffnum = OffsetNumberPrev(omaxoffnum);

			/*
			 * tidy up.  if the old page was an overflow page and it is
			 * now empty, we must free it (we want to preserve the
			 * invariant that overflow pages cannot be empty).
			 */
			if (PageIsEmpty(opage) &&
				(oopaque->hasho_flag & LH_OVERFLOW_PAGE))
			{
				obuf = _hash_freeovflpage(rel, obuf);

				/* check that we're not through the bucket chain */
				if (BufferIsInvalid(obuf))
				{
					_hash_wrtbuf(rel, nbuf);
					_hash_squeezebucket(rel, metap, obucket);
					return;
				}

				/*
				 * re-init. again, we're guaranteed that an ovfl page has
				 * at least one tuple.
				 */
				opage = BufferGetPage(obuf);
				_hash_checkpage(opage, LH_OVERFLOW_PAGE);
				oblkno = BufferGetBlockNumber(obuf);
				oopaque = (HashPageOpaque) PageGetSpecialPointer(opage);
				if (PageIsEmpty(opage))
				{
					elog(ERROR, "_hash_splitpage: empty overflow page %d",
						 oblkno);
				}
				ooffnum = FirstOffsetNumber;
				omaxoffnum = PageGetMaxOffsetNumber(opage);
			}
		}
		else
		{
			/*
			 * the tuple stays on this page.  we didn't move anything, so
			 * we didn't delete anything and therefore we don't have to
			 * change 'omaxoffnum'.
			 *
			 * XXX any hash value from [0, nbucket-1] will map to this
			 * bucket, which doesn't make sense to me.
			 */
			ooffnum = OffsetNumberNext(ooffnum);
		}
	}
	/* NOTREACHED */
}