]> granicus.if.org Git - postgresql/commitdiff
Fix two serious bugs introduced into hash indexes by the 8.4 patch that made
authorTom Lane <tgl@sss.pgh.pa.us>
Sun, 1 Nov 2009 21:25:25 +0000 (21:25 +0000)
committerTom Lane <tgl@sss.pgh.pa.us>
Sun, 1 Nov 2009 21:25:25 +0000 (21:25 +0000)
hash indexes keep entries sorted by hash value.  First, the original plans for
concurrency assumed that insertions would happen only at the end of a page,
which is no longer true; this could cause scans to transiently fail to find
index entries in the presence of concurrent insertions.  We can compensate
by teaching scans to re-find their position after re-acquiring read locks.
Second, neither the bucket split nor the bucket compaction logic had been
fixed to preserve hashvalue ordering, so application of either of those
processes could lead to permanent corruption of an index, in the sense
that searches might fail to find entries that are present.

This patch fixes the split and compaction logic to preserve hashvalue
ordering, but it cannot do anything about pre-existing corruption.  We will
need to recommend reindexing all hash indexes in the 8.4.2 release notes.

To buy back the performance loss hereby induced in split and compaction,
fix them to use PageIndexMultiDelete instead of retail PageIndexDelete
operations.  We might later want to do something with qsort'ing the
page contents rather than doing a binary search for each insertion,
but that seemed more invasive than I cared to risk in a back-patch.

Per bug #5157 from Jeff Janes and subsequent investigation.

src/backend/access/hash/README
src/backend/access/hash/hash.c
src/backend/access/hash/hashinsert.c
src/backend/access/hash/hashovfl.c
src/backend/access/hash/hashpage.c
src/include/access/hash.h

index ca23920648038ff40c30df1f74ab74c9c6d03542..026ad40bfbae1cebff1f7b0168e5c189eaf0c4f3 100644 (file)
@@ -1,14 +1,15 @@
-$PostgreSQL: pgsql/src/backend/access/hash/README,v 1.8 2008/03/20 17:55:14 momjian Exp $
+$PostgreSQL: pgsql/src/backend/access/hash/README,v 1.9 2009/11/01 21:25:25 tgl Exp $
 
 Hash Indexing
 =============
 
-This directory contains an implementation of hash indexing for Postgres.  Most
-of the core ideas are taken from Margo Seltzer and Ozan Yigit, A New Hashing
-Package for UNIX, Proceedings of the Winter USENIX Conference, January 1991.
-(Our in-memory hashtable implementation, src/backend/utils/hash/dynahash.c,
-also relies on some of the same concepts; it is derived from code written by
-Esmond Pitt and later improved by Margo among others.)
+This directory contains an implementation of hash indexing for Postgres.
+Most of the core ideas are taken from Margo Seltzer and Ozan Yigit,
+A New Hashing Package for UNIX, Proceedings of the Winter USENIX Conference,
+January 1991.  (Our in-memory hashtable implementation,
+src/backend/utils/hash/dynahash.c, also relies on some of the same concepts;
+it is derived from code written by Esmond Pitt and later improved by Margo
+among others.)
 
 A hash index consists of two or more "buckets", into which tuples are
 placed whenever their hash key maps to the bucket number.  The
@@ -32,6 +33,14 @@ rebuilding it with REINDEX.  Overflow pages can be recycled for reuse
 in other buckets, but we never give them back to the operating system.
 There is no provision for reducing the number of buckets, either.
 
+As of PostgreSQL 8.4, hash index entries store only the hash code, not the
+actual data value, for each indexed item.  This makes the index entries
+smaller (perhaps very substantially so) and speeds up various operations.
+In particular, we can speed searches by keeping the index entries in any
+one index page sorted by hash code, thus allowing binary search to be used
+within an index page.  Note however that there is *no* assumption about the
+relative ordering of hash codes across different index pages of a bucket.
+
 
 Page Addressing
 ---------------
@@ -205,8 +214,18 @@ the reader ensures that the target bucket calculation is valid (otherwise
 the bucket might be split before the reader arrives at it, and the target
 entries might go into the new bucket).  Holding the bucket sharelock for
 the remainder of the scan prevents the reader's current-tuple pointer from
