]> granicus.if.org Git - postgresql/blob - src/backend/access/hash/hashpage.c
4544889294a80c37ba8d2758f3d154693092ad1f
[postgresql] / src / backend / access / hash / hashpage.c
1 /*-------------------------------------------------------------------------
2  *
3  * hashpage.c
4  *        Hash table page management code for the Postgres hash access method
5  *
6  * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *        src/backend/access/hash/hashpage.c
12  *
13  * NOTES
14  *        Postgres hash pages look like ordinary relation pages.  The opaque
15  *        data at high addresses includes information about the page including
16  *        whether a page is an overflow page or a true bucket, the bucket
17  *        number, and the block numbers of the preceding and following pages
18  *        in the same bucket.
19  *
20  *        The first page in a hash relation, page zero, is special -- it stores
21  *        information describing the hash table; it is referred to as the
22  *        "meta page." Pages one and higher store the actual data.
23  *
24  *        There are also bitmap pages, which are not manipulated here;
25  *        see hashovfl.c.
26  *
27  *-------------------------------------------------------------------------
28  */
29 #include "postgres.h"
30
31 #include "access/hash.h"
32 #include "access/hash_xlog.h"
33 #include "miscadmin.h"
34 #include "storage/lmgr.h"
35 #include "storage/smgr.h"
36
37
38 static bool _hash_alloc_buckets(Relation rel, BlockNumber firstblock,
39                                         uint32 nblocks);
40 static void _hash_splitbucket(Relation rel, Buffer metabuf,
41                                   Bucket obucket, Bucket nbucket,
42                                   Buffer obuf,
43                                   Buffer nbuf,
44                                   HTAB *htab,
45                                   uint32 maxbucket,
46                                   uint32 highmask, uint32 lowmask);
47 static void log_split_page(Relation rel, Buffer buf);
48
49
50 /*
51  * We use high-concurrency locking on hash indexes (see README for an overview
52  * of the locking rules).  However, we can skip taking lmgr locks when the
53  * index is local to the current backend (ie, either temp or new in the
54  * current transaction).  No one else can see it, so there's no reason to
55  * take locks.  We still take buffer-level locks, but not lmgr locks.
56  */
57 #define USELOCKING(rel)         (!RELATION_IS_LOCAL(rel))
58
59
60 /*
61  *      _hash_getbuf() -- Get a buffer by block number for read or write.
62  *
63  *              'access' must be HASH_READ, HASH_WRITE, or HASH_NOLOCK.
64  *              'flags' is a bitwise OR of the allowed page types.
65  *
66  *              This must be used only to fetch pages that are expected to be valid
67  *              already.  _hash_checkpage() is applied using the given flags.
68  *
69  *              When this routine returns, the appropriate lock is set on the
70  *              requested buffer and its reference count has been incremented
71  *              (ie, the buffer is "locked and pinned").
72  *
73  *              P_NEW is disallowed because this routine can only be used
74  *              to access pages that are known to be before the filesystem EOF.
75  *              Extending the index should be done with _hash_getnewbuf.
76  */
77 Buffer
78 _hash_getbuf(Relation rel, BlockNumber blkno, int access, int flags)
79 {
80         Buffer          buf;
81
82         if (blkno == P_NEW)
83                 elog(ERROR, "hash AM does not use P_NEW");
84
85         buf = ReadBuffer(rel, blkno);
86
87         if (access != HASH_NOLOCK)
88                 LockBuffer(buf, access);
89
90         /* ref count and lock type are correct */
91
92         _hash_checkpage(rel, buf, flags);
93
94         return buf;
95 }
96
97 /*
98  * _hash_getbuf_with_condlock_cleanup() -- Try to get a buffer for cleanup.
99  *
100  *              We read the page and try to acquire a cleanup lock.  If we get it,
101  *              we return the buffer; otherwise, we return InvalidBuffer.
102  */
103 Buffer
104 _hash_getbuf_with_condlock_cleanup(Relation rel, BlockNumber blkno, int flags)
105 {
106         Buffer          buf;
107
108         if (blkno == P_NEW)
109                 elog(ERROR, "hash AM does not use P_NEW");
110
111         buf = ReadBuffer(rel, blkno);
112
113         if (!ConditionalLockBufferForCleanup(buf))
114         {
115                 ReleaseBuffer(buf);
116                 return InvalidBuffer;
117         }
118
119         /* ref count and lock type are correct */
120
121         _hash_checkpage(rel, buf, flags);
122
123         return buf;
124 }
125
126 /*
127  *      _hash_getinitbuf() -- Get and initialize a buffer by block number.
128  *
129  *              This must be used only to fetch pages that are known to be before
130  *              the index's filesystem EOF, but are to be filled from scratch.
131  *              _hash_pageinit() is applied automatically.  Otherwise it has
132  *              effects similar to _hash_getbuf() with access = HASH_WRITE.
133  *
134  *              When this routine returns, a write lock is set on the
135  *              requested buffer and its reference count has been incremented
136  *              (ie, the buffer is "locked and pinned").
137  *
138  *              P_NEW is disallowed because this routine can only be used
139  *              to access pages that are known to be before the filesystem EOF.
140  *              Extending the index should be done with _hash_getnewbuf.
141  */
142 Buffer
143 _hash_getinitbuf(Relation rel, BlockNumber blkno)
144 {
145         Buffer          buf;
146
147         if (blkno == P_NEW)
148                 elog(ERROR, "hash AM does not use P_NEW");
149
150         buf = ReadBufferExtended(rel, MAIN_FORKNUM, blkno, RBM_ZERO_AND_LOCK,
151                                                          NULL);
152
153         /* ref count and lock type are correct */
154
155         /* initialize the page */
156         _hash_pageinit(BufferGetPage(buf), BufferGetPageSize(buf));
157
158         return buf;
159 }
160
161 /*
162  *      _hash_initbuf() -- Get and initialize a buffer by bucket number.
163  */
164 void
165 _hash_initbuf(Buffer buf, uint32 max_bucket, uint32 num_bucket, uint32 flag,
166                           bool initpage)
167 {
168         HashPageOpaque pageopaque;
169         Page            page;
170
171         page = BufferGetPage(buf);
172
173         /* initialize the page */
174         if (initpage)
175                 _hash_pageinit(page, BufferGetPageSize(buf));
176
177         pageopaque = (HashPageOpaque) PageGetSpecialPointer(page);
178
179         /*
180          * Set hasho_prevblkno with current hashm_maxbucket. This value will be
181          * used to validate cached HashMetaPageData. See
182          * _hash_getbucketbuf_from_hashkey().
183          */
184         pageopaque->hasho_prevblkno = max_bucket;
185         pageopaque->hasho_nextblkno = InvalidBlockNumber;
186         pageopaque->hasho_bucket = num_bucket;
187         pageopaque->hasho_flag = flag;
188         pageopaque->hasho_page_id = HASHO_PAGE_ID;
189 }
190
191 /*
192  *      _hash_getnewbuf() -- Get a new page at the end of the index.
193  *
194  *              This has the same API as _hash_getinitbuf, except that we are adding
195  *              a page to the index, and hence expect the page to be past the
196  *              logical EOF.  (However, we have to support the case where it isn't,
197  *              since a prior try might have crashed after extending the filesystem
198  *              EOF but before updating the metapage to reflect the added page.)
199  *
200  *              It is caller's responsibility to ensure that only one process can
201  *              extend the index at a time.  In practice, this function is called
202  *              only while holding write lock on the metapage, because adding a page
203  *              is always associated with an update of metapage data.
204  */
205 Buffer
206 _hash_getnewbuf(Relation rel, BlockNumber blkno, ForkNumber forkNum)
207 {
208         BlockNumber nblocks = RelationGetNumberOfBlocksInFork(rel, forkNum);
209         Buffer          buf;
210
211         if (blkno == P_NEW)
212                 elog(ERROR, "hash AM does not use P_NEW");
213         if (blkno > nblocks)
214                 elog(ERROR, "access to noncontiguous page in hash index \"%s\"",
215                          RelationGetRelationName(rel));
216
217         /* smgr insists we use P_NEW to extend the relation */
218         if (blkno == nblocks)
219         {
220                 buf = ReadBufferExtended(rel, forkNum, P_NEW, RBM_NORMAL, NULL);
221                 if (BufferGetBlockNumber(buf) != blkno)
222                         elog(ERROR, "unexpected hash relation size: %u, should be %u",
223                                  BufferGetBlockNumber(buf), blkno);
224                 LockBuffer(buf, HASH_WRITE);
225         }
226         else
227         {
228                 buf = ReadBufferExtended(rel, forkNum, blkno, RBM_ZERO_AND_LOCK,
229                                                                  NULL);
230         }
231
232         /* ref count and lock type are correct */
233
234         /* initialize the page */
235         _hash_pageinit(BufferGetPage(buf), BufferGetPageSize(buf));
236
237         return buf;
238 }
239
240 /*
241  *      _hash_getbuf_with_strategy() -- Get a buffer with nondefault strategy.
242  *
243  *              This is identical to _hash_getbuf() but also allows a buffer access
244  *              strategy to be specified.  We use this for VACUUM operations.
245  */
246 Buffer
247 _hash_getbuf_with_strategy(Relation rel, BlockNumber blkno,
248                                                    int access, int flags,
249                                                    BufferAccessStrategy bstrategy)
250 {
251         Buffer          buf;
252
253         if (blkno == P_NEW)
254                 elog(ERROR, "hash AM does not use P_NEW");
255
256         buf = ReadBufferExtended(rel, MAIN_FORKNUM, blkno, RBM_NORMAL, bstrategy);
257
258         if (access != HASH_NOLOCK)
259                 LockBuffer(buf, access);
260
261         /* ref count and lock type are correct */
262
263         _hash_checkpage(rel, buf, flags);
264
265         return buf;
266 }
267
268 /*
269  *      _hash_relbuf() -- release a locked buffer.
270  *
271  * Lock and pin (refcount) are both dropped.
272  */
273 void
274 _hash_relbuf(Relation rel, Buffer buf)
275 {
276         UnlockReleaseBuffer(buf);
277 }
278
279 /*
280  *      _hash_dropbuf() -- release an unlocked buffer.
281  *
282  * This is used to unpin a buffer on which we hold no lock.
283  */
284 void
285 _hash_dropbuf(Relation rel, Buffer buf)
286 {
287         ReleaseBuffer(buf);
288 }
289
290 /*
291  *      _hash_dropscanbuf() -- release buffers used in scan.
292  *
293  * This routine unpins the buffers used during scan on which we
294  * hold no lock.
295  */
296 void
297 _hash_dropscanbuf(Relation rel, HashScanOpaque so)
298 {
299         /* release pin we hold on primary bucket page */
300         if (BufferIsValid(so->hashso_bucket_buf) &&
301                 so->hashso_bucket_buf != so->hashso_curbuf)
302                 _hash_dropbuf(rel, so->hashso_bucket_buf);
303         so->hashso_bucket_buf = InvalidBuffer;
304
305         /* release pin we hold on primary bucket page  of bucket being split */
306         if (BufferIsValid(so->hashso_split_bucket_buf) &&
307                 so->hashso_split_bucket_buf != so->hashso_curbuf)
308                 _hash_dropbuf(rel, so->hashso_split_bucket_buf);
309         so->hashso_split_bucket_buf = InvalidBuffer;
310
311         /* release any pin we still hold */
312         if (BufferIsValid(so->hashso_curbuf))
313                 _hash_dropbuf(rel, so->hashso_curbuf);
314         so->hashso_curbuf = InvalidBuffer;
315
316         /* reset split scan */
317         so->hashso_buc_populated = false;
318         so->hashso_buc_split = false;
319 }
320
321
322 /*
323  *      _hash_init() -- Initialize the metadata page of a hash index,
324  *                              the initial buckets, and the initial bitmap page.
325  *
326  * The initial number of buckets is dependent on num_tuples, an estimate
327  * of the number of tuples to be loaded into the index initially.  The
328  * chosen number of buckets is returned.
329  *
330  * We are fairly cavalier about locking here, since we know that no one else
331  * could be accessing this index.  In particular the rule about not holding
332  * multiple buffer locks is ignored.
333  */
334 uint32
335 _hash_init(Relation rel, double num_tuples, ForkNumber forkNum)
336 {
337         Buffer          metabuf;
338         Buffer          buf;
339         Buffer          bitmapbuf;
340         Page            pg;
341         HashMetaPage metap;
342         RegProcedure procid;
343         int32           data_width;
344         int32           item_width;
345         int32           ffactor;
346         uint32          num_buckets;
347         uint32          i;
348
349         /* safety check */
350         if (RelationGetNumberOfBlocksInFork(rel, forkNum) != 0)
351                 elog(ERROR, "cannot initialize non-empty hash index \"%s\"",
352                          RelationGetRelationName(rel));
353
354         /*
355          * Determine the target fill factor (in tuples per bucket) for this index.
356          * The idea is to make the fill factor correspond to pages about as full
357          * as the user-settable fillfactor parameter says.  We can compute it
358          * exactly since the index datatype (i.e. uint32 hash key) is fixed-width.
359          */
360         data_width = sizeof(uint32);
361         item_width = MAXALIGN(sizeof(IndexTupleData)) + MAXALIGN(data_width) +
362                 sizeof(ItemIdData);             /* include the line pointer */
363         ffactor = RelationGetTargetPageUsage(rel, HASH_DEFAULT_FILLFACTOR) / item_width;
364         /* keep to a sane range */
365         if (ffactor < 10)
366                 ffactor = 10;
367
368         procid = index_getprocid(rel, 1, HASHPROC);
369
370         /*
371          * We initialize the metapage, the first N bucket pages, and the first
372          * bitmap page in sequence, using _hash_getnewbuf to cause smgrextend()
373          * calls to occur.  This ensures that the smgr level has the right idea of
374          * the physical index length.
375          *
376          * Critical section not required, because on error the creation of the
377          * whole relation will be rolled back.
378          */
379         metabuf = _hash_getnewbuf(rel, HASH_METAPAGE, forkNum);
380         _hash_init_metabuffer(metabuf, num_tuples, procid, ffactor, false);
381         MarkBufferDirty(metabuf);
382
383         pg = BufferGetPage(metabuf);
384         metap = HashPageGetMeta(pg);
385
386         /* XLOG stuff */
387         if (RelationNeedsWAL(rel))
388         {
389                 xl_hash_init_meta_page xlrec;
390                 XLogRecPtr      recptr;
391
392                 xlrec.num_tuples = num_tuples;
393                 xlrec.procid = metap->hashm_procid;
394                 xlrec.ffactor = metap->hashm_ffactor;
395
396                 XLogBeginInsert();
397                 XLogRegisterData((char *) &xlrec, SizeOfHashInitMetaPage);
398                 XLogRegisterBuffer(0, metabuf, REGBUF_WILL_INIT);
399
400                 recptr = XLogInsert(RM_HASH_ID, XLOG_HASH_INIT_META_PAGE);
401
402                 PageSetLSN(BufferGetPage(metabuf), recptr);
403         }
404
405         num_buckets = metap->hashm_maxbucket + 1;
406
407         /*
408          * Release buffer lock on the metapage while we initialize buckets.
409          * Otherwise, we'll be in interrupt holdoff and the CHECK_FOR_INTERRUPTS
410          * won't accomplish anything.  It's a bad idea to hold buffer locks for
411          * long intervals in any case, since that can block the bgwriter.
412          */
413         LockBuffer(metabuf, BUFFER_LOCK_UNLOCK);
414
415         /*
416          * Initialize and WAL Log the first N buckets
417          */
418         for (i = 0; i < num_buckets; i++)
419         {
420                 BlockNumber blkno;
421
422                 /* Allow interrupts, in case N is huge */
423                 CHECK_FOR_INTERRUPTS();
424
425                 blkno = BUCKET_TO_BLKNO(metap, i);
426                 buf = _hash_getnewbuf(rel, blkno, forkNum);
427                 _hash_initbuf(buf, metap->hashm_maxbucket, i, LH_BUCKET_PAGE, false);
428                 MarkBufferDirty(buf);
429
430                 log_newpage(&rel->rd_node,
431                                         forkNum,
432                                         blkno,
433                                         BufferGetPage(buf),
434                                         true);
435                 _hash_relbuf(rel, buf);
436         }
437
438         /* Now reacquire buffer lock on metapage */
439         LockBuffer(metabuf, BUFFER_LOCK_EXCLUSIVE);
440
441         /*
442          * Initialize bitmap page
443          */
444         bitmapbuf = _hash_getnewbuf(rel, num_buckets + 1, forkNum);
445         _hash_initbitmapbuffer(bitmapbuf, metap->hashm_bmsize, false);
446         MarkBufferDirty(bitmapbuf);
447
448         /* add the new bitmap page to the metapage's list of bitmaps */
449         /* metapage already has a write lock */
450         if (metap->hashm_nmaps >= HASH_MAX_BITMAPS)
451                 ereport(ERROR,
452                                 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
453                                  errmsg("out of overflow pages in hash index \"%s\"",
454                                                 RelationGetRelationName(rel))));
455
456         metap->hashm_mapp[metap->hashm_nmaps] = num_buckets + 1;
457
458         metap->hashm_nmaps++;
459         MarkBufferDirty(metabuf);
460
461         /* XLOG stuff */
462         if (RelationNeedsWAL(rel))
463         {
464                 xl_hash_init_bitmap_page xlrec;
465                 XLogRecPtr      recptr;
466
467                 xlrec.bmsize = metap->hashm_bmsize;
468
469                 XLogBeginInsert();
470                 XLogRegisterData((char *) &xlrec, SizeOfHashInitBitmapPage);
471                 XLogRegisterBuffer(0, bitmapbuf, REGBUF_WILL_INIT);
472
473                 /*
474                  * This is safe only because nobody else can be modifying the index at
475                  * this stage; it's only visible to the transaction that is creating
476                  * it.
477                  */
478                 XLogRegisterBuffer(1, metabuf, REGBUF_STANDARD);
479
480                 recptr = XLogInsert(RM_HASH_ID, XLOG_HASH_INIT_BITMAP_PAGE);
481
482                 PageSetLSN(BufferGetPage(bitmapbuf), recptr);
483                 PageSetLSN(BufferGetPage(metabuf), recptr);
484         }
485
486         /* all done */
487         _hash_relbuf(rel, bitmapbuf);
488         _hash_relbuf(rel, metabuf);
489
490         return num_buckets;
491 }
492
493 /*
494  *      _hash_init_metabuffer() -- Initialize the metadata page of a hash index.
495  */
496 void
497 _hash_init_metabuffer(Buffer buf, double num_tuples, RegProcedure procid,
498                                           uint16 ffactor, bool initpage)
499 {
500         HashMetaPage metap;
501         HashPageOpaque pageopaque;
502         Page            page;
503         double          dnumbuckets;
504         uint32          num_buckets;
505         uint32          spare_index;
506         uint32          i;
507
508         /*
509          * Choose the number of initial bucket pages to match the fill factor
510          * given the estimated number of tuples.  We round up the result to the
511          * total number of buckets which has to be allocated before using its
512          * _hashm_spare element. However always force at least 2 bucket pages. The
513          * upper limit is determined by considerations explained in
514          * _hash_expandtable().
515          */
516         dnumbuckets = num_tuples / ffactor;
517         if (dnumbuckets <= 2.0)
518                 num_buckets = 2;
519         else if (dnumbuckets >= (double) 0x40000000)
520                 num_buckets = 0x40000000;
521         else
522                 num_buckets = _hash_get_totalbuckets(_hash_spareindex(dnumbuckets));
523
524         spare_index = _hash_spareindex(num_buckets);
525         Assert(spare_index < HASH_MAX_SPLITPOINTS);
526
527         page = BufferGetPage(buf);
528         if (initpage)
529                 _hash_pageinit(page, BufferGetPageSize(buf));
530
531         pageopaque = (HashPageOpaque) PageGetSpecialPointer(page);
532         pageopaque->hasho_prevblkno = InvalidBlockNumber;
533         pageopaque->hasho_nextblkno = InvalidBlockNumber;
534         pageopaque->hasho_bucket = -1;
535         pageopaque->hasho_flag = LH_META_PAGE;
536         pageopaque->hasho_page_id = HASHO_PAGE_ID;
537
538         metap = HashPageGetMeta(page);
539
540         metap->hashm_magic = HASH_MAGIC;
541         metap->hashm_version = HASH_VERSION;
542         metap->hashm_ntuples = 0;
543         metap->hashm_nmaps = 0;
544         metap->hashm_ffactor = ffactor;
545         metap->hashm_bsize = HashGetMaxBitmapSize(page);
546         /* find largest bitmap array size that will fit in page size */
547         for (i = _hash_log2(metap->hashm_bsize); i > 0; --i)
548         {
549                 if ((1 << i) <= metap->hashm_bsize)
550                         break;
551         }
552         Assert(i > 0);
553         metap->hashm_bmsize = 1 << i;
554         metap->hashm_bmshift = i + BYTE_TO_BIT;
555         Assert((1 << BMPG_SHIFT(metap)) == (BMPG_MASK(metap) + 1));
556
557         /*
558          * Label the index with its primary hash support function's OID.  This is
559          * pretty useless for normal operation (in fact, hashm_procid is not used
560          * anywhere), but it might be handy for forensic purposes so we keep it.
561          */
562         metap->hashm_procid = procid;
563
564         /*
565          * We initialize the index with N buckets, 0 .. N-1, occupying physical
566          * blocks 1 to N.  The first freespace bitmap page is in block N+1.
567          */
568         metap->hashm_maxbucket = num_buckets - 1;
569
570         /*
571          * Set highmask as next immediate ((2 ^ x) - 1), which should be
572          * sufficient to cover num_buckets.
573          */
574         metap->hashm_highmask = (1 << (_hash_log2(num_buckets + 1))) - 1;
575         metap->hashm_lowmask = (metap->hashm_highmask >> 1);
576
577         MemSet(metap->hashm_spares, 0, sizeof(metap->hashm_spares));
578         MemSet(metap->hashm_mapp, 0, sizeof(metap->hashm_mapp));
579
580         /* Set up mapping for one spare page after the initial splitpoints */
581         metap->hashm_spares[spare_index] = 1;
582         metap->hashm_ovflpoint = spare_index;
583         metap->hashm_firstfree = 0;
584
585         /*
586          * Set pd_lower just past the end of the metadata.  This is to log full
587          * page image of metapage in xloginsert.c.
588          */
589         ((PageHeader) page)->pd_lower =
590                 ((char *) metap + sizeof(HashMetaPageData)) - (char *) page;
591 }
592
593 /*
594  *      _hash_pageinit() -- Initialize a new hash index page.
595  */
596 void
597 _hash_pageinit(Page page, Size size)
598 {
599         PageInit(page, size, sizeof(HashPageOpaqueData));
600 }
601
602 /*
603  * Attempt to expand the hash table by creating one new bucket.
604  *
605  * This will silently do nothing if we don't get cleanup lock on old or
606  * new bucket.
607  *
608  * Complete the pending splits and remove the tuples from old bucket,
609  * if there are any left over from the previous split.
610  *
611  * The caller must hold a pin, but no lock, on the metapage buffer.
612  * The buffer is returned in the same state.
613  */
614 void
615 _hash_expandtable(Relation rel, Buffer metabuf)
616 {
617         HashMetaPage metap;
618         Bucket          old_bucket;
619         Bucket          new_bucket;
620         uint32          spare_ndx;
621         BlockNumber start_oblkno;
622         BlockNumber start_nblkno;
623         Buffer          buf_nblkno;
624         Buffer          buf_oblkno;
625         Page            opage;
626         Page            npage;
627         HashPageOpaque oopaque;
628         HashPageOpaque nopaque;
629         uint32          maxbucket;
630         uint32          highmask;
631         uint32          lowmask;
632         bool            metap_update_masks = false;
633         bool            metap_update_splitpoint = false;
634
635 restart_expand:
636
637         /*
638          * Write-lock the meta page.  It used to be necessary to acquire a
639          * heavyweight lock to begin a split, but that is no longer required.
640          */
641         LockBuffer(metabuf, BUFFER_LOCK_EXCLUSIVE);
642
643         _hash_checkpage(rel, metabuf, LH_META_PAGE);
644         metap = HashPageGetMeta(BufferGetPage(metabuf));
645
646         /*
647          * Check to see if split is still needed; someone else might have already
648          * done one while we waited for the lock.
649          *
650          * Make sure this stays in sync with _hash_doinsert()
651          */
652         if (metap->hashm_ntuples <=
653                 (double) metap->hashm_ffactor * (metap->hashm_maxbucket + 1))
654                 goto fail;
655
656         /*
657          * Can't split anymore if maxbucket has reached its maximum possible
658          * value.
659          *
660          * Ideally we'd allow bucket numbers up to UINT_MAX-1 (no higher because
661          * the calculation maxbucket+1 mustn't overflow).  Currently we restrict
662          * to half that because of overflow looping in _hash_log2() and
663          * insufficient space in hashm_spares[].  It's moot anyway because an
664          * index with 2^32 buckets would certainly overflow BlockNumber and hence
665          * _hash_alloc_buckets() would fail, but if we supported buckets smaller
666          * than a disk block then this would be an independent constraint.
667          *
668          * If you change this, see also the maximum initial number of buckets in
669          * _hash_init().
670          */
671         if (metap->hashm_maxbucket >= (uint32) 0x7FFFFFFE)
672                 goto fail;
673
674         /*
675          * Determine which bucket is to be split, and attempt to take cleanup lock
676          * on the old bucket.  If we can't get the lock, give up.
677          *
678          * The cleanup lock protects us not only against other backends, but
679          * against our own backend as well.
680          *
681          * The cleanup lock is mainly to protect the split from concurrent
682          * inserts. See src/backend/access/hash/README, Lock Definitions for
683          * further details.  Due to this locking restriction, if there is any
684          * pending scan, the split will give up which is not good, but harmless.
685          */
686         new_bucket = metap->hashm_maxbucket + 1;
687
688         old_bucket = (new_bucket & metap->hashm_lowmask);
689
690         start_oblkno = BUCKET_TO_BLKNO(metap, old_bucket);
691
692         buf_oblkno = _hash_getbuf_with_condlock_cleanup(rel, start_oblkno, LH_BUCKET_PAGE);
693         if (!buf_oblkno)
694                 goto fail;
695
696         opage = BufferGetPage(buf_oblkno);
697         oopaque = (HashPageOpaque) PageGetSpecialPointer(opage);
698
699         /*
700          * We want to finish the split from a bucket as there is no apparent
701          * benefit by not doing so and it will make the code complicated to finish
702          * the split that involves multiple buckets considering the case where new
703          * split also fails.  We don't need to consider the new bucket for
704          * completing the split here as it is not possible that a re-split of new
705          * bucket starts when there is still a pending split from old bucket.
706          */
707         if (H_BUCKET_BEING_SPLIT(oopaque))
708         {
709                 /*
710                  * Copy bucket mapping info now; refer the comment in code below where
711                  * we copy this information before calling _hash_splitbucket to see
712                  * why this is okay.
713                  */
714                 maxbucket = metap->hashm_maxbucket;
715                 highmask = metap->hashm_highmask;
716                 lowmask = metap->hashm_lowmask;
717
718                 /*
719                  * Release the lock on metapage and old_bucket, before completing the
720                  * split.
721                  */
722                 LockBuffer(metabuf, BUFFER_LOCK_UNLOCK);
723                 LockBuffer(buf_oblkno, BUFFER_LOCK_UNLOCK);
724
725                 _hash_finish_split(rel, metabuf, buf_oblkno, old_bucket, maxbucket,
726                                                    highmask, lowmask);
727
728                 /* release the pin on old buffer and retry for expand. */
729                 _hash_dropbuf(rel, buf_oblkno);
730
731                 goto restart_expand;
732         }
733
734         /*
735          * Clean the tuples remained from the previous split.  This operation
736          * requires cleanup lock and we already have one on the old bucket, so
737          * let's do it. We also don't want to allow further splits from the bucket
738          * till the garbage of previous split is cleaned.  This has two
739          * advantages; first, it helps in avoiding the bloat due to garbage and
740          * second is, during cleanup of bucket, we are always sure that the
741          * garbage tuples belong to most recently split bucket.  On the contrary,
742          * if we allow cleanup of bucket after meta page is updated to indicate
743          * the new split and before the actual split, the cleanup operation won't
744          * be able to decide whether the tuple has been moved to the newly created
745          * bucket and ended up deleting such tuples.
746          */
747         if (H_NEEDS_SPLIT_CLEANUP(oopaque))
748         {
749                 /*
750                  * Copy bucket mapping info now; refer to the comment in code below
751                  * where we copy this information before calling _hash_splitbucket to
752                  * see why this is okay.
753                  */
754                 maxbucket = metap->hashm_maxbucket;
755                 highmask = metap->hashm_highmask;
756                 lowmask = metap->hashm_lowmask;
757
758                 /* Release the metapage lock. */
759                 LockBuffer(metabuf, BUFFER_LOCK_UNLOCK);
760
761                 hashbucketcleanup(rel, old_bucket, buf_oblkno, start_oblkno, NULL,
762                                                   maxbucket, highmask, lowmask, NULL, NULL, true,
763                                                   NULL, NULL);
764
765                 _hash_dropbuf(rel, buf_oblkno);
766
767                 goto restart_expand;
768         }
769
770         /*
771          * There shouldn't be any active scan on new bucket.
772          *
773          * Note: it is safe to compute the new bucket's blkno here, even though we
774          * may still need to update the BUCKET_TO_BLKNO mapping.  This is because
775          * the current value of hashm_spares[hashm_ovflpoint] correctly shows
776          * where we are going to put a new splitpoint's worth of buckets.
777          */
778         start_nblkno = BUCKET_TO_BLKNO(metap, new_bucket);
779
780         /*
781          * If the split point is increasing we need to allocate a new batch of
782          * bucket pages.
783          */
784         spare_ndx = _hash_spareindex(new_bucket + 1);
785         if (spare_ndx > metap->hashm_ovflpoint)
786         {
787                 uint32          buckets_to_add;
788
789                 Assert(spare_ndx == metap->hashm_ovflpoint + 1);
790
791                 /*
792                  * We treat allocation of buckets as a separate WAL-logged action.
793                  * Even if we fail after this operation, won't leak bucket pages;
794                  * rather, the next split will consume this space. In any case, even
795                  * without failure we don't use all the space in one split operation.
796                  */
797                 buckets_to_add = _hash_get_totalbuckets(spare_ndx) - new_bucket;
798                 if (!_hash_alloc_buckets(rel, start_nblkno, buckets_to_add))
799                 {
800                         /* can't split due to BlockNumber overflow */
801                         _hash_relbuf(rel, buf_oblkno);
802                         goto fail;
803                 }
804         }
805
806         /*
807          * Physically allocate the new bucket's primary page.  We want to do this
808          * before changing the metapage's mapping info, in case we can't get the
809          * disk space.  Ideally, we don't need to check for cleanup lock on new
810          * bucket as no other backend could find this bucket unless meta page is
811          * updated.  However, it is good to be consistent with old bucket locking.
812          */
813         buf_nblkno = _hash_getnewbuf(rel, start_nblkno, MAIN_FORKNUM);
814         if (!IsBufferCleanupOK(buf_nblkno))
815         {
816                 _hash_relbuf(rel, buf_oblkno);
817                 _hash_relbuf(rel, buf_nblkno);
818                 goto fail;
819         }
820
821         /*
822          * Since we are scribbling on the pages in the shared buffers, establish a
823          * critical section.  Any failure in this next code leaves us with a big
824          * problem: the metapage is effectively corrupt but could get written back
825          * to disk.
826          */
827         START_CRIT_SECTION();
828
829         /*
830          * Okay to proceed with split.  Update the metapage bucket mapping info.
831          */
832         metap->hashm_maxbucket = new_bucket;
833
834         if (new_bucket > metap->hashm_highmask)
835         {
836                 /* Starting a new doubling */
837                 metap->hashm_lowmask = metap->hashm_highmask;
838                 metap->hashm_highmask = new_bucket | metap->hashm_lowmask;
839                 metap_update_masks = true;
840         }
841
842         /*
843          * If the split point is increasing we need to adjust the hashm_spares[]
844          * array and hashm_ovflpoint so that future overflow pages will be created
845          * beyond this new batch of bucket pages.
846          */
847         if (spare_ndx > metap->hashm_ovflpoint)
848         {
849                 metap->hashm_spares[spare_ndx] = metap->hashm_spares[metap->hashm_ovflpoint];
850                 metap->hashm_ovflpoint = spare_ndx;
851                 metap_update_splitpoint = true;
852         }
853
854         MarkBufferDirty(metabuf);
855
856         /*
857          * Copy bucket mapping info now; this saves re-accessing the meta page
858          * inside _hash_splitbucket's inner loop.  Note that once we drop the
859          * split lock, other splits could begin, so these values might be out of
860          * date before _hash_splitbucket finishes.  That's okay, since all it
861          * needs is to tell which of these two buckets to map hashkeys into.
862          */
863         maxbucket = metap->hashm_maxbucket;
864         highmask = metap->hashm_highmask;
865         lowmask = metap->hashm_lowmask;
866
867         opage = BufferGetPage(buf_oblkno);
868         oopaque = (HashPageOpaque) PageGetSpecialPointer(opage);
869
870         /*
871          * Mark the old bucket to indicate that split is in progress.  (At
872          * operation end, we will clear the split-in-progress flag.)  Also, for a
873          * primary bucket page, hasho_prevblkno stores the number of buckets that
874          * existed as of the last split, so we must update that value here.
875          */
876         oopaque->hasho_flag |= LH_BUCKET_BEING_SPLIT;
877         oopaque->hasho_prevblkno = maxbucket;
878
879         MarkBufferDirty(buf_oblkno);
880
881         npage = BufferGetPage(buf_nblkno);
882
883         /*
884          * initialize the new bucket's primary page and mark it to indicate that
885          * split is in progress.
886          */
887         nopaque = (HashPageOpaque) PageGetSpecialPointer(npage);
888         nopaque->hasho_prevblkno = maxbucket;
889         nopaque->hasho_nextblkno = InvalidBlockNumber;
890         nopaque->hasho_bucket = new_bucket;
891         nopaque->hasho_flag = LH_BUCKET_PAGE | LH_BUCKET_BEING_POPULATED;
892         nopaque->hasho_page_id = HASHO_PAGE_ID;
893
894         MarkBufferDirty(buf_nblkno);
895
896         /* XLOG stuff */
897         if (RelationNeedsWAL(rel))
898         {
899                 xl_hash_split_allocate_page xlrec;
900                 XLogRecPtr      recptr;
901
902                 xlrec.new_bucket = maxbucket;
903                 xlrec.old_bucket_flag = oopaque->hasho_flag;
904                 xlrec.new_bucket_flag = nopaque->hasho_flag;
905                 xlrec.flags = 0;
906
907                 XLogBeginInsert();
908
909                 XLogRegisterBuffer(0, buf_oblkno, REGBUF_STANDARD);
910                 XLogRegisterBuffer(1, buf_nblkno, REGBUF_WILL_INIT);
911                 XLogRegisterBuffer(2, metabuf, REGBUF_STANDARD);
912
913                 if (metap_update_masks)
914                 {
915                         xlrec.flags |= XLH_SPLIT_META_UPDATE_MASKS;
916                         XLogRegisterBufData(2, (char *) &metap->hashm_lowmask, sizeof(uint32));
917                         XLogRegisterBufData(2, (char *) &metap->hashm_highmask, sizeof(uint32));
918                 }
919
920                 if (metap_update_splitpoint)
921                 {
922                         xlrec.flags |= XLH_SPLIT_META_UPDATE_SPLITPOINT;
923                         XLogRegisterBufData(2, (char *) &metap->hashm_ovflpoint,
924                                                                 sizeof(uint32));
925                         XLogRegisterBufData(2,
926                                            (char *) &metap->hashm_spares[metap->hashm_ovflpoint],
927                                                                 sizeof(uint32));
928                 }
929
930                 XLogRegisterData((char *) &xlrec, SizeOfHashSplitAllocPage);
931
932                 recptr = XLogInsert(RM_HASH_ID, XLOG_HASH_SPLIT_ALLOCATE_PAGE);
933
934                 PageSetLSN(BufferGetPage(buf_oblkno), recptr);
935                 PageSetLSN(BufferGetPage(buf_nblkno), recptr);
936                 PageSetLSN(BufferGetPage(metabuf), recptr);
937         }
938
939         END_CRIT_SECTION();
940
941         /* drop lock, but keep pin */
942         LockBuffer(metabuf, BUFFER_LOCK_UNLOCK);
943
944         /* Relocate records to the new bucket */
945         _hash_splitbucket(rel, metabuf,
946                                           old_bucket, new_bucket,
947                                           buf_oblkno, buf_nblkno, NULL,
948                                           maxbucket, highmask, lowmask);
949
950         /* all done, now release the locks and pins on primary buckets. */
951         _hash_relbuf(rel, buf_oblkno);
952         _hash_relbuf(rel, buf_nblkno);
953
954         return;
955
956         /* Here if decide not to split or fail to acquire old bucket lock */
957 fail:
958
959         /* We didn't write the metapage, so just drop lock */
960         LockBuffer(metabuf, BUFFER_LOCK_UNLOCK);
961 }
962
963
964 /*
965  * _hash_alloc_buckets -- allocate a new splitpoint's worth of bucket pages
966  *
967  * This does not need to initialize the new bucket pages; we'll do that as
968  * each one is used by _hash_expandtable().  But we have to extend the logical
969  * EOF to the end of the splitpoint; this keeps smgr's idea of the EOF in
970  * sync with ours, so that we don't get complaints from smgr.
971  *
972  * We do this by writing a page of zeroes at the end of the splitpoint range.
973  * We expect that the filesystem will ensure that the intervening pages read
974  * as zeroes too.  On many filesystems this "hole" will not be allocated
975  * immediately, which means that the index file may end up more fragmented
976  * than if we forced it all to be allocated now; but since we don't scan
977  * hash indexes sequentially anyway, that probably doesn't matter.
978  *
979  * XXX It's annoying that this code is executed with the metapage lock held.
980  * We need to interlock against _hash_addovflpage() adding a new overflow page
981  * concurrently, but it'd likely be better to use LockRelationForExtension
982  * for the purpose.  OTOH, adding a splitpoint is a very infrequent operation,
983  * so it may not be worth worrying about.
984  *
985  * Returns TRUE if successful, or FALSE if allocation failed due to
986  * BlockNumber overflow.
987  */
988 static bool
989 _hash_alloc_buckets(Relation rel, BlockNumber firstblock, uint32 nblocks)
990 {
991         BlockNumber lastblock;
992         char            zerobuf[BLCKSZ];
993         Page            page;
994         HashPageOpaque ovflopaque;
995
996         lastblock = firstblock + nblocks - 1;
997
998         /*
999          * Check for overflow in block number calculation; if so, we cannot extend
1000          * the index anymore.
1001          */
1002         if (lastblock < firstblock || lastblock == InvalidBlockNumber)
1003                 return false;
1004
1005         page = (Page) zerobuf;
1006
1007         /*
1008          * Initialize the page.  Just zeroing the page won't work; see
1009          * _hash_freeovflpage for similar usage.  We take care to make the special
1010          * space valid for the benefit of tools such as pageinspect.
1011          */
1012         _hash_pageinit(page, BLCKSZ);
1013
1014         ovflopaque = (HashPageOpaque) PageGetSpecialPointer(page);
1015
1016         ovflopaque->hasho_prevblkno = InvalidBlockNumber;
1017         ovflopaque->hasho_nextblkno = InvalidBlockNumber;
1018         ovflopaque->hasho_bucket = -1;
1019         ovflopaque->hasho_flag = LH_UNUSED_PAGE;
1020         ovflopaque->hasho_page_id = HASHO_PAGE_ID;
1021
1022         if (RelationNeedsWAL(rel))
1023                 log_newpage(&rel->rd_node,
1024                                         MAIN_FORKNUM,
1025                                         lastblock,
1026                                         zerobuf,
1027                                         true);
1028
1029         RelationOpenSmgr(rel);
1030         smgrextend(rel->rd_smgr, MAIN_FORKNUM, lastblock, zerobuf, false);
1031
1032         return true;
1033 }
1034
1035
1036 /*
1037  * _hash_splitbucket -- split 'obucket' into 'obucket' and 'nbucket'
1038  *
1039  * This routine is used to partition the tuples between old and new bucket and
1040  * is used to finish the incomplete split operations.  To finish the previously
1041  * interrupted split operation, the caller needs to fill htab.  If htab is set,
1042  * then we skip the movement of tuples that exists in htab, otherwise NULL
1043  * value of htab indicates movement of all the tuples that belong to the new
1044  * bucket.
1045  *
1046  * We are splitting a bucket that consists of a base bucket page and zero
1047  * or more overflow (bucket chain) pages.  We must relocate tuples that
1048  * belong in the new bucket.
1049  *
1050  * The caller must hold cleanup locks on both buckets to ensure that
1051  * no one else is trying to access them (see README).
1052  *
1053  * The caller must hold a pin, but no lock, on the metapage buffer.
1054  * The buffer is returned in the same state.  (The metapage is only
1055  * touched if it becomes necessary to add or remove overflow pages.)
1056  *
1057  * Split needs to retain pin on primary bucket pages of both old and new
1058  * buckets till end of operation.  This is to prevent vacuum from starting
1059  * while a split is in progress.
1060  *
1061  * In addition, the caller must have created the new bucket's base page,
1062  * which is passed in buffer nbuf, pinned and write-locked.  That lock and
1063  * pin are released here.  (The API is set up this way because we must do
1064  * _hash_getnewbuf() before releasing the metapage write lock.  So instead of
1065  * passing the new bucket's start block number, we pass an actual buffer.)
1066  */
1067 static void
1068 _hash_splitbucket(Relation rel,
1069                                   Buffer metabuf,
1070                                   Bucket obucket,
1071                                   Bucket nbucket,
1072                                   Buffer obuf,
1073                                   Buffer nbuf,
1074                                   HTAB *htab,
1075                                   uint32 maxbucket,
1076                                   uint32 highmask,
1077                                   uint32 lowmask)
1078 {
1079         Buffer          bucket_obuf;
1080         Buffer          bucket_nbuf;
1081         Page            opage;
1082         Page            npage;
1083         HashPageOpaque oopaque;
1084         HashPageOpaque nopaque;
1085         OffsetNumber itup_offsets[MaxIndexTuplesPerPage];
1086         IndexTuple      itups[MaxIndexTuplesPerPage];
1087         Size            all_tups_size = 0;
1088         int                     i;
1089         uint16          nitups = 0;
1090
1091         bucket_obuf = obuf;
1092         opage = BufferGetPage(obuf);
1093         oopaque = (HashPageOpaque) PageGetSpecialPointer(opage);
1094
1095         bucket_nbuf = nbuf;
1096         npage = BufferGetPage(nbuf);
1097         nopaque = (HashPageOpaque) PageGetSpecialPointer(npage);
1098
1099         /*
1100          * Partition the tuples in the old bucket between the old bucket and the
1101          * new bucket, advancing along the old bucket's overflow bucket chain and
1102          * adding overflow pages to the new bucket as needed.  Outer loop iterates
1103          * once per page in old bucket.
1104          */
1105         for (;;)
1106         {
1107                 BlockNumber oblkno;
1108                 OffsetNumber ooffnum;
1109                 OffsetNumber omaxoffnum;
1110
1111                 /* Scan each tuple in old page */
1112                 omaxoffnum = PageGetMaxOffsetNumber(opage);
1113                 for (ooffnum = FirstOffsetNumber;
1114                          ooffnum <= omaxoffnum;
1115                          ooffnum = OffsetNumberNext(ooffnum))
1116                 {
1117                         IndexTuple      itup;
1118                         Size            itemsz;
1119                         Bucket          bucket;
1120                         bool            found = false;
1121
1122                         /* skip dead tuples */
1123                         if (ItemIdIsDead(PageGetItemId(opage, ooffnum)))
1124                                 continue;
1125
1126                         /*
1127                          * Before inserting a tuple, probe the hash table containing TIDs
1128                          * of tuples belonging to new bucket, if we find a match, then
1129                          * skip that tuple, else fetch the item's hash key (conveniently
1130                          * stored in the item) and determine which bucket it now belongs
1131                          * in.
1132                          */
1133                         itup = (IndexTuple) PageGetItem(opage,
1134                                                                                         PageGetItemId(opage, ooffnum));
1135
1136                         if (htab)
1137                                 (void) hash_search(htab, &itup->t_tid, HASH_FIND, &found);
1138
1139                         if (found)
1140                                 continue;
1141
1142                         bucket = _hash_hashkey2bucket(_hash_get_indextuple_hashkey(itup),
1143                                                                                   maxbucket, highmask, lowmask);
1144
1145                         if (bucket == nbucket)
1146                         {
1147                                 IndexTuple      new_itup;
1148
1149                                 /*
1150                                  * make a copy of index tuple as we have to scribble on it.
1151                                  */
1152                                 new_itup = CopyIndexTuple(itup);
1153
1154                                 /*
1155                                  * mark the index tuple as moved by split, such tuples are
1156                                  * skipped by scan if there is split in progress for a bucket.
1157                                  */
1158                                 new_itup->t_info |= INDEX_MOVED_BY_SPLIT_MASK;
1159
1160                                 /*
1161                                  * insert the tuple into the new bucket.  if it doesn't fit on
1162                                  * the current page in the new bucket, we must allocate a new
1163                                  * overflow page and place the tuple on that page instead.
1164                                  */
1165                                 itemsz = IndexTupleDSize(*new_itup);
1166                                 itemsz = MAXALIGN(itemsz);
1167
1168                                 if (PageGetFreeSpaceForMultipleTuples(npage, nitups + 1) < (all_tups_size + itemsz))
1169                                 {
1170                                         /*
1171                                          * Change the shared buffer state in critical section,
1172                                          * otherwise any error could make it unrecoverable.
1173                                          */
1174                                         START_CRIT_SECTION();
1175
1176                                         _hash_pgaddmultitup(rel, nbuf, itups, itup_offsets, nitups);
1177                                         MarkBufferDirty(nbuf);
1178                                         /* log the split operation before releasing the lock */
1179                                         log_split_page(rel, nbuf);
1180
1181                                         END_CRIT_SECTION();
1182
1183                                         /* drop lock, but keep pin */
1184                                         LockBuffer(nbuf, BUFFER_LOCK_UNLOCK);
1185
1186                                         /* be tidy */
1187                                         for (i = 0; i < nitups; i++)
1188                                                 pfree(itups[i]);
1189                                         nitups = 0;
1190                                         all_tups_size = 0;
1191
1192                                         /* chain to a new overflow page */
1193                                         nbuf = _hash_addovflpage(rel, metabuf, nbuf, (nbuf == bucket_nbuf) ? true : false);
1194                                         npage = BufferGetPage(nbuf);
1195                                         nopaque = (HashPageOpaque) PageGetSpecialPointer(npage);
1196                                 }
1197
1198                                 itups[nitups++] = new_itup;
1199                                 all_tups_size += itemsz;
1200                         }
1201                         else
1202                         {
1203                                 /*
1204                                  * the tuple stays on this page, so nothing to do.
1205                                  */
1206                                 Assert(bucket == obucket);
1207                         }
1208                 }
1209
1210                 oblkno = oopaque->hasho_nextblkno;
1211
1212                 /* retain the pin on the old primary bucket */
1213                 if (obuf == bucket_obuf)
1214                         LockBuffer(obuf, BUFFER_LOCK_UNLOCK);
1215                 else
1216                         _hash_relbuf(rel, obuf);
1217
1218                 /* Exit loop if no more overflow pages in old bucket */
1219                 if (!BlockNumberIsValid(oblkno))
1220                 {
1221                         /*
1222                          * Change the shared buffer state in critical section, otherwise
1223                          * any error could make it unrecoverable.
1224                          */
1225                         START_CRIT_SECTION();
1226
1227                         _hash_pgaddmultitup(rel, nbuf, itups, itup_offsets, nitups);
1228                         MarkBufferDirty(nbuf);
1229                         /* log the split operation before releasing the lock */
1230                         log_split_page(rel, nbuf);
1231
1232                         END_CRIT_SECTION();
1233
1234                         if (nbuf == bucket_nbuf)
1235                                 LockBuffer(nbuf, BUFFER_LOCK_UNLOCK);
1236                         else
1237                                 _hash_relbuf(rel, nbuf);
1238
1239                         /* be tidy */
1240                         for (i = 0; i < nitups; i++)
1241                                 pfree(itups[i]);
1242                         break;
1243                 }
1244
1245                 /* Else, advance to next old page */
1246                 obuf = _hash_getbuf(rel, oblkno, HASH_READ, LH_OVERFLOW_PAGE);
1247                 opage = BufferGetPage(obuf);
1248                 oopaque = (HashPageOpaque) PageGetSpecialPointer(opage);
1249         }
1250
1251         /*
1252          * We're at the end of the old bucket chain, so we're done partitioning
1253          * the tuples.  Mark the old and new buckets to indicate split is
1254          * finished.
1255          *
1256          * To avoid deadlocks due to locking order of buckets, first lock the old
1257          * bucket and then the new bucket.
1258          */
1259         LockBuffer(bucket_obuf, BUFFER_LOCK_EXCLUSIVE);
1260         opage = BufferGetPage(bucket_obuf);
1261         oopaque = (HashPageOpaque) PageGetSpecialPointer(opage);
1262
1263         LockBuffer(bucket_nbuf, BUFFER_LOCK_EXCLUSIVE);
1264         npage = BufferGetPage(bucket_nbuf);
1265         nopaque = (HashPageOpaque) PageGetSpecialPointer(npage);
1266
1267         START_CRIT_SECTION();
1268
1269         oopaque->hasho_flag &= ~LH_BUCKET_BEING_SPLIT;
1270         nopaque->hasho_flag &= ~LH_BUCKET_BEING_POPULATED;
1271
1272         /*
1273          * After the split is finished, mark the old bucket to indicate that it
1274          * contains deletable tuples.  Vacuum will clear split-cleanup flag after
1275          * deleting such tuples.
1276          */
1277         oopaque->hasho_flag |= LH_BUCKET_NEEDS_SPLIT_CLEANUP;
1278
1279         /*
1280          * now write the buffers, here we don't release the locks as caller is
1281          * responsible to release locks.
1282          */
1283         MarkBufferDirty(bucket_obuf);
1284         MarkBufferDirty(bucket_nbuf);
1285
1286         if (RelationNeedsWAL(rel))
1287         {
1288                 XLogRecPtr      recptr;
1289                 xl_hash_split_complete xlrec;
1290
1291                 xlrec.old_bucket_flag = oopaque->hasho_flag;
1292                 xlrec.new_bucket_flag = nopaque->hasho_flag;
1293
1294                 XLogBeginInsert();
1295
1296                 XLogRegisterData((char *) &xlrec, SizeOfHashSplitComplete);
1297
1298                 XLogRegisterBuffer(0, bucket_obuf, REGBUF_STANDARD);
1299                 XLogRegisterBuffer(1, bucket_nbuf, REGBUF_STANDARD);
1300
1301                 recptr = XLogInsert(RM_HASH_ID, XLOG_HASH_SPLIT_COMPLETE);
1302
1303                 PageSetLSN(BufferGetPage(bucket_obuf), recptr);
1304                 PageSetLSN(BufferGetPage(bucket_nbuf), recptr);
1305         }
1306
1307         END_CRIT_SECTION();
1308 }
1309
1310 /*
1311  *      _hash_finish_split() -- Finish the previously interrupted split operation
1312  *
1313  * To complete the split operation, we form the hash table of TIDs in new
1314  * bucket which is then used by split operation to skip tuples that are
1315  * already moved before the split operation was previously interrupted.
1316  *
1317  * The caller must hold a pin, but no lock, on the metapage and old bucket's
1318  * primary page buffer.  The buffers are returned in the same state.  (The
1319  * metapage is only touched if it becomes necessary to add or remove overflow
1320  * pages.)
1321  */
1322 void
1323 _hash_finish_split(Relation rel, Buffer metabuf, Buffer obuf, Bucket obucket,
1324                                    uint32 maxbucket, uint32 highmask, uint32 lowmask)
1325 {
1326         HASHCTL         hash_ctl;
1327         HTAB       *tidhtab;
1328         Buffer          bucket_nbuf = InvalidBuffer;
1329         Buffer          nbuf;
1330         Page            npage;
1331         BlockNumber nblkno;
1332         BlockNumber bucket_nblkno;
1333         HashPageOpaque npageopaque;
1334         Bucket          nbucket;
1335         bool            found;
1336
1337         /* Initialize hash tables used to track TIDs */
1338         memset(&hash_ctl, 0, sizeof(hash_ctl));
1339         hash_ctl.keysize = sizeof(ItemPointerData);
1340         hash_ctl.entrysize = sizeof(ItemPointerData);
1341         hash_ctl.hcxt = CurrentMemoryContext;
1342
1343         tidhtab =
1344                 hash_create("bucket ctids",
1345                                         256,            /* arbitrary initial size */
1346                                         &hash_ctl,
1347                                         HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
1348
1349         bucket_nblkno = nblkno = _hash_get_newblock_from_oldbucket(rel, obucket);
1350
1351         /*
1352          * Scan the new bucket and build hash table of TIDs
1353          */
1354         for (;;)
1355         {
1356                 OffsetNumber noffnum;
1357                 OffsetNumber nmaxoffnum;
1358
1359                 nbuf = _hash_getbuf(rel, nblkno, HASH_READ,
1360                                                         LH_BUCKET_PAGE | LH_OVERFLOW_PAGE);
1361
1362                 /* remember the primary bucket buffer to acquire cleanup lock on it. */
1363                 if (nblkno == bucket_nblkno)
1364                         bucket_nbuf = nbuf;
1365
1366                 npage = BufferGetPage(nbuf);
1367                 npageopaque = (HashPageOpaque) PageGetSpecialPointer(npage);
1368
1369                 /* Scan each tuple in new page */
1370                 nmaxoffnum = PageGetMaxOffsetNumber(npage);
1371                 for (noffnum = FirstOffsetNumber;
1372                          noffnum <= nmaxoffnum;
1373                          noffnum = OffsetNumberNext(noffnum))
1374                 {
1375                         IndexTuple      itup;
1376
1377                         /* Fetch the item's TID and insert it in hash table. */
1378                         itup = (IndexTuple) PageGetItem(npage,
1379                                                                                         PageGetItemId(npage, noffnum));
1380
1381                         (void) hash_search(tidhtab, &itup->t_tid, HASH_ENTER, &found);
1382
1383                         Assert(!found);
1384                 }
1385
1386                 nblkno = npageopaque->hasho_nextblkno;
1387
1388                 /*
1389                  * release our write lock without modifying buffer and ensure to
1390                  * retain the pin on primary bucket.
1391                  */
1392                 if (nbuf == bucket_nbuf)
1393                         LockBuffer(nbuf, BUFFER_LOCK_UNLOCK);
1394                 else
1395                         _hash_relbuf(rel, nbuf);
1396
1397                 /* Exit loop if no more overflow pages in new bucket */
1398                 if (!BlockNumberIsValid(nblkno))
1399                         break;
1400         }
1401
1402         /*
1403          * Conditionally get the cleanup lock on old and new buckets to perform
1404          * the split operation.  If we don't get the cleanup locks, silently give
1405          * up and next insertion on old bucket will try again to complete the
1406          * split.
1407          */
1408         if (!ConditionalLockBufferForCleanup(obuf))
1409         {
1410                 hash_destroy(tidhtab);
1411                 return;
1412         }
1413         if (!ConditionalLockBufferForCleanup(bucket_nbuf))
1414         {
1415                 LockBuffer(obuf, BUFFER_LOCK_UNLOCK);
1416                 hash_destroy(tidhtab);
1417                 return;
1418         }
1419
1420         npage = BufferGetPage(bucket_nbuf);
1421         npageopaque = (HashPageOpaque) PageGetSpecialPointer(npage);
1422         nbucket = npageopaque->hasho_bucket;
1423
1424         _hash_splitbucket(rel, metabuf, obucket,
1425                                           nbucket, obuf, bucket_nbuf, tidhtab,
1426                                           maxbucket, highmask, lowmask);
1427
1428         _hash_relbuf(rel, bucket_nbuf);
1429         LockBuffer(obuf, BUFFER_LOCK_UNLOCK);
1430         hash_destroy(tidhtab);
1431 }
1432
1433 /*
1434  *      log_split_page() -- Log the split operation
1435  *
1436  *      We log the split operation when the new page in new bucket gets full,
1437  *      so we log the entire page.
1438  *
1439  *      'buf' must be locked by the caller which is also responsible for unlocking
1440  *      it.
1441  */
1442 static void
1443 log_split_page(Relation rel, Buffer buf)
1444 {
1445         if (RelationNeedsWAL(rel))
1446         {
1447                 XLogRecPtr      recptr;
1448
1449                 XLogBeginInsert();
1450
1451                 XLogRegisterBuffer(0, buf, REGBUF_FORCE_IMAGE | REGBUF_STANDARD);
1452
1453                 recptr = XLogInsert(RM_HASH_ID, XLOG_HASH_SPLIT_PAGE);
1454
1455                 PageSetLSN(BufferGetPage(buf), recptr);
1456         }
1457 }
1458
1459 /*
1460  *      _hash_getcachedmetap() -- Returns cached metapage data.
1461  *
1462  *      If metabuf is not InvalidBuffer, caller must hold a pin, but no lock, on
1463  *      the metapage.  If not set, we'll set it before returning if we have to
1464  *      refresh the cache, and return with a pin but no lock on it; caller is
1465  *      responsible for releasing the pin.
1466  *
1467  *      We refresh the cache if it's not initialized yet or force_refresh is true.
1468  */
1469 HashMetaPage
1470 _hash_getcachedmetap(Relation rel, Buffer *metabuf, bool force_refresh)
1471 {
1472         Page            page;
1473
1474         Assert(metabuf);
1475         if (force_refresh || rel->rd_amcache == NULL)
1476         {
1477                 char       *cache = NULL;
1478
1479                 /*
1480                  * It's important that we don't set rd_amcache to an invalid value.
1481                  * Either MemoryContextAlloc or _hash_getbuf could fail, so don't
1482                  * install a pointer to the newly-allocated storage in the actual
1483                  * relcache entry until both have succeeeded.
1484                  */
1485                 if (rel->rd_amcache == NULL)
1486                         cache = MemoryContextAlloc(rel->rd_indexcxt,
1487                                                                            sizeof(HashMetaPageData));
1488
1489                 /* Read the metapage. */
1490                 if (BufferIsValid(*metabuf))
1491                         LockBuffer(*metabuf, BUFFER_LOCK_SHARE);
1492                 else
1493                         *metabuf = _hash_getbuf(rel, HASH_METAPAGE, HASH_READ,
1494                                                                         LH_META_PAGE);
1495                 page = BufferGetPage(*metabuf);
1496
1497                 /* Populate the cache. */
1498                 if (rel->rd_amcache == NULL)
1499                         rel->rd_amcache = cache;
1500                 memcpy(rel->rd_amcache, HashPageGetMeta(page),
1501                            sizeof(HashMetaPageData));
1502
1503                 /* Release metapage lock, but keep the pin. */
1504                 LockBuffer(*metabuf, BUFFER_LOCK_UNLOCK);
1505         }
1506
1507         return (HashMetaPage) rel->rd_amcache;
1508 }
1509
1510 /*
1511  *      _hash_getbucketbuf_from_hashkey() -- Get the bucket's buffer for the given
1512  *                                                                               hashkey.
1513  *
1514  *      Bucket pages do not move or get removed once they are allocated. This give
1515  *      us an opportunity to use the previously saved metapage contents to reach
1516  *      the target bucket buffer, instead of reading from the metapage every time.
1517  *      This saves one buffer access every time we want to reach the target bucket
1518  *      buffer, which is very helpful savings in bufmgr traffic and contention.
1519  *
1520  *      The access type parameter (HASH_READ or HASH_WRITE) indicates whether the
1521  *      bucket buffer has to be locked for reading or writing.
1522  *
1523  *      The out parameter cachedmetap is set with metapage contents used for
1524  *      hashkey to bucket buffer mapping. Some callers need this info to reach the
1525  *      old bucket in case of bucket split, see _hash_doinsert().
1526  */
1527 Buffer
1528 _hash_getbucketbuf_from_hashkey(Relation rel, uint32 hashkey, int access,
1529                                                                 HashMetaPage *cachedmetap)
1530 {
1531         HashMetaPage metap;
1532         Buffer          buf;
1533         Buffer          metabuf = InvalidBuffer;
1534         Page            page;
1535         Bucket          bucket;
1536         BlockNumber blkno;
1537         HashPageOpaque opaque;
1538
1539         /* We read from target bucket buffer, hence locking is must. */
1540         Assert(access == HASH_READ || access == HASH_WRITE);
1541
1542         metap = _hash_getcachedmetap(rel, &metabuf, false);
1543         Assert(metap != NULL);
1544
1545         /*
1546          * Loop until we get a lock on the correct target bucket.
1547          */
1548         for (;;)
1549         {
1550                 /*
1551                  * Compute the target bucket number, and convert to block number.
1552                  */
1553                 bucket = _hash_hashkey2bucket(hashkey,
1554                                                                           metap->hashm_maxbucket,
1555                                                                           metap->hashm_highmask,
1556                                                                           metap->hashm_lowmask);
1557
1558                 blkno = BUCKET_TO_BLKNO(metap, bucket);
1559
1560                 /* Fetch the primary bucket page for the bucket */
1561                 buf = _hash_getbuf(rel, blkno, access, LH_BUCKET_PAGE);
1562                 page = BufferGetPage(buf);
1563                 opaque = (HashPageOpaque) PageGetSpecialPointer(page);
1564                 Assert(opaque->hasho_bucket == bucket);
1565                 Assert(opaque->hasho_prevblkno != InvalidBlockNumber);
1566
1567                 /*
1568                  * If this bucket hasn't been split, we're done.
1569                  */
1570                 if (opaque->hasho_prevblkno <= metap->hashm_maxbucket)
1571                         break;
1572
1573                 /* Drop lock on this buffer, update cached metapage, and retry. */
1574                 _hash_relbuf(rel, buf);
1575                 metap = _hash_getcachedmetap(rel, &metabuf, true);
1576                 Assert(metap != NULL);
1577         }
1578
1579         if (BufferIsValid(metabuf))
1580                 _hash_dropbuf(rel, metabuf);
1581
1582         if (cachedmetap)
1583                 *cachedmetap = metap;
1584
1585         return buf;
1586 }