-being invalidated by other processes.  Notice though that the reader need
-not prevent other buckets from being split or compacted.
+being invalidated by splits or compactions.  Notice that the reader's lock
+does not prevent other buckets from being split or compacted.
+
+To keep concurrency reasonably good, we require readers to cope with
+concurrent insertions, which means that they have to be able to re-find
+their current scan position after re-acquiring the page sharelock.  Since
+deletion is not possible while a reader holds the bucket sharelock, and
+we assume that heap tuple TIDs are unique, this can be implemented by
+searching for the same heap tuple TID previously returned.  Insertion does
+not move index entries across pages, so the previously-returned index entry
+should always be on the same page, at the same or higher offset number,
+as it was before.
 
 The insertion algorithm is rather similar:
 
@@ -220,7 +239,7 @@ The insertion algorithm is rather similar:
        read/exclusive-lock current page of bucket
        if full, release, read/exclusive-lock next page; repeat as needed
        >> see below if no space in any page of bucket
-       insert tuple
+       insert tuple at appropriate place in page
        write/release current page
        release bucket share-lock
        read/exclusive-lock meta page
@@ -228,11 +247,12 @@ The insertion algorithm is rather similar:
        write/release meta page
        done if no split needed, else enter Split algorithm below
 
-It is okay for an insertion to take place in a bucket that is being
-actively scanned, because it does not change the position of any existing
-item in the bucket, so scan states are not invalidated.  We only need the
-short-term buffer locks to ensure that readers do not see a
-partially-updated page.
+To speed searches, the index entries within any individual index page are
+kept sorted by hash code; the insertion code must take care to insert new
+entries in the right place.  It is okay for an insertion to take place in a
+bucket that is being actively scanned, because readers can cope with this
+as explained above.  We only need the short-term buffer locks to ensure
+that readers do not see a partially-updated page.
 
 It is clearly impossible for readers and inserters to deadlock, and in
 fact this algorithm allows them a very high degree of concurrency.
index 3505683836457e9ab470b454a816e977bd2cefdd..25e601518a0cdc1a4caf746f0774f1b93403f3db 100644 (file)
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *       $PostgreSQL: pgsql/src/backend/access/hash/hash.c,v 1.113 2009/07/29 20:56:18 tgl Exp $
+ *       $PostgreSQL: pgsql/src/backend/access/hash/hash.c,v 1.114 2009/11/01 21:25:25 tgl Exp $
  *
  * NOTES
  *       This file contains only the public interface routines.
@@ -206,8 +206,10 @@ hashgettuple(PG_FUNCTION_ARGS)
        ScanDirection dir = (ScanDirection) PG_GETARG_INT32(1);
        HashScanOpaque so = (HashScanOpaque) scan->opaque;
        Relation        rel = scan->indexRelation;
+       Buffer          buf;
        Page            page;
        OffsetNumber offnum;
+       ItemPointer current;
        bool            res;
 
        /* Hash indexes are always lossy since we store only the hash code */
@@ -225,8 +227,38 @@ hashgettuple(PG_FUNCTION_ARGS)
         * appropriate direction.  If we haven't done so yet, we call a routine to
         * get the first item in the scan.
         */
-       if (ItemPointerIsValid(&(so->hashso_curpos)))
+       current = &(so->hashso_curpos);
+       if (ItemPointerIsValid(current))
        {
+               /*
+                * An insertion into the current index page could have happened while
+                * we didn't have read lock on it.  Re-find our position by looking
+                * for the TID we previously returned.  (Because we hold share lock on
+                * the bucket, no deletions or splits could have occurred; therefore
+                * we can expect that the TID still exists in the current index page,
+                * at an offset >= where we were.)
+                */
+               OffsetNumber maxoffnum;
+
+               buf = so->hashso_curbuf;
+               Assert(BufferIsValid(buf));
+               page = BufferGetPage(buf);
+               maxoffnum = PageGetMaxOffsetNumber(page);
+               for (offnum = ItemPointerGetOffsetNumber(current);
+                        offnum <= maxoffnum;
+                        offnum = OffsetNumberNext(offnum))
+               {
+                       IndexTuple      itup;
+
+                       itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, offnum));
+                       if (ItemPointerEquals(&scan->xs_ctup.t_self, &itup->t_tid))
+                               break;
+               }
+               if (offnum > maxoffnum)
+                       elog(ERROR, "failed to re-find scan position within index \"%s\"",
+                                RelationGetRelationName(rel));
+               ItemPointerSetOffsetNumber(current, offnum);
+
                /*
                 * Check to see if we should kill the previously-fetched tuple.
                 */
@@ -235,8 +267,6 @@ hashgettuple(PG_FUNCTION_ARGS)
                        /*
                         * Yes, so mark it by setting the LP_DEAD state in the item flags.
                         */
-                       offnum = ItemPointerGetOffsetNumber(&(so->hashso_curpos));
-                       page = BufferGetPage(so->hashso_curbuf);
                        ItemIdMarkDead(PageGetItemId(page, offnum));
 
                        /*
@@ -244,7 +274,7 @@ hashgettuple(PG_FUNCTION_ARGS)
                         * as a commit-hint-bit status update for heap tuples: we mark the
                         * buffer dirty but don't make a WAL log entry.
                         */
-                       SetBufferCommitInfoNeedsSave(so->hashso_curbuf);
+                       SetBufferCommitInfoNeedsSave(buf);
                }
 
                /*
@@ -262,7 +292,7 @@ hashgettuple(PG_FUNCTION_ARGS)
        {
                while (res)
                {
-                       offnum = ItemPointerGetOffsetNumber(&(so->hashso_curpos));
+                       offnum = ItemPointerGetOffsetNumber(current);
                        page = BufferGetPage(so->hashso_curbuf);
                        if (!ItemIdIsDead(PageGetItemId(page, offnum)))
                                break;
@@ -517,7 +547,8 @@ loop_top:
                        HashPageOpaque opaque;
                        OffsetNumber offno;
                        OffsetNumber maxoffno;
-                       bool            page_dirty = false;
+                       OffsetNumber deletable[MaxOffsetNumber];
+                       int                     ndeletable = 0;
 
                        vacuum_delay_point();
 
@@ -529,9 +560,10 @@ loop_top:
                        Assert(opaque->hasho_bucket == cur_bucket);
 
                        /* Scan each tuple in page */
-                       offno = FirstOffsetNumber;
                        maxoffno = PageGetMaxOffsetNumber(page);
-                       while (offno <= maxoffno)
+                       for (offno = FirstOffsetNumber;
+                                offno <= maxoffno;
+                                offno = OffsetNumberNext(offno))
                        {
                                IndexTuple      itup;
                                ItemPointer htup;
@@ -541,30 +573,25 @@ loop_top:
                                htup = &(itup->t_tid);
                                if (callback(htup, callback_state))
                                {
-                                       /* delete the item from the page */
-                                       PageIndexTupleDelete(page, offno);
-                                       bucket_dirty = page_dirty = true;
-
-                                       /* don't increment offno, instead decrement maxoffno */
-                                       maxoffno = OffsetNumberPrev(maxoffno);
-
+                                       /* mark the item for deletion */
+                                       deletable[ndeletable++] = offno;
                                        tuples_removed += 1;
                                }
                                else
-                               {
-                                       offno = OffsetNumberNext(offno);
-
                                        num_index_tuples += 1;
-                               }
                        }
 
                        /*
-                        * Write page if needed, advance to next page.
+                        * Apply deletions and write page if needed, advance to next page.
                         */
                        blkno = opaque->hasho_nextblkno;
 
-                       if (page_dirty)
+                       if (ndeletable > 0)
+                       {
+                               PageIndexMultiDelete(page, deletable, ndeletable);
                                _hash_wrtbuf(rel, buf);
+                               bucket_dirty = true;
+                       }
                        else
                                _hash_relbuf(rel, buf);
                }
index a03087b76b74adce02992d471bf84d881ede5261..4cc419759d4522266671ec4a790ea6f45f702b76 100644 (file)
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *       $PostgreSQL: pgsql/src/backend/access/hash/hashinsert.c,v 1.52 2009/01/01 17:23:35 momjian Exp $
+ *       $PostgreSQL: pgsql/src/backend/access/hash/hashinsert.c,v 1.53 2009/11/01 21:25:25 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
 #include "utils/rel.h"
 
 
-static OffsetNumber _hash_pgaddtup(Relation rel, Buffer buf,
-                          Size itemsize, IndexTuple itup);
-
-
 /*
  *     _hash_doinsert() -- Handle insertion of a single index tuple.
  *
@@ -180,15 +176,16 @@ _hash_doinsert(Relation rel, IndexTuple itup)
 /*
  *     _hash_pgaddtup() -- add a tuple to a particular page in the index.
  *
- *             This routine adds the tuple to the page as requested; it does
- *             not write out the page.  It is an error to call pgaddtup() without
- *             a write lock and pin.
+ * This routine adds the tuple to the page as requested; it does not write out
+ * the page.  It is an error to call pgaddtup() without pin and write lock on
+ * the target buffer.
+ *
+ * Returns the offset number at which the tuple was inserted.  This function
+ * is responsible for preserving the condition that tuples in a hash index
+ * page are sorted by hashkey value.
  */
-static OffsetNumber
-_hash_pgaddtup(Relation rel,
-                          Buffer buf,
-                          Size itemsize,
-                          IndexTuple itup)
+OffsetNumber
+_hash_pgaddtup(Relation rel, Buffer buf, Size itemsize, IndexTuple itup)
 {
        OffsetNumber itup_off;
        Page            page;
index 3ab5e1edeb6253590a2d4011dfd92455518087fe..7b9b585eefa0e0fe89e6c337a653f073a1860766 100644 (file)
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *       $PostgreSQL: pgsql/src/backend/access/hash/hashovfl.c,v 1.66 2009/01/01 17:23:35 momjian Exp $
+ *       $PostgreSQL: pgsql/src/backend/access/hash/hashovfl.c,v 1.67 2009/11/01 21:25:25 tgl Exp $
  *
  * NOTES
  *       Overflow pages look like ordinary relation pages.
@@ -582,18 +582,15 @@ _hash_squeezebucket(Relation rel,
                                        BlockNumber bucket_blkno,
                                        BufferAccessStrategy bstrategy)
 {
-       Buffer          wbuf;
-       Buffer          rbuf = 0;
        BlockNumber wblkno;
        BlockNumber rblkno;
+       Buffer          wbuf;
+       Buffer          rbuf;
        Page            wpage;
        Page            rpage;
        HashPageOpaque wopaque;
        HashPageOpaque ropaque;
-       OffsetNumber woffnum;
-       OffsetNumber roffnum;
-       IndexTuple      itup;
-       Size            itemsz;
+       bool            wbuf_dirty;
 
        /*
         * start squeezing into the base bucket page.
@@ -622,11 +619,12 @@ _hash_squeezebucket(Relation rel,
         * usually smaller than the buffer ring being used by VACUUM, else using
         * the access strategy here would be counterproductive.
         */
+       rbuf = InvalidBuffer;
        ropaque = wopaque;
        do
        {
                rblkno = ropaque->hasho_nextblkno;
-               if (ropaque != wopaque)
+               if (rbuf != InvalidBuffer)
                        _hash_relbuf(rel, rbuf);
                rbuf = _hash_getbuf_with_strategy(rel,
                                                                                  rblkno,
@@ -641,12 +639,23 @@ _hash_squeezebucket(Relation rel,
        /*
         * squeeze the tuples.
         */
-       roffnum = FirstOffsetNumber;
+       wbuf_dirty = false;
        for (;;)
        {
-               /* this test is needed in case page is empty on entry */
-               if (roffnum <= PageGetMaxOffsetNumber(rpage))
+               OffsetNumber roffnum;
+               OffsetNumber maxroffnum;
+               OffsetNumber deletable[MaxOffsetNumber];
+               int                     ndeletable = 0;
+
+               /* Scan each tuple in "read" page */
+               maxroffnum = PageGetMaxOffsetNumber(rpage);
+               for (roffnum = FirstOffsetNumber;
+                        roffnum <= maxroffnum;
+                        roffnum = OffsetNumberNext(roffnum))
                {
+                       IndexTuple      itup;
+                       Size            itemsz;
+
                        itup = (IndexTuple) PageGetItem(rpage,
                                                                                        PageGetItemId(rpage, roffnum));
                        itemsz = IndexTupleDSize(*itup);
@@ -663,12 +672,22 @@ _hash_squeezebucket(Relation rel,
                                wblkno = wopaque->hasho_nextblkno;
                                Assert(BlockNumberIsValid(wblkno));
 
-                               _hash_wrtbuf(rel, wbuf);
+                               if (wbuf_dirty)
+                                       _hash_wrtbuf(rel, wbuf);
+                               else
+                                       _hash_relbuf(rel, wbuf);
 
+                               /* nothing more to do if we reached the read page */
                                if (rblkno == wblkno)
                                {
-                                       /* wbuf is already released */
-                                       _hash_wrtbuf(rel, rbuf);
+                                       if (ndeletable > 0)
+                                       {
+                                               /* Delete tuples we already moved off read page */
+                                               PageIndexMultiDelete(rpage, deletable, ndeletable);
+                                               _hash_wrtbuf(rel, rbuf);
+                                       }
+                                       else
+                                               _hash_relbuf(rel, rbuf);
                                        return;
                                }
 
@@ -680,28 +699,27 @@ _hash_squeezebucket(Relation rel,
                                wpage = BufferGetPage(wbuf);
                                wopaque = (HashPageOpaque) PageGetSpecialPointer(wpage);
                                Assert(wopaque->hasho_bucket == bucket);
+                               wbuf_dirty = false;
                        }
 
                        /*
-                        * we have found room so insert on the "write" page.
+                        * we have found room so insert on the "write" page, being careful
+                        * to preserve hashkey ordering.  (If we insert many tuples into
+                        * the same "write" page it would be worth qsort'ing instead of
+                        * doing repeated _hash_pgaddtup.)
                         */
-                       woffnum = OffsetNumberNext(PageGetMaxOffsetNumber(wpage));
-                       if (PageAddItem(wpage, (Item) itup, itemsz, woffnum, false, false)
-                               == InvalidOffsetNumber)
-                               elog(ERROR, "failed to add index item to \"%s\"",
-                                        RelationGetRelationName(rel));
+                       (void) _hash_pgaddtup(rel, wbuf, itemsz, itup);
+                       wbuf_dirty = true;
 
-                       /*
-                        * delete the tuple from the "read" page. PageIndexTupleDelete
-                        * repacks the ItemId array, so 'roffnum' will be "advanced" to
-                        * the "next" ItemId.
-                        */
-                       PageIndexTupleDelete(rpage, roffnum);
+                       /* remember tuple for deletion from "read" page */
+                       deletable[ndeletable++] = roffnum;
                }
 
                /*
-                * if the "read" page is now empty because of the deletion (or because
-                * it was empty when we got to it), free it.
+                * If we reach here, there are no live tuples on the "read" page ---
+                * it was empty when we got to it, or we moved them all.  So we
+                * can just free the page without bothering with deleting tuples
+                * individually.  Then advance to the previous "read" page.
                 *
                 * Tricky point here: if our read and write pages are adjacent in the
                 * bucket chain, our write lock on wbuf will conflict with
@@ -709,36 +727,34 @@ _hash_squeezebucket(Relation rel,
                 * removed page.  However, in that case we are done anyway, so we can
                 * simply drop the write lock before calling _hash_freeovflpage.
                 */
-               if (PageIsEmpty(rpage))
-               {
-                       rblkno = ropaque->hasho_prevblkno;
-                       Assert(BlockNumberIsValid(rblkno));
+               rblkno = ropaque->hasho_prevblkno;
+               Assert(BlockNumberIsValid(rblkno));
 
-                       /* are we freeing the page adjacent to wbuf? */
-                       if (rblkno == wblkno)
-                       {
-                               /* yes, so release wbuf lock first */
+               /* are we freeing the page adjacent to wbuf? */
+               if (rblkno == wblkno)
+               {
+                       /* yes, so release wbuf lock first */
+                       if (wbuf_dirty)
                                _hash_wrtbuf(rel, wbuf);
-                               /* free this overflow page (releases rbuf) */
-                               _hash_freeovflpage(rel, rbuf, bstrategy);
-                               /* done */
-                               return;
-                       }
-
-                       /* free this overflow page, then get the previous one */
+                       else
+                               _hash_relbuf(rel, wbuf);
+                       /* free this overflow page (releases rbuf) */
                        _hash_freeovflpage(rel, rbuf, bstrategy);
+                       /* done */
+                       return;
+               }
 
-                       rbuf = _hash_getbuf_with_strategy(rel,
-                                                                                         rblkno,
-                                                                                         HASH_WRITE,
-                                                                                         LH_OVERFLOW_PAGE,
-                                                                                         bstrategy);
-                       rpage = BufferGetPage(rbuf);
-                       ropaque = (HashPageOpaque) PageGetSpecialPointer(rpage);
-                       Assert(ropaque->hasho_bucket == bucket);
+               /* free this overflow page, then get the previous one */
+               _hash_freeovflpage(rel, rbuf, bstrategy);
 
-                       roffnum = FirstOffsetNumber;
-               }
+               rbuf = _hash_getbuf_with_strategy(rel,
+                                                                                 rblkno,
+                                                                                 HASH_WRITE,
+                                                                                 LH_OVERFLOW_PAGE,
+                                                                                 bstrategy);
+               rpage = BufferGetPage(rbuf);
+               ropaque = (HashPageOpaque) PageGetSpecialPointer(rpage);
+               Assert(ropaque->hasho_bucket == bucket);
        }
 
        /* NOTREACHED */
index bd1f930e06339992dfbd4588ab426e582551b272..abb811466f51fee1a1ba63940b0c16dc8046de8c 100644 (file)
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *       $PostgreSQL: pgsql/src/backend/access/hash/hashpage.c,v 1.80 2009/06/11 14:48:53 momjian Exp $
+ *       $PostgreSQL: pgsql/src/backend/access/hash/hashpage.c,v 1.81 2009/11/01 21:25:25 tgl Exp $
  *
  * NOTES
  *       Postgres hash pages look like ordinary relation pages.  The opaque
@@ -765,20 +765,14 @@ _hash_splitbucket(Relation rel,
                                  uint32 highmask,
                                  uint32 lowmask)
 {
-       Bucket          bucket;
-       Buffer          obuf;
-       Buffer          nbuf;
        BlockNumber oblkno;
        BlockNumber nblkno;
-       HashPageOpaque oopaque;
-       HashPageOpaque nopaque;
-       IndexTuple      itup;
-       Size            itemsz;
-       OffsetNumber ooffnum;
-       OffsetNumber noffnum;
-       OffsetNumber omaxoffnum;
+       Buffer          obuf;
+       Buffer          nbuf;
        Page            opage;
        Page            npage;
+       HashPageOpaque oopaque;
+       HashPageOpaque nopaque;
 
        /*
         * It should be okay to simultaneously write-lock pages from each bucket,
@@ -805,95 +799,100 @@ _hash_splitbucket(Relation rel,
        /*
         * Partition the tuples in the old bucket between the old bucket and the
         * new bucket, advancing along the old bucket's overflow bucket chain and
-        * adding overflow pages to the new bucket as needed.
+        * adding overflow pages to the new bucket as needed.  Outer loop
+        * iterates once per page in old bucket.
         */
-       ooffnum = FirstOffsetNumber;
-       omaxoffnum = PageGetMaxOffsetNumber(opage);
        for (;;)
        {
-               /*
-                * at each iteration through this loop, each of these variables should
-                * be up-to-date: obuf opage oopaque ooffnum omaxoffnum
-                */
-
-               /* check if we're at the end of the page */
-               if (ooffnum > omaxoffnum)
+               OffsetNumber ooffnum;
+               OffsetNumber omaxoffnum;
+               OffsetNumber deletable[MaxOffsetNumber];
+               int                     ndeletable = 0;
+
+               /* Scan each tuple in old page */
+               omaxoffnum = PageGetMaxOffsetNumber(opage);
+               for (ooffnum = FirstOffsetNumber;
+                        ooffnum <= omaxoffnum;
+                        ooffnum = OffsetNumberNext(ooffnum))
                {
-                       /* at end of page, but check for an(other) overflow page */
-                       oblkno = oopaque->hasho_nextblkno;
-                       if (!BlockNumberIsValid(oblkno))
-                               break;
+                       IndexTuple      itup;
+                       Size            itemsz;
+                       Bucket          bucket;
 
                        /*
-                        * we ran out of tuples on this particular page, but we have more
-                        * overflow pages; advance to next page.
+                        * Fetch the item's hash key (conveniently stored in the item) and
+                        * determine which bucket it now belongs in.
                         */
-                       _hash_wrtbuf(rel, obuf);
+                       itup = (IndexTuple) PageGetItem(opage,
+                                                                                       PageGetItemId(opage, ooffnum));
+                       bucket = _hash_hashkey2bucket(_hash_get_indextuple_hashkey(itup),
+                                                                                 maxbucket, highmask, lowmask);
 
-                       obuf = _hash_getbuf(rel, oblkno, HASH_WRITE, LH_OVERFLOW_PAGE);
-                       opage = BufferGetPage(obuf);
-                       oopaque = (HashPageOpaque) PageGetSpecialPointer(opage);
-                       ooffnum = FirstOffsetNumber;
-                       omaxoffnum = PageGetMaxOffsetNumber(opage);
-                       continue;
+                       if (bucket == nbucket)
+                       {
+                               /*
+                                * insert the tuple into the new bucket.  if it doesn't fit on
+                                * the current page in the new bucket, we must allocate a new
+                                * overflow page and place the tuple on that page instead.
+                                */
+                               itemsz = IndexTupleDSize(*itup);
+                               itemsz = MAXALIGN(itemsz);
+
+                               if (PageGetFreeSpace(npage) < itemsz)
+                               {
+                                       /* write out nbuf and drop lock, but keep pin */
+                                       _hash_chgbufaccess(rel, nbuf, HASH_WRITE, HASH_NOLOCK);
+                                       /* chain to a new overflow page */
+                                       nbuf = _hash_addovflpage(rel, metabuf, nbuf);
+                                       npage = BufferGetPage(nbuf);
+                                       /* we don't need nblkno or nopaque within the loop */
+                               }
+
+                               /*
+                                * Insert tuple on new page, using _hash_pgaddtup to ensure
+                                * correct ordering by hashkey.  This is a tad inefficient
+                                * since we may have to shuffle itempointers repeatedly.
+                                * Possible future improvement: accumulate all the items for
+                                * the new page and qsort them before insertion.
+                                */
+                               (void) _hash_pgaddtup(rel, nbuf, itemsz, itup);
+
+                               /*
+                                * Mark tuple for deletion from old page.
+                                */
+                               deletable[ndeletable++] = ooffnum;
+                       }
+                       else
+                       {
+                               /*
+                                * the tuple stays on this page, so nothing to do.
+                                */
+                               Assert(bucket == obucket);
+                       }
                }
 
+               oblkno = oopaque->hasho_nextblkno;
+
                /*
-                * Fetch the item's hash key (conveniently stored in the item) and
-                * determine which bucket it now belongs in.
+                * Done scanning this old page.  If we moved any tuples, delete them
+                * from the old page.
                 */
-               itup = (IndexTuple) PageGetItem(opage, PageGetItemId(opage, ooffnum));
-               bucket = _hash_hashkey2bucket(_hash_get_indextuple_hashkey(itup),
-                                                                         maxbucket, highmask, lowmask);
-
-               if (bucket == nbucket)
+               if (ndeletable > 0)
                {
-                       /*
-                        * insert the tuple into the new bucket.  if it doesn't fit on the
-                        * current page in the new bucket, we must allocate a new overflow
-                        * page and place the tuple on that page instead.
-                        */
-                       itemsz = IndexTupleDSize(*itup);
-                       itemsz = MAXALIGN(itemsz);
-
-                       if (PageGetFreeSpace(npage) < itemsz)
-                       {
-                               /* write out nbuf and drop lock, but keep pin */
-                               _hash_chgbufaccess(rel, nbuf, HASH_WRITE, HASH_NOLOCK);
-                               /* chain to a new overflow page */
-                               nbuf = _hash_addovflpage(rel, metabuf, nbuf);
-                               npage = BufferGetPage(nbuf);
-                               /* we don't need nopaque within the loop */
-                       }
-
-                       noffnum = OffsetNumberNext(PageGetMaxOffsetNumber(npage));
-                       if (PageAddItem(npage, (Item) itup, itemsz, noffnum, false, false)
-                               == InvalidOffsetNumber)
-                               elog(ERROR, "failed to add index item to \"%s\"",
-                                        RelationGetRelationName(rel));
-
-                       /*
-                        * now delete the tuple from the old bucket.  after this section
-                        * of code, 'ooffnum' will actually point to the ItemId to which
-                        * we would point if we had advanced it before the deletion
-                        * (PageIndexTupleDelete repacks the ItemId array).  this also
-                        * means that 'omaxoffnum' is exactly one less than it used to be,
-                        * so we really can just decrement it instead of calling
-                        * PageGetMaxOffsetNumber.
-                        */
-                       PageIndexTupleDelete(opage, ooffnum);
-                       omaxoffnum = OffsetNumberPrev(omaxoffnum);
+                       PageIndexMultiDelete(opage, deletable, ndeletable);
+                       _hash_wrtbuf(rel, obuf);
                }
                else
-               {
-                       /*
-                        * the tuple stays on this page.  we didn't move anything, so we
-                        * didn't delete anything and therefore we don't have to change
-                        * 'omaxoffnum'.
-                        */
-                       Assert(bucket == obucket);
-                       ooffnum = OffsetNumberNext(ooffnum);
-               }
+                       _hash_relbuf(rel, obuf);
+
+               /* Exit loop if no more overflow pages in old bucket */
+               if (!BlockNumberIsValid(oblkno))
+                       break;
+
+               /* Else, advance to next old page */
+               obuf = _hash_getbuf(rel, oblkno, HASH_WRITE, LH_OVERFLOW_PAGE);
+               opage = BufferGetPage(obuf);
+               oopaque = (HashPageOpaque) PageGetSpecialPointer(opage);
        }
 
        /*
@@ -902,7 +901,6 @@ _hash_splitbucket(Relation rel,
         * tuples remaining in the old bucket (including the overflow pages) are
         * packed as tightly as possible.  The new bucket is already tight.
         */
-       _hash_wrtbuf(rel, obuf);
        _hash_wrtbuf(rel, nbuf);
 
        _hash_squeezebucket(rel, obucket, start_oblkno, NULL);
index a70ace032f4abfbd2857820bcb9e9ac578535248..100ccb9e6dbbc8816814b17d8e22d73f6685b9c9 100644 (file)
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $PostgreSQL: pgsql/src/include/access/hash.h,v 1.93 2009/06/11 14:49:08 momjian Exp $
+ * $PostgreSQL: pgsql/src/include/access/hash.h,v 1.94 2009/11/01 21:25:25 tgl Exp $
  *
  * NOTES
  *             modeled after Margo Seltzer's hash implementation for unix.
@@ -280,6 +280,8 @@ extern Datum hash_uint32(uint32 k);
 
 /* hashinsert.c */
 extern void _hash_doinsert(Relation rel, IndexTuple itup);
+extern OffsetNumber _hash_pgaddtup(Relation rel, Buffer buf,
+                          Size itemsize, IndexTuple itup);
 
 /* hashovfl.c */
 extern Buffer _hash_addovflpage(Relation rel, Buffer metabuf, Buffer buf);