]> granicus.if.org Git - postgresql/blob - src/backend/access/hash/hashpage.c
Flag index metapages as standard-format in xlog.c calls.
[postgresql] / src / backend / access / hash / hashpage.c
1 /*-------------------------------------------------------------------------
2  *
3  * hashpage.c
4  *        Hash table page management code for the Postgres hash access method
5  *
6  * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *        src/backend/access/hash/hashpage.c
12  *
13  * NOTES
14  *        Postgres hash pages look like ordinary relation pages.  The opaque
15  *        data at high addresses includes information about the page including
16  *        whether a page is an overflow page or a true bucket, the bucket
17  *        number, and the block numbers of the preceding and following pages
18  *        in the same bucket.
19  *
20  *        The first page in a hash relation, page zero, is special -- it stores
21  *        information describing the hash table; it is referred to as the
22  *        "meta page." Pages one and higher store the actual data.
23  *
24  *        There are also bitmap pages, which are not manipulated here;
25  *        see hashovfl.c.
26  *
27  *-------------------------------------------------------------------------
28  */
29 #include "postgres.h"
30
31 #include "access/hash.h"
32 #include "access/hash_xlog.h"
33 #include "miscadmin.h"
34 #include "storage/lmgr.h"
35 #include "storage/smgr.h"
36
37
38 static bool _hash_alloc_buckets(Relation rel, BlockNumber firstblock,
39                                         uint32 nblocks);
40 static void _hash_splitbucket(Relation rel, Buffer metabuf,
41                                   Bucket obucket, Bucket nbucket,
42                                   Buffer obuf,
43                                   Buffer nbuf,
44                                   HTAB *htab,
45                                   uint32 maxbucket,
46                                   uint32 highmask, uint32 lowmask);
47 static void log_split_page(Relation rel, Buffer buf);
48
49
50 /*
51  * We use high-concurrency locking on hash indexes (see README for an overview
52  * of the locking rules).  However, we can skip taking lmgr locks when the
53  * index is local to the current backend (ie, either temp or new in the
54  * current transaction).  No one else can see it, so there's no reason to
55  * take locks.  We still take buffer-level locks, but not lmgr locks.
56  */
57 #define USELOCKING(rel)         (!RELATION_IS_LOCAL(rel))
58
59
60 /*
61  *      _hash_getbuf() -- Get a buffer by block number for read or write.
62  *
63  *              'access' must be HASH_READ, HASH_WRITE, or HASH_NOLOCK.
64  *              'flags' is a bitwise OR of the allowed page types.
65  *
66  *              This must be used only to fetch pages that are expected to be valid
67  *              already.  _hash_checkpage() is applied using the given flags.
68  *
69  *              When this routine returns, the appropriate lock is set on the
70  *              requested buffer and its reference count has been incremented
71  *              (ie, the buffer is "locked and pinned").
72  *
73  *              P_NEW is disallowed because this routine can only be used
74  *              to access pages that are known to be before the filesystem EOF.
75  *              Extending the index should be done with _hash_getnewbuf.
76  */
77 Buffer
78 _hash_getbuf(Relation rel, BlockNumber blkno, int access, int flags)
79 {
80         Buffer          buf;
81
82         if (blkno == P_NEW)
83                 elog(ERROR, "hash AM does not use P_NEW");
84
85         buf = ReadBuffer(rel, blkno);
86
87         if (access != HASH_NOLOCK)
88                 LockBuffer(buf, access);
89
90         /* ref count and lock type are correct */
91
92         _hash_checkpage(rel, buf, flags);
93
94         return buf;
95 }
96
97 /*
98  * _hash_getbuf_with_condlock_cleanup() -- Try to get a buffer for cleanup.
99  *
100  *              We read the page and try to acquire a cleanup lock.  If we get it,
101  *              we return the buffer; otherwise, we return InvalidBuffer.
102  */
103 Buffer
104 _hash_getbuf_with_condlock_cleanup(Relation rel, BlockNumber blkno, int flags)
105 {
106         Buffer          buf;
107
108         if (blkno == P_NEW)
109                 elog(ERROR, "hash AM does not use P_NEW");
110
111         buf = ReadBuffer(rel, blkno);
112
113         if (!ConditionalLockBufferForCleanup(buf))
114         {
115                 ReleaseBuffer(buf);
116                 return InvalidBuffer;
117         }
118
119         /* ref count and lock type are correct */
120
121         _hash_checkpage(rel, buf, flags);
122
123         return buf;
124 }
125
126 /*
127  *      _hash_getinitbuf() -- Get and initialize a buffer by block number.
128  *
129  *              This must be used only to fetch pages that are known to be before
130  *              the index's filesystem EOF, but are to be filled from scratch.
131  *              _hash_pageinit() is applied automatically.  Otherwise it has
132  *              effects similar to _hash_getbuf() with access = HASH_WRITE.
133  *
134  *              When this routine returns, a write lock is set on the
135  *              requested buffer and its reference count has been incremented
136  *              (ie, the buffer is "locked and pinned").
137  *
138  *              P_NEW is disallowed because this routine can only be used
139  *              to access pages that are known to be before the filesystem EOF.
140  *              Extending the index should be done with _hash_getnewbuf.
141  */
142 Buffer
143 _hash_getinitbuf(Relation rel, BlockNumber blkno)
144 {
145         Buffer          buf;
146
147         if (blkno == P_NEW)
148                 elog(ERROR, "hash AM does not use P_NEW");
149
150         buf = ReadBufferExtended(rel, MAIN_FORKNUM, blkno, RBM_ZERO_AND_LOCK,
151                                                          NULL);
152
153         /* ref count and lock type are correct */
154
155         /* initialize the page */
156         _hash_pageinit(BufferGetPage(buf), BufferGetPageSize(buf));
157
158         return buf;
159 }
160
161 /*
162  *      _hash_initbuf() -- Get and initialize a buffer by bucket number.
163  */
164 void
165 _hash_initbuf(Buffer buf, uint32 max_bucket, uint32 num_bucket, uint32 flag,
166                           bool initpage)
167 {
168         HashPageOpaque pageopaque;
169         Page            page;
170
171         page = BufferGetPage(buf);
172
173         /* initialize the page */
174         if (initpage)
175                 _hash_pageinit(page, BufferGetPageSize(buf));
176
177         pageopaque = (HashPageOpaque) PageGetSpecialPointer(page);
178
179         /*
180          * Set hasho_prevblkno with current hashm_maxbucket. This value will be
181          * used to validate cached HashMetaPageData. See
182          * _hash_getbucketbuf_from_hashkey().
183          */
184         pageopaque->hasho_prevblkno = max_bucket;
185         pageopaque->hasho_nextblkno = InvalidBlockNumber;
186         pageopaque->hasho_bucket = num_bucket;
187         pageopaque->hasho_flag = flag;
188         pageopaque->hasho_page_id = HASHO_PAGE_ID;
189 }
190
191 /*
192  *      _hash_getnewbuf() -- Get a new page at the end of the index.
193  *
194  *              This has the same API as _hash_getinitbuf, except that we are adding
195  *              a page to the index, and hence expect the page to be past the
196  *              logical EOF.  (However, we have to support the case where it isn't,
197  *              since a prior try might have crashed after extending the filesystem
198  *              EOF but before updating the metapage to reflect the added page.)
199  *
200  *              It is caller's responsibility to ensure that only one process can
201  *              extend the index at a time.  In practice, this function is called
202  *              only while holding write lock on the metapage, because adding a page
203  *              is always associated with an update of metapage data.
204  */
205 Buffer
206 _hash_getnewbuf(Relation rel, BlockNumber blkno, ForkNumber forkNum)
207 {
208         BlockNumber nblocks = RelationGetNumberOfBlocksInFork(rel, forkNum);
209         Buffer          buf;
210
211         if (blkno == P_NEW)
212                 elog(ERROR, "hash AM does not use P_NEW");
213         if (blkno > nblocks)
214                 elog(ERROR, "access to noncontiguous page in hash index \"%s\"",
215                          RelationGetRelationName(rel));
216
217         /* smgr insists we use P_NEW to extend the relation */
218         if (blkno == nblocks)
219         {
220                 buf = ReadBufferExtended(rel, forkNum, P_NEW, RBM_NORMAL, NULL);
221                 if (BufferGetBlockNumber(buf) != blkno)
222                         elog(ERROR, "unexpected hash relation size: %u, should be %u",
223                                  BufferGetBlockNumber(buf), blkno);
224                 LockBuffer(buf, HASH_WRITE);
225         }
226         else
227         {
228                 buf = ReadBufferExtended(rel, forkNum, blkno, RBM_ZERO_AND_LOCK,
229                                                                  NULL);
230         }
231
232         /* ref count and lock type are correct */
233
234         /* initialize the page */
235         _hash_pageinit(BufferGetPage(buf), BufferGetPageSize(buf));
236
237         return buf;
238 }
239
240 /*
241  *      _hash_getbuf_with_strategy() -- Get a buffer with nondefault strategy.
242  *
243  *              This is identical to _hash_getbuf() but also allows a buffer access
244  *              strategy to be specified.  We use this for VACUUM operations.
245  */
246 Buffer
247 _hash_getbuf_with_strategy(Relation rel, BlockNumber blkno,
248                                                    int access, int flags,
249                                                    BufferAccessStrategy bstrategy)
250 {
251         Buffer          buf;
252
253         if (blkno == P_NEW)
254                 elog(ERROR, "hash AM does not use P_NEW");
255
256         buf = ReadBufferExtended(rel, MAIN_FORKNUM, blkno, RBM_NORMAL, bstrategy);
257
258         if (access != HASH_NOLOCK)
259                 LockBuffer(buf, access);
260
261         /* ref count and lock type are correct */
262
263         _hash_checkpage(rel, buf, flags);
264
265         return buf;
266 }
267
268 /*
269  *      _hash_relbuf() -- release a locked buffer.
270  *
271  * Lock and pin (refcount) are both dropped.
272  */
273 void
274 _hash_relbuf(Relation rel, Buffer buf)
275 {
276         UnlockReleaseBuffer(buf);
277 }
278
279 /*
280  *      _hash_dropbuf() -- release an unlocked buffer.
281  *
282  * This is used to unpin a buffer on which we hold no lock.
283  */
284 void
285 _hash_dropbuf(Relation rel, Buffer buf)
286 {
287         ReleaseBuffer(buf);
288 }
289
290 /*
291  *      _hash_dropscanbuf() -- release buffers used in scan.
292  *
293  * This routine unpins the buffers used during scan on which we
294  * hold no lock.
295  */
296 void
297 _hash_dropscanbuf(Relation rel, HashScanOpaque so)
298 {
299         /* release pin we hold on primary bucket page */
300         if (BufferIsValid(so->hashso_bucket_buf) &&
301                 so->hashso_bucket_buf != so->currPos.buf)
302                 _hash_dropbuf(rel, so->hashso_bucket_buf);
303         so->hashso_bucket_buf = InvalidBuffer;
304
305         /* release pin we hold on primary bucket page  of bucket being split */
306         if (BufferIsValid(so->hashso_split_bucket_buf) &&
307                 so->hashso_split_bucket_buf != so->currPos.buf)
308                 _hash_dropbuf(rel, so->hashso_split_bucket_buf);
309         so->hashso_split_bucket_buf = InvalidBuffer;
310
311         /* release any pin we still hold */
312         if (BufferIsValid(so->currPos.buf))
313                 _hash_dropbuf(rel, so->currPos.buf);
314         so->currPos.buf = InvalidBuffer;
315
316         /* reset split scan */
317         so->hashso_buc_populated = false;
318         so->hashso_buc_split = false;
319 }
320
321
322 /*
323  *      _hash_init() -- Initialize the metadata page of a hash index,
324  *                              the initial buckets, and the initial bitmap page.
325  *
326  * The initial number of buckets is dependent on num_tuples, an estimate
327  * of the number of tuples to be loaded into the index initially.  The
328  * chosen number of buckets is returned.
329  *
330  * We are fairly cavalier about locking here, since we know that no one else
331  * could be accessing this index.  In particular the rule about not holding
332  * multiple buffer locks is ignored.
333  */
334 uint32
335 _hash_init(Relation rel, double num_tuples, ForkNumber forkNum)
336 {
337         Buffer          metabuf;
338         Buffer          buf;
339         Buffer          bitmapbuf;
340         Page            pg;
341         HashMetaPage metap;
342         RegProcedure procid;
343         int32           data_width;
344         int32           item_width;
345         int32           ffactor;
346         uint32          num_buckets;
347         uint32          i;
348         bool            use_wal;
349
350         /* safety check */
351         if (RelationGetNumberOfBlocksInFork(rel, forkNum) != 0)
352                 elog(ERROR, "cannot initialize non-empty hash index \"%s\"",
353                          RelationGetRelationName(rel));
354
355         /*
356          * WAL log creation of pages if the relation is persistent, or this is the
357          * init fork.  Init forks for unlogged relations always need to be WAL
358          * logged.
359          */
360         use_wal = RelationNeedsWAL(rel) || forkNum == INIT_FORKNUM;
361
362         /*
363          * Determine the target fill factor (in tuples per bucket) for this index.
364          * The idea is to make the fill factor correspond to pages about as full
365          * as the user-settable fillfactor parameter says.  We can compute it
366          * exactly since the index datatype (i.e. uint32 hash key) is fixed-width.
367          */
368         data_width = sizeof(uint32);
369         item_width = MAXALIGN(sizeof(IndexTupleData)) + MAXALIGN(data_width) +
370                 sizeof(ItemIdData);             /* include the line pointer */
371         ffactor = RelationGetTargetPageUsage(rel, HASH_DEFAULT_FILLFACTOR) / item_width;
372         /* keep to a sane range */
373         if (ffactor < 10)
374                 ffactor = 10;
375
376         procid = index_getprocid(rel, 1, HASHSTANDARD_PROC);
377
378         /*
379          * We initialize the metapage, the first N bucket pages, and the first
380          * bitmap page in sequence, using _hash_getnewbuf to cause smgrextend()
381          * calls to occur.  This ensures that the smgr level has the right idea of
382          * the physical index length.
383          *
384          * Critical section not required, because on error the creation of the
385          * whole relation will be rolled back.
386          */
387         metabuf = _hash_getnewbuf(rel, HASH_METAPAGE, forkNum);
388         _hash_init_metabuffer(metabuf, num_tuples, procid, ffactor, false);
389         MarkBufferDirty(metabuf);
390
391         pg = BufferGetPage(metabuf);
392         metap = HashPageGetMeta(pg);
393
394         /* XLOG stuff */
395         if (use_wal)
396         {
397                 xl_hash_init_meta_page xlrec;
398                 XLogRecPtr      recptr;
399
400                 xlrec.num_tuples = num_tuples;
401                 xlrec.procid = metap->hashm_procid;
402                 xlrec.ffactor = metap->hashm_ffactor;
403
404                 XLogBeginInsert();
405                 XLogRegisterData((char *) &xlrec, SizeOfHashInitMetaPage);
406                 XLogRegisterBuffer(0, metabuf, REGBUF_WILL_INIT | REGBUF_STANDARD);
407
408                 recptr = XLogInsert(RM_HASH_ID, XLOG_HASH_INIT_META_PAGE);
409
410                 PageSetLSN(BufferGetPage(metabuf), recptr);
411         }
412
413         num_buckets = metap->hashm_maxbucket + 1;
414
415         /*
416          * Release buffer lock on the metapage while we initialize buckets.
417          * Otherwise, we'll be in interrupt holdoff and the CHECK_FOR_INTERRUPTS
418          * won't accomplish anything.  It's a bad idea to hold buffer locks for
419          * long intervals in any case, since that can block the bgwriter.
420          */
421         LockBuffer(metabuf, BUFFER_LOCK_UNLOCK);
422
423         /*
424          * Initialize and WAL Log the first N buckets
425          */
426         for (i = 0; i < num_buckets; i++)
427         {
428                 BlockNumber blkno;
429
430                 /* Allow interrupts, in case N is huge */
431                 CHECK_FOR_INTERRUPTS();
432
433                 blkno = BUCKET_TO_BLKNO(metap, i);
434                 buf = _hash_getnewbuf(rel, blkno, forkNum);
435                 _hash_initbuf(buf, metap->hashm_maxbucket, i, LH_BUCKET_PAGE, false);
436                 MarkBufferDirty(buf);
437
438                 if (use_wal)
439                         log_newpage(&rel->rd_node,
440                                                 forkNum,
441                                                 blkno,
442                                                 BufferGetPage(buf),
443                                                 true);
444                 _hash_relbuf(rel, buf);
445         }
446
447         /* Now reacquire buffer lock on metapage */
448         LockBuffer(metabuf, BUFFER_LOCK_EXCLUSIVE);
449
450         /*
451          * Initialize bitmap page
452          */
453         bitmapbuf = _hash_getnewbuf(rel, num_buckets + 1, forkNum);
454         _hash_initbitmapbuffer(bitmapbuf, metap->hashm_bmsize, false);
455         MarkBufferDirty(bitmapbuf);
456
457         /* add the new bitmap page to the metapage's list of bitmaps */
458         /* metapage already has a write lock */
459         if (metap->hashm_nmaps >= HASH_MAX_BITMAPS)
460                 ereport(ERROR,
461                                 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
462                                  errmsg("out of overflow pages in hash index \"%s\"",
463                                                 RelationGetRelationName(rel))));
464
465         metap->hashm_mapp[metap->hashm_nmaps] = num_buckets + 1;
466
467         metap->hashm_nmaps++;
468         MarkBufferDirty(metabuf);
469
470         /* XLOG stuff */
471         if (use_wal)
472         {
473                 xl_hash_init_bitmap_page xlrec;
474                 XLogRecPtr      recptr;
475
476                 xlrec.bmsize = metap->hashm_bmsize;
477
478                 XLogBeginInsert();
479                 XLogRegisterData((char *) &xlrec, SizeOfHashInitBitmapPage);
480                 XLogRegisterBuffer(0, bitmapbuf, REGBUF_WILL_INIT);
481
482                 /*
483                  * This is safe only because nobody else can be modifying the index at
484                  * this stage; it's only visible to the transaction that is creating
485                  * it.
486                  */
487                 XLogRegisterBuffer(1, metabuf, REGBUF_STANDARD);
488
489                 recptr = XLogInsert(RM_HASH_ID, XLOG_HASH_INIT_BITMAP_PAGE);
490
491                 PageSetLSN(BufferGetPage(bitmapbuf), recptr);
492                 PageSetLSN(BufferGetPage(metabuf), recptr);
493         }
494
495         /* all done */
496         _hash_relbuf(rel, bitmapbuf);
497         _hash_relbuf(rel, metabuf);
498
499         return num_buckets;
500 }
501
502 /*
503  *      _hash_init_metabuffer() -- Initialize the metadata page of a hash index.
504  */
505 void
506 _hash_init_metabuffer(Buffer buf, double num_tuples, RegProcedure procid,
507                                           uint16 ffactor, bool initpage)
508 {
509         HashMetaPage metap;
510         HashPageOpaque pageopaque;
511         Page            page;
512         double          dnumbuckets;
513         uint32          num_buckets;
514         uint32          spare_index;
515         uint32          i;
516
517         /*
518          * Choose the number of initial bucket pages to match the fill factor
519          * given the estimated number of tuples.  We round up the result to the
520          * total number of buckets which has to be allocated before using its
521          * _hashm_spare element. However always force at least 2 bucket pages. The
522          * upper limit is determined by considerations explained in
523          * _hash_expandtable().
524          */
525         dnumbuckets = num_tuples / ffactor;
526         if (dnumbuckets <= 2.0)
527                 num_buckets = 2;
528         else if (dnumbuckets >= (double) 0x40000000)
529                 num_buckets = 0x40000000;
530         else
531                 num_buckets = _hash_get_totalbuckets(_hash_spareindex(dnumbuckets));
532
533         spare_index = _hash_spareindex(num_buckets);
534         Assert(spare_index < HASH_MAX_SPLITPOINTS);
535
536         page = BufferGetPage(buf);
537         if (initpage)
538                 _hash_pageinit(page, BufferGetPageSize(buf));
539
540         pageopaque = (HashPageOpaque) PageGetSpecialPointer(page);
541         pageopaque->hasho_prevblkno = InvalidBlockNumber;
542         pageopaque->hasho_nextblkno = InvalidBlockNumber;
543         pageopaque->hasho_bucket = -1;
544         pageopaque->hasho_flag = LH_META_PAGE;
545         pageopaque->hasho_page_id = HASHO_PAGE_ID;
546
547         metap = HashPageGetMeta(page);
548
549         metap->hashm_magic = HASH_MAGIC;
550         metap->hashm_version = HASH_VERSION;
551         metap->hashm_ntuples = 0;
552         metap->hashm_nmaps = 0;
553         metap->hashm_ffactor = ffactor;
554         metap->hashm_bsize = HashGetMaxBitmapSize(page);
555         /* find largest bitmap array size that will fit in page size */
556         for (i = _hash_log2(metap->hashm_bsize); i > 0; --i)
557         {
558                 if ((1 << i) <= metap->hashm_bsize)
559                         break;
560         }
561         Assert(i > 0);
562         metap->hashm_bmsize = 1 << i;
563         metap->hashm_bmshift = i + BYTE_TO_BIT;
564         Assert((1 << BMPG_SHIFT(metap)) == (BMPG_MASK(metap) + 1));
565
566         /*
567          * Label the index with its primary hash support function's OID.  This is
568          * pretty useless for normal operation (in fact, hashm_procid is not used
569          * anywhere), but it might be handy for forensic purposes so we keep it.
570          */
571         metap->hashm_procid = procid;
572
573         /*
574          * We initialize the index with N buckets, 0 .. N-1, occupying physical
575          * blocks 1 to N.  The first freespace bitmap page is in block N+1.
576          */
577         metap->hashm_maxbucket = num_buckets - 1;
578
579         /*
580          * Set highmask as next immediate ((2 ^ x) - 1), which should be
581          * sufficient to cover num_buckets.
582          */
583         metap->hashm_highmask = (1 << (_hash_log2(num_buckets + 1))) - 1;
584         metap->hashm_lowmask = (metap->hashm_highmask >> 1);
585
586         MemSet(metap->hashm_spares, 0, sizeof(metap->hashm_spares));
587         MemSet(metap->hashm_mapp, 0, sizeof(metap->hashm_mapp));
588
589         /* Set up mapping for one spare page after the initial splitpoints */
590         metap->hashm_spares[spare_index] = 1;
591         metap->hashm_ovflpoint = spare_index;
592         metap->hashm_firstfree = 0;
593
594         /*
595          * Set pd_lower just past the end of the metadata.  This is essential,
596          * because without doing so, metadata will be lost if xlog.c compresses
597          * the page.
598          */
599         ((PageHeader) page)->pd_lower =
600                 ((char *) metap + sizeof(HashMetaPageData)) - (char *) page;
601 }
602
603 /*
604  *      _hash_pageinit() -- Initialize a new hash index page.
605  */
606 void
607 _hash_pageinit(Page page, Size size)
608 {
609         PageInit(page, size, sizeof(HashPageOpaqueData));
610 }
611
612 /*
613  * Attempt to expand the hash table by creating one new bucket.
614  *
615  * This will silently do nothing if we don't get cleanup lock on old or
616  * new bucket.
617  *
618  * Complete the pending splits and remove the tuples from old bucket,
619  * if there are any left over from the previous split.
620  *
621  * The caller must hold a pin, but no lock, on the metapage buffer.
622  * The buffer is returned in the same state.
623  */
624 void
625 _hash_expandtable(Relation rel, Buffer metabuf)
626 {
627         HashMetaPage metap;
628         Bucket          old_bucket;
629         Bucket          new_bucket;
630         uint32          spare_ndx;
631         BlockNumber start_oblkno;
632         BlockNumber start_nblkno;
633         Buffer          buf_nblkno;
634         Buffer          buf_oblkno;
635         Page            opage;
636         Page            npage;
637         HashPageOpaque oopaque;
638         HashPageOpaque nopaque;
639         uint32          maxbucket;
640         uint32          highmask;
641         uint32          lowmask;
642         bool            metap_update_masks = false;
643         bool            metap_update_splitpoint = false;
644
645 restart_expand:
646
647         /*
648          * Write-lock the meta page.  It used to be necessary to acquire a
649          * heavyweight lock to begin a split, but that is no longer required.
650          */
651         LockBuffer(metabuf, BUFFER_LOCK_EXCLUSIVE);
652
653         _hash_checkpage(rel, metabuf, LH_META_PAGE);
654         metap = HashPageGetMeta(BufferGetPage(metabuf));
655
656         /*
657          * Check to see if split is still needed; someone else might have already
658          * done one while we waited for the lock.
659          *
660          * Make sure this stays in sync with _hash_doinsert()
661          */
662         if (metap->hashm_ntuples <=
663                 (double) metap->hashm_ffactor * (metap->hashm_maxbucket + 1))
664                 goto fail;
665
666         /*
667          * Can't split anymore if maxbucket has reached its maximum possible
668          * value.
669          *
670          * Ideally we'd allow bucket numbers up to UINT_MAX-1 (no higher because
671          * the calculation maxbucket+1 mustn't overflow).  Currently we restrict
672          * to half that because of overflow looping in _hash_log2() and
673          * insufficient space in hashm_spares[].  It's moot anyway because an
674          * index with 2^32 buckets would certainly overflow BlockNumber and hence
675          * _hash_alloc_buckets() would fail, but if we supported buckets smaller
676          * than a disk block then this would be an independent constraint.
677          *
678          * If you change this, see also the maximum initial number of buckets in
679          * _hash_init().
680          */
681         if (metap->hashm_maxbucket >= (uint32) 0x7FFFFFFE)
682                 goto fail;
683
684         /*
685          * Determine which bucket is to be split, and attempt to take cleanup lock
686          * on the old bucket.  If we can't get the lock, give up.
687          *
688          * The cleanup lock protects us not only against other backends, but
689          * against our own backend as well.
690          *
691          * The cleanup lock is mainly to protect the split from concurrent
692          * inserts. See src/backend/access/hash/README, Lock Definitions for
693          * further details.  Due to this locking restriction, if there is any
694          * pending scan, the split will give up which is not good, but harmless.
695          */
696         new_bucket = metap->hashm_maxbucket + 1;
697
698         old_bucket = (new_bucket & metap->hashm_lowmask);
699
700         start_oblkno = BUCKET_TO_BLKNO(metap, old_bucket);
701
702         buf_oblkno = _hash_getbuf_with_condlock_cleanup(rel, start_oblkno, LH_BUCKET_PAGE);
703         if (!buf_oblkno)
704                 goto fail;
705
706         opage = BufferGetPage(buf_oblkno);
707         oopaque = (HashPageOpaque) PageGetSpecialPointer(opage);
708
709         /*
710          * We want to finish the split from a bucket as there is no apparent
711          * benefit by not doing so and it will make the code complicated to finish
712          * the split that involves multiple buckets considering the case where new
713          * split also fails.  We don't need to consider the new bucket for
714          * completing the split here as it is not possible that a re-split of new
715          * bucket starts when there is still a pending split from old bucket.
716          */
717         if (H_BUCKET_BEING_SPLIT(oopaque))
718         {
719                 /*
720                  * Copy bucket mapping info now; refer the comment in code below where
721                  * we copy this information before calling _hash_splitbucket to see
722                  * why this is okay.
723                  */
724                 maxbucket = metap->hashm_maxbucket;
725                 highmask = metap->hashm_highmask;
726                 lowmask = metap->hashm_lowmask;
727
728                 /*
729                  * Release the lock on metapage and old_bucket, before completing the
730                  * split.
731                  */
732                 LockBuffer(metabuf, BUFFER_LOCK_UNLOCK);
733                 LockBuffer(buf_oblkno, BUFFER_LOCK_UNLOCK);
734
735                 _hash_finish_split(rel, metabuf, buf_oblkno, old_bucket, maxbucket,
736                                                    highmask, lowmask);
737
738                 /* release the pin on old buffer and retry for expand. */
739                 _hash_dropbuf(rel, buf_oblkno);
740
741                 goto restart_expand;
742         }
743
744         /*
745          * Clean the tuples remained from the previous split.  This operation
746          * requires cleanup lock and we already have one on the old bucket, so
747          * let's do it. We also don't want to allow further splits from the bucket
748          * till the garbage of previous split is cleaned.  This has two
749          * advantages; first, it helps in avoiding the bloat due to garbage and
750          * second is, during cleanup of bucket, we are always sure that the
751          * garbage tuples belong to most recently split bucket.  On the contrary,
752          * if we allow cleanup of bucket after meta page is updated to indicate
753          * the new split and before the actual split, the cleanup operation won't
754          * be able to decide whether the tuple has been moved to the newly created
755          * bucket and ended up deleting such tuples.
756          */
757         if (H_NEEDS_SPLIT_CLEANUP(oopaque))
758         {
759                 /*
760                  * Copy bucket mapping info now; refer to the comment in code below
761                  * where we copy this information before calling _hash_splitbucket to
762                  * see why this is okay.
763                  */
764                 maxbucket = metap->hashm_maxbucket;
765                 highmask = metap->hashm_highmask;
766                 lowmask = metap->hashm_lowmask;
767
768                 /* Release the metapage lock. */
769                 LockBuffer(metabuf, BUFFER_LOCK_UNLOCK);
770
771                 hashbucketcleanup(rel, old_bucket, buf_oblkno, start_oblkno, NULL,
772                                                   maxbucket, highmask, lowmask, NULL, NULL, true,
773                                                   NULL, NULL);
774
775                 _hash_dropbuf(rel, buf_oblkno);
776
777                 goto restart_expand;
778         }
779
780         /*
781          * There shouldn't be any active scan on new bucket.
782          *
783          * Note: it is safe to compute the new bucket's blkno here, even though we
784          * may still need to update the BUCKET_TO_BLKNO mapping.  This is because
785          * the current value of hashm_spares[hashm_ovflpoint] correctly shows
786          * where we are going to put a new splitpoint's worth of buckets.
787          */
788         start_nblkno = BUCKET_TO_BLKNO(metap, new_bucket);
789
790         /*
791          * If the split point is increasing we need to allocate a new batch of
792          * bucket pages.
793          */
794         spare_ndx = _hash_spareindex(new_bucket + 1);
795         if (spare_ndx > metap->hashm_ovflpoint)
796         {
797                 uint32          buckets_to_add;
798
799                 Assert(spare_ndx == metap->hashm_ovflpoint + 1);
800
801                 /*
802                  * We treat allocation of buckets as a separate WAL-logged action.
803                  * Even if we fail after this operation, won't leak bucket pages;
804                  * rather, the next split will consume this space. In any case, even
805                  * without failure we don't use all the space in one split operation.
806                  */
807                 buckets_to_add = _hash_get_totalbuckets(spare_ndx) - new_bucket;
808                 if (!_hash_alloc_buckets(rel, start_nblkno, buckets_to_add))
809                 {
810                         /* can't split due to BlockNumber overflow */
811                         _hash_relbuf(rel, buf_oblkno);
812                         goto fail;
813                 }
814         }
815
816         /*
817          * Physically allocate the new bucket's primary page.  We want to do this
818          * before changing the metapage's mapping info, in case we can't get the
819          * disk space.  Ideally, we don't need to check for cleanup lock on new
820          * bucket as no other backend could find this bucket unless meta page is
821          * updated.  However, it is good to be consistent with old bucket locking.
822          */
823         buf_nblkno = _hash_getnewbuf(rel, start_nblkno, MAIN_FORKNUM);
824         if (!IsBufferCleanupOK(buf_nblkno))
825         {
826                 _hash_relbuf(rel, buf_oblkno);
827                 _hash_relbuf(rel, buf_nblkno);
828                 goto fail;
829         }
830
831         /*
832          * Since we are scribbling on the pages in the shared buffers, establish a
833          * critical section.  Any failure in this next code leaves us with a big
834          * problem: the metapage is effectively corrupt but could get written back
835          * to disk.
836          */
837         START_CRIT_SECTION();
838
839         /*
840          * Okay to proceed with split.  Update the metapage bucket mapping info.
841          */
842         metap->hashm_maxbucket = new_bucket;
843
844         if (new_bucket > metap->hashm_highmask)
845         {
846                 /* Starting a new doubling */
847                 metap->hashm_lowmask = metap->hashm_highmask;
848                 metap->hashm_highmask = new_bucket | metap->hashm_lowmask;
849                 metap_update_masks = true;
850         }
851
852         /*
853          * If the split point is increasing we need to adjust the hashm_spares[]
854          * array and hashm_ovflpoint so that future overflow pages will be created
855          * beyond this new batch of bucket pages.
856          */
857         if (spare_ndx > metap->hashm_ovflpoint)
858         {
859                 metap->hashm_spares[spare_ndx] = metap->hashm_spares[metap->hashm_ovflpoint];
860                 metap->hashm_ovflpoint = spare_ndx;
861                 metap_update_splitpoint = true;
862         }
863
864         MarkBufferDirty(metabuf);
865
866         /*
867          * Copy bucket mapping info now; this saves re-accessing the meta page
868          * inside _hash_splitbucket's inner loop.  Note that once we drop the
869          * split lock, other splits could begin, so these values might be out of
870          * date before _hash_splitbucket finishes.  That's okay, since all it
871          * needs is to tell which of these two buckets to map hashkeys into.
872          */
873         maxbucket = metap->hashm_maxbucket;
874         highmask = metap->hashm_highmask;
875         lowmask = metap->hashm_lowmask;
876
877         opage = BufferGetPage(buf_oblkno);
878         oopaque = (HashPageOpaque) PageGetSpecialPointer(opage);
879
880         /*
881          * Mark the old bucket to indicate that split is in progress.  (At
882          * operation end, we will clear the split-in-progress flag.)  Also, for a
883          * primary bucket page, hasho_prevblkno stores the number of buckets that
884          * existed as of the last split, so we must update that value here.
885          */
886         oopaque->hasho_flag |= LH_BUCKET_BEING_SPLIT;
887         oopaque->hasho_prevblkno = maxbucket;
888
889         MarkBufferDirty(buf_oblkno);
890
891         npage = BufferGetPage(buf_nblkno);
892
893         /*
894          * initialize the new bucket's primary page and mark it to indicate that
895          * split is in progress.
896          */
897         nopaque = (HashPageOpaque) PageGetSpecialPointer(npage);
898         nopaque->hasho_prevblkno = maxbucket;
899         nopaque->hasho_nextblkno = InvalidBlockNumber;
900         nopaque->hasho_bucket = new_bucket;
901         nopaque->hasho_flag = LH_BUCKET_PAGE | LH_BUCKET_BEING_POPULATED;
902         nopaque->hasho_page_id = HASHO_PAGE_ID;
903
904         MarkBufferDirty(buf_nblkno);
905
906         /* XLOG stuff */
907         if (RelationNeedsWAL(rel))
908         {
909                 xl_hash_split_allocate_page xlrec;
910                 XLogRecPtr      recptr;
911
912                 xlrec.new_bucket = maxbucket;
913                 xlrec.old_bucket_flag = oopaque->hasho_flag;
914                 xlrec.new_bucket_flag = nopaque->hasho_flag;
915                 xlrec.flags = 0;
916
917                 XLogBeginInsert();
918
919                 XLogRegisterBuffer(0, buf_oblkno, REGBUF_STANDARD);
920                 XLogRegisterBuffer(1, buf_nblkno, REGBUF_WILL_INIT);
921                 XLogRegisterBuffer(2, metabuf, REGBUF_STANDARD);
922
923                 if (metap_update_masks)
924                 {
925                         xlrec.flags |= XLH_SPLIT_META_UPDATE_MASKS;
926                         XLogRegisterBufData(2, (char *) &metap->hashm_lowmask, sizeof(uint32));
927                         XLogRegisterBufData(2, (char *) &metap->hashm_highmask, sizeof(uint32));
928                 }
929
930                 if (metap_update_splitpoint)
931                 {
932                         xlrec.flags |= XLH_SPLIT_META_UPDATE_SPLITPOINT;
933                         XLogRegisterBufData(2, (char *) &metap->hashm_ovflpoint,
934                                                                 sizeof(uint32));
935                         XLogRegisterBufData(2,
936                                                                 (char *) &metap->hashm_spares[metap->hashm_ovflpoint],
937                                                                 sizeof(uint32));
938                 }
939
940                 XLogRegisterData((char *) &xlrec, SizeOfHashSplitAllocPage);
941
942                 recptr = XLogInsert(RM_HASH_ID, XLOG_HASH_SPLIT_ALLOCATE_PAGE);
943
944                 PageSetLSN(BufferGetPage(buf_oblkno), recptr);
945                 PageSetLSN(BufferGetPage(buf_nblkno), recptr);
946                 PageSetLSN(BufferGetPage(metabuf), recptr);
947         }
948
949         END_CRIT_SECTION();
950
951         /* drop lock, but keep pin */
952         LockBuffer(metabuf, BUFFER_LOCK_UNLOCK);
953
954         /* Relocate records to the new bucket */
955         _hash_splitbucket(rel, metabuf,
956                                           old_bucket, new_bucket,
957                                           buf_oblkno, buf_nblkno, NULL,
958                                           maxbucket, highmask, lowmask);
959
960         /* all done, now release the pins on primary buckets. */
961         _hash_dropbuf(rel, buf_oblkno);
962         _hash_dropbuf(rel, buf_nblkno);
963
964         return;
965
966         /* Here if decide not to split or fail to acquire old bucket lock */
967 fail:
968
969         /* We didn't write the metapage, so just drop lock */
970         LockBuffer(metabuf, BUFFER_LOCK_UNLOCK);
971 }
972
973
974 /*
975  * _hash_alloc_buckets -- allocate a new splitpoint's worth of bucket pages
976  *
977  * This does not need to initialize the new bucket pages; we'll do that as
978  * each one is used by _hash_expandtable().  But we have to extend the logical
979  * EOF to the end of the splitpoint; this keeps smgr's idea of the EOF in
980  * sync with ours, so that we don't get complaints from smgr.
981  *
982  * We do this by writing a page of zeroes at the end of the splitpoint range.
983  * We expect that the filesystem will ensure that the intervening pages read
984  * as zeroes too.  On many filesystems this "hole" will not be allocated
985  * immediately, which means that the index file may end up more fragmented
986  * than if we forced it all to be allocated now; but since we don't scan
987  * hash indexes sequentially anyway, that probably doesn't matter.
988  *
989  * XXX It's annoying that this code is executed with the metapage lock held.
990  * We need to interlock against _hash_addovflpage() adding a new overflow page
991  * concurrently, but it'd likely be better to use LockRelationForExtension
992  * for the purpose.  OTOH, adding a splitpoint is a very infrequent operation,
993  * so it may not be worth worrying about.
994  *
995  * Returns TRUE if successful, or FALSE if allocation failed due to
996  * BlockNumber overflow.
997  */
998 static bool
999 _hash_alloc_buckets(Relation rel, BlockNumber firstblock, uint32 nblocks)
1000 {
1001         BlockNumber lastblock;
1002         char            zerobuf[BLCKSZ];
1003         Page            page;
1004         HashPageOpaque ovflopaque;
1005
1006         lastblock = firstblock + nblocks - 1;
1007
1008         /*
1009          * Check for overflow in block number calculation; if so, we cannot extend
1010          * the index anymore.
1011          */
1012         if (lastblock < firstblock || lastblock == InvalidBlockNumber)
1013                 return false;
1014
1015         page = (Page) zerobuf;
1016
1017         /*
1018          * Initialize the page.  Just zeroing the page won't work; see
1019          * _hash_freeovflpage for similar usage.  We take care to make the special
1020          * space valid for the benefit of tools such as pageinspect.
1021          */
1022         _hash_pageinit(page, BLCKSZ);
1023
1024         ovflopaque = (HashPageOpaque) PageGetSpecialPointer(page);
1025
1026         ovflopaque->hasho_prevblkno = InvalidBlockNumber;
1027         ovflopaque->hasho_nextblkno = InvalidBlockNumber;
1028         ovflopaque->hasho_bucket = -1;
1029         ovflopaque->hasho_flag = LH_UNUSED_PAGE;
1030         ovflopaque->hasho_page_id = HASHO_PAGE_ID;
1031
1032         if (RelationNeedsWAL(rel))
1033                 log_newpage(&rel->rd_node,
1034                                         MAIN_FORKNUM,
1035                                         lastblock,
1036                                         zerobuf,
1037                                         true);
1038
1039         RelationOpenSmgr(rel);
1040         smgrextend(rel->rd_smgr, MAIN_FORKNUM, lastblock, zerobuf, false);
1041
1042         return true;
1043 }
1044
1045
1046 /*
1047  * _hash_splitbucket -- split 'obucket' into 'obucket' and 'nbucket'
1048  *
1049  * This routine is used to partition the tuples between old and new bucket and
1050  * is used to finish the incomplete split operations.  To finish the previously
1051  * interrupted split operation, the caller needs to fill htab.  If htab is set,
1052  * then we skip the movement of tuples that exists in htab, otherwise NULL
1053  * value of htab indicates movement of all the tuples that belong to the new
1054  * bucket.
1055  *
1056  * We are splitting a bucket that consists of a base bucket page and zero
1057  * or more overflow (bucket chain) pages.  We must relocate tuples that
1058  * belong in the new bucket.
1059  *
1060  * The caller must hold cleanup locks on both buckets to ensure that
1061  * no one else is trying to access them (see README).
1062  *
1063  * The caller must hold a pin, but no lock, on the metapage buffer.
1064  * The buffer is returned in the same state.  (The metapage is only
1065  * touched if it becomes necessary to add or remove overflow pages.)
1066  *
1067  * Split needs to retain pin on primary bucket pages of both old and new
1068  * buckets till end of operation.  This is to prevent vacuum from starting
1069  * while a split is in progress.
1070  *
1071  * In addition, the caller must have created the new bucket's base page,
1072  * which is passed in buffer nbuf, pinned and write-locked.  The lock will be
1073  * released here and pin must be released by the caller.  (The API is set up
1074  * this way because we must do _hash_getnewbuf() before releasing the metapage
1075  * write lock.  So instead of passing the new bucket's start block number, we
1076  * pass an actual buffer.)
1077  */
1078 static void
1079 _hash_splitbucket(Relation rel,
1080                                   Buffer metabuf,
1081                                   Bucket obucket,
1082                                   Bucket nbucket,
1083                                   Buffer obuf,
1084                                   Buffer nbuf,
1085                                   HTAB *htab,
1086                                   uint32 maxbucket,
1087                                   uint32 highmask,
1088                                   uint32 lowmask)
1089 {
1090         Buffer          bucket_obuf;
1091         Buffer          bucket_nbuf;
1092         Page            opage;
1093         Page            npage;
1094         HashPageOpaque oopaque;
1095         HashPageOpaque nopaque;
1096         OffsetNumber itup_offsets[MaxIndexTuplesPerPage];
1097         IndexTuple      itups[MaxIndexTuplesPerPage];
1098         Size            all_tups_size = 0;
1099         int                     i;
1100         uint16          nitups = 0;
1101
1102         bucket_obuf = obuf;
1103         opage = BufferGetPage(obuf);
1104         oopaque = (HashPageOpaque) PageGetSpecialPointer(opage);
1105
1106         bucket_nbuf = nbuf;
1107         npage = BufferGetPage(nbuf);
1108         nopaque = (HashPageOpaque) PageGetSpecialPointer(npage);
1109
1110         /*
1111          * Partition the tuples in the old bucket between the old bucket and the
1112          * new bucket, advancing along the old bucket's overflow bucket chain and
1113          * adding overflow pages to the new bucket as needed.  Outer loop iterates
1114          * once per page in old bucket.
1115          */
1116         for (;;)
1117         {
1118                 BlockNumber oblkno;
1119                 OffsetNumber ooffnum;
1120                 OffsetNumber omaxoffnum;
1121
1122                 /* Scan each tuple in old page */
1123                 omaxoffnum = PageGetMaxOffsetNumber(opage);
1124                 for (ooffnum = FirstOffsetNumber;
1125                          ooffnum <= omaxoffnum;
1126                          ooffnum = OffsetNumberNext(ooffnum))
1127                 {
1128                         IndexTuple      itup;
1129                         Size            itemsz;
1130                         Bucket          bucket;
1131                         bool            found = false;
1132
1133                         /* skip dead tuples */
1134                         if (ItemIdIsDead(PageGetItemId(opage, ooffnum)))
1135                                 continue;
1136
1137                         /*
1138                          * Before inserting a tuple, probe the hash table containing TIDs
1139                          * of tuples belonging to new bucket, if we find a match, then
1140                          * skip that tuple, else fetch the item's hash key (conveniently
1141                          * stored in the item) and determine which bucket it now belongs
1142                          * in.
1143                          */
1144                         itup = (IndexTuple) PageGetItem(opage,
1145                                                                                         PageGetItemId(opage, ooffnum));
1146
1147                         if (htab)
1148                                 (void) hash_search(htab, &itup->t_tid, HASH_FIND, &found);
1149
1150                         if (found)
1151                                 continue;
1152
1153                         bucket = _hash_hashkey2bucket(_hash_get_indextuple_hashkey(itup),
1154                                                                                   maxbucket, highmask, lowmask);
1155
1156                         if (bucket == nbucket)
1157                         {
1158                                 IndexTuple      new_itup;
1159
1160                                 /*
1161                                  * make a copy of index tuple as we have to scribble on it.
1162                                  */
1163                                 new_itup = CopyIndexTuple(itup);
1164
1165                                 /*
1166                                  * mark the index tuple as moved by split, such tuples are
1167                                  * skipped by scan if there is split in progress for a bucket.
1168                                  */
1169                                 new_itup->t_info |= INDEX_MOVED_BY_SPLIT_MASK;
1170
1171                                 /*
1172                                  * insert the tuple into the new bucket.  if it doesn't fit on
1173                                  * the current page in the new bucket, we must allocate a new
1174                                  * overflow page and place the tuple on that page instead.
1175                                  */
1176                                 itemsz = IndexTupleDSize(*new_itup);
1177                                 itemsz = MAXALIGN(itemsz);
1178
1179                                 if (PageGetFreeSpaceForMultipleTuples(npage, nitups + 1) < (all_tups_size + itemsz))
1180                                 {
1181                                         /*
1182                                          * Change the shared buffer state in critical section,
1183                                          * otherwise any error could make it unrecoverable.
1184                                          */
1185                                         START_CRIT_SECTION();
1186
1187                                         _hash_pgaddmultitup(rel, nbuf, itups, itup_offsets, nitups);
1188                                         MarkBufferDirty(nbuf);
1189                                         /* log the split operation before releasing the lock */
1190                                         log_split_page(rel, nbuf);
1191
1192                                         END_CRIT_SECTION();
1193
1194                                         /* drop lock, but keep pin */
1195                                         LockBuffer(nbuf, BUFFER_LOCK_UNLOCK);
1196
1197                                         /* be tidy */
1198                                         for (i = 0; i < nitups; i++)
1199                                                 pfree(itups[i]);
1200                                         nitups = 0;
1201                                         all_tups_size = 0;
1202
1203                                         /* chain to a new overflow page */
1204                                         nbuf = _hash_addovflpage(rel, metabuf, nbuf, (nbuf == bucket_nbuf) ? true : false);
1205                                         npage = BufferGetPage(nbuf);
1206                                         nopaque = (HashPageOpaque) PageGetSpecialPointer(npage);
1207                                 }
1208
1209                                 itups[nitups++] = new_itup;
1210                                 all_tups_size += itemsz;
1211                         }
1212                         else
1213                         {
1214                                 /*
1215                                  * the tuple stays on this page, so nothing to do.
1216                                  */
1217                                 Assert(bucket == obucket);
1218                         }
1219                 }
1220
1221                 oblkno = oopaque->hasho_nextblkno;
1222
1223                 /* retain the pin on the old primary bucket */
1224                 if (obuf == bucket_obuf)
1225                         LockBuffer(obuf, BUFFER_LOCK_UNLOCK);
1226                 else
1227                         _hash_relbuf(rel, obuf);
1228
1229                 /* Exit loop if no more overflow pages in old bucket */
1230                 if (!BlockNumberIsValid(oblkno))
1231                 {
1232                         /*
1233                          * Change the shared buffer state in critical section, otherwise
1234                          * any error could make it unrecoverable.
1235                          */
1236                         START_CRIT_SECTION();
1237
1238                         _hash_pgaddmultitup(rel, nbuf, itups, itup_offsets, nitups);
1239                         MarkBufferDirty(nbuf);
1240                         /* log the split operation before releasing the lock */
1241                         log_split_page(rel, nbuf);
1242
1243                         END_CRIT_SECTION();
1244
1245                         if (nbuf == bucket_nbuf)
1246                                 LockBuffer(nbuf, BUFFER_LOCK_UNLOCK);
1247                         else
1248                                 _hash_relbuf(rel, nbuf);
1249
1250                         /* be tidy */
1251                         for (i = 0; i < nitups; i++)
1252                                 pfree(itups[i]);
1253                         break;
1254                 }
1255
1256                 /* Else, advance to next old page */
1257                 obuf = _hash_getbuf(rel, oblkno, HASH_READ, LH_OVERFLOW_PAGE);
1258                 opage = BufferGetPage(obuf);
1259                 oopaque = (HashPageOpaque) PageGetSpecialPointer(opage);
1260         }
1261
1262         /*
1263          * We're at the end of the old bucket chain, so we're done partitioning
1264          * the tuples.  Mark the old and new buckets to indicate split is
1265          * finished.
1266          *
1267          * To avoid deadlocks due to locking order of buckets, first lock the old
1268          * bucket and then the new bucket.
1269          */
1270         LockBuffer(bucket_obuf, BUFFER_LOCK_EXCLUSIVE);
1271         opage = BufferGetPage(bucket_obuf);
1272         oopaque = (HashPageOpaque) PageGetSpecialPointer(opage);
1273
1274         LockBuffer(bucket_nbuf, BUFFER_LOCK_EXCLUSIVE);
1275         npage = BufferGetPage(bucket_nbuf);
1276         nopaque = (HashPageOpaque) PageGetSpecialPointer(npage);
1277
1278         START_CRIT_SECTION();
1279
1280         oopaque->hasho_flag &= ~LH_BUCKET_BEING_SPLIT;
1281         nopaque->hasho_flag &= ~LH_BUCKET_BEING_POPULATED;
1282
1283         /*
1284          * After the split is finished, mark the old bucket to indicate that it
1285          * contains deletable tuples.  We will clear split-cleanup flag after
1286          * deleting such tuples either at the end of split or at the next split
1287          * from old bucket or at the time of vacuum.
1288          */
1289         oopaque->hasho_flag |= LH_BUCKET_NEEDS_SPLIT_CLEANUP;
1290
1291         /*
1292          * now write the buffers, here we don't release the locks as caller is
1293          * responsible to release locks.
1294          */
1295         MarkBufferDirty(bucket_obuf);
1296         MarkBufferDirty(bucket_nbuf);
1297
1298         if (RelationNeedsWAL(rel))
1299         {
1300                 XLogRecPtr      recptr;
1301                 xl_hash_split_complete xlrec;
1302
1303                 xlrec.old_bucket_flag = oopaque->hasho_flag;
1304                 xlrec.new_bucket_flag = nopaque->hasho_flag;
1305
1306                 XLogBeginInsert();
1307
1308                 XLogRegisterData((char *) &xlrec, SizeOfHashSplitComplete);
1309
1310                 XLogRegisterBuffer(0, bucket_obuf, REGBUF_STANDARD);
1311                 XLogRegisterBuffer(1, bucket_nbuf, REGBUF_STANDARD);
1312
1313                 recptr = XLogInsert(RM_HASH_ID, XLOG_HASH_SPLIT_COMPLETE);
1314
1315                 PageSetLSN(BufferGetPage(bucket_obuf), recptr);
1316                 PageSetLSN(BufferGetPage(bucket_nbuf), recptr);
1317         }
1318
1319         END_CRIT_SECTION();
1320
1321         /*
1322          * If possible, clean up the old bucket.  We might not be able to do this
1323          * if someone else has a pin on it, but if not then we can go ahead.  This
1324          * isn't absolutely necessary, but it reduces bloat; if we don't do it
1325          * now, VACUUM will do it eventually, but maybe not until new overflow
1326          * pages have been allocated.  Note that there's no need to clean up the
1327          * new bucket.
1328          */
1329         if (IsBufferCleanupOK(bucket_obuf))
1330         {
1331                 LockBuffer(bucket_nbuf, BUFFER_LOCK_UNLOCK);
1332                 hashbucketcleanup(rel, obucket, bucket_obuf,
1333                                                   BufferGetBlockNumber(bucket_obuf), NULL,
1334                                                   maxbucket, highmask, lowmask, NULL, NULL, true,
1335                                                   NULL, NULL);
1336         }
1337         else
1338         {
1339                 LockBuffer(bucket_nbuf, BUFFER_LOCK_UNLOCK);
1340                 LockBuffer(bucket_obuf, BUFFER_LOCK_UNLOCK);
1341         }
1342 }
1343
1344 /*
1345  *      _hash_finish_split() -- Finish the previously interrupted split operation
1346  *
1347  * To complete the split operation, we form the hash table of TIDs in new
1348  * bucket which is then used by split operation to skip tuples that are
1349  * already moved before the split operation was previously interrupted.
1350  *
1351  * The caller must hold a pin, but no lock, on the metapage and old bucket's
1352  * primary page buffer.  The buffers are returned in the same state.  (The
1353  * metapage is only touched if it becomes necessary to add or remove overflow
1354  * pages.)
1355  */
1356 void
1357 _hash_finish_split(Relation rel, Buffer metabuf, Buffer obuf, Bucket obucket,
1358                                    uint32 maxbucket, uint32 highmask, uint32 lowmask)
1359 {
1360         HASHCTL         hash_ctl;
1361         HTAB       *tidhtab;
1362         Buffer          bucket_nbuf = InvalidBuffer;
1363         Buffer          nbuf;
1364         Page            npage;
1365         BlockNumber nblkno;
1366         BlockNumber bucket_nblkno;
1367         HashPageOpaque npageopaque;
1368         Bucket          nbucket;
1369         bool            found;
1370
1371         /* Initialize hash tables used to track TIDs */
1372         memset(&hash_ctl, 0, sizeof(hash_ctl));
1373         hash_ctl.keysize = sizeof(ItemPointerData);
1374         hash_ctl.entrysize = sizeof(ItemPointerData);
1375         hash_ctl.hcxt = CurrentMemoryContext;
1376
1377         tidhtab =
1378                 hash_create("bucket ctids",
1379                                         256,            /* arbitrary initial size */
1380                                         &hash_ctl,
1381                                         HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
1382
1383         bucket_nblkno = nblkno = _hash_get_newblock_from_oldbucket(rel, obucket);
1384
1385         /*
1386          * Scan the new bucket and build hash table of TIDs
1387          */
1388         for (;;)
1389         {
1390                 OffsetNumber noffnum;
1391                 OffsetNumber nmaxoffnum;
1392
1393                 nbuf = _hash_getbuf(rel, nblkno, HASH_READ,
1394                                                         LH_BUCKET_PAGE | LH_OVERFLOW_PAGE);
1395
1396                 /* remember the primary bucket buffer to acquire cleanup lock on it. */
1397                 if (nblkno == bucket_nblkno)
1398                         bucket_nbuf = nbuf;
1399
1400                 npage = BufferGetPage(nbuf);
1401                 npageopaque = (HashPageOpaque) PageGetSpecialPointer(npage);
1402
1403                 /* Scan each tuple in new page */
1404                 nmaxoffnum = PageGetMaxOffsetNumber(npage);
1405                 for (noffnum = FirstOffsetNumber;
1406                          noffnum <= nmaxoffnum;
1407                          noffnum = OffsetNumberNext(noffnum))
1408                 {
1409                         IndexTuple      itup;
1410
1411                         /* Fetch the item's TID and insert it in hash table. */
1412                         itup = (IndexTuple) PageGetItem(npage,
1413                                                                                         PageGetItemId(npage, noffnum));
1414
1415                         (void) hash_search(tidhtab, &itup->t_tid, HASH_ENTER, &found);
1416
1417                         Assert(!found);
1418                 }
1419
1420                 nblkno = npageopaque->hasho_nextblkno;
1421
1422                 /*
1423                  * release our write lock without modifying buffer and ensure to
1424                  * retain the pin on primary bucket.
1425                  */
1426                 if (nbuf == bucket_nbuf)
1427                         LockBuffer(nbuf, BUFFER_LOCK_UNLOCK);
1428                 else
1429                         _hash_relbuf(rel, nbuf);
1430
1431                 /* Exit loop if no more overflow pages in new bucket */
1432                 if (!BlockNumberIsValid(nblkno))
1433                         break;
1434         }
1435
1436         /*
1437          * Conditionally get the cleanup lock on old and new buckets to perform
1438          * the split operation.  If we don't get the cleanup locks, silently give
1439          * up and next insertion on old bucket will try again to complete the
1440          * split.
1441          */
1442         if (!ConditionalLockBufferForCleanup(obuf))
1443         {
1444                 hash_destroy(tidhtab);
1445                 return;
1446         }
1447         if (!ConditionalLockBufferForCleanup(bucket_nbuf))
1448         {
1449                 LockBuffer(obuf, BUFFER_LOCK_UNLOCK);
1450                 hash_destroy(tidhtab);
1451                 return;
1452         }
1453
1454         npage = BufferGetPage(bucket_nbuf);
1455         npageopaque = (HashPageOpaque) PageGetSpecialPointer(npage);
1456         nbucket = npageopaque->hasho_bucket;
1457
1458         _hash_splitbucket(rel, metabuf, obucket,
1459                                           nbucket, obuf, bucket_nbuf, tidhtab,
1460                                           maxbucket, highmask, lowmask);
1461
1462         _hash_dropbuf(rel, bucket_nbuf);
1463         hash_destroy(tidhtab);
1464 }
1465
1466 /*
1467  *      log_split_page() -- Log the split operation
1468  *
1469  *      We log the split operation when the new page in new bucket gets full,
1470  *      so we log the entire page.
1471  *
1472  *      'buf' must be locked by the caller which is also responsible for unlocking
1473  *      it.
1474  */
1475 static void
1476 log_split_page(Relation rel, Buffer buf)
1477 {
1478         if (RelationNeedsWAL(rel))
1479         {
1480                 XLogRecPtr      recptr;
1481
1482                 XLogBeginInsert();
1483
1484                 XLogRegisterBuffer(0, buf, REGBUF_FORCE_IMAGE | REGBUF_STANDARD);
1485
1486                 recptr = XLogInsert(RM_HASH_ID, XLOG_HASH_SPLIT_PAGE);
1487
1488                 PageSetLSN(BufferGetPage(buf), recptr);
1489         }
1490 }
1491
1492 /*
1493  *      _hash_getcachedmetap() -- Returns cached metapage data.
1494  *
1495  *      If metabuf is not InvalidBuffer, caller must hold a pin, but no lock, on
1496  *      the metapage.  If not set, we'll set it before returning if we have to
1497  *      refresh the cache, and return with a pin but no lock on it; caller is
1498  *      responsible for releasing the pin.
1499  *
1500  *      We refresh the cache if it's not initialized yet or force_refresh is true.
1501  */
1502 HashMetaPage
1503 _hash_getcachedmetap(Relation rel, Buffer *metabuf, bool force_refresh)
1504 {
1505         Page            page;
1506
1507         Assert(metabuf);
1508         if (force_refresh || rel->rd_amcache == NULL)
1509         {
1510                 char       *cache = NULL;
1511
1512                 /*
1513                  * It's important that we don't set rd_amcache to an invalid value.
1514                  * Either MemoryContextAlloc or _hash_getbuf could fail, so don't
1515                  * install a pointer to the newly-allocated storage in the actual
1516                  * relcache entry until both have succeeeded.
1517                  */
1518                 if (rel->rd_amcache == NULL)
1519                         cache = MemoryContextAlloc(rel->rd_indexcxt,
1520                                                                            sizeof(HashMetaPageData));
1521
1522                 /* Read the metapage. */
1523                 if (BufferIsValid(*metabuf))
1524                         LockBuffer(*metabuf, BUFFER_LOCK_SHARE);
1525                 else
1526                         *metabuf = _hash_getbuf(rel, HASH_METAPAGE, HASH_READ,
1527                                                                         LH_META_PAGE);
1528                 page = BufferGetPage(*metabuf);
1529
1530                 /* Populate the cache. */
1531                 if (rel->rd_amcache == NULL)
1532                         rel->rd_amcache = cache;
1533                 memcpy(rel->rd_amcache, HashPageGetMeta(page),
1534                            sizeof(HashMetaPageData));
1535
1536                 /* Release metapage lock, but keep the pin. */
1537                 LockBuffer(*metabuf, BUFFER_LOCK_UNLOCK);
1538         }
1539
1540         return (HashMetaPage) rel->rd_amcache;
1541 }
1542
1543 /*
1544  *      _hash_getbucketbuf_from_hashkey() -- Get the bucket's buffer for the given
1545  *                                                                               hashkey.
1546  *
1547  *      Bucket pages do not move or get removed once they are allocated. This give
1548  *      us an opportunity to use the previously saved metapage contents to reach
1549  *      the target bucket buffer, instead of reading from the metapage every time.
1550  *      This saves one buffer access every time we want to reach the target bucket
1551  *      buffer, which is very helpful savings in bufmgr traffic and contention.
1552  *
1553  *      The access type parameter (HASH_READ or HASH_WRITE) indicates whether the
1554  *      bucket buffer has to be locked for reading or writing.
1555  *
1556  *      The out parameter cachedmetap is set with metapage contents used for
1557  *      hashkey to bucket buffer mapping. Some callers need this info to reach the
1558  *      old bucket in case of bucket split, see _hash_doinsert().
1559  */
1560 Buffer
1561 _hash_getbucketbuf_from_hashkey(Relation rel, uint32 hashkey, int access,
1562                                                                 HashMetaPage *cachedmetap)
1563 {
1564         HashMetaPage metap;
1565         Buffer          buf;
1566         Buffer          metabuf = InvalidBuffer;
1567         Page            page;
1568         Bucket          bucket;
1569         BlockNumber blkno;
1570         HashPageOpaque opaque;
1571
1572         /* We read from target bucket buffer, hence locking is must. */
1573         Assert(access == HASH_READ || access == HASH_WRITE);
1574
1575         metap = _hash_getcachedmetap(rel, &metabuf, false);
1576         Assert(metap != NULL);
1577
1578         /*
1579          * Loop until we get a lock on the correct target bucket.
1580          */
1581         for (;;)
1582         {
1583                 /*
1584                  * Compute the target bucket number, and convert to block number.
1585                  */
1586                 bucket = _hash_hashkey2bucket(hashkey,
1587                                                                           metap->hashm_maxbucket,
1588                                                                           metap->hashm_highmask,
1589                                                                           metap->hashm_lowmask);
1590
1591                 blkno = BUCKET_TO_BLKNO(metap, bucket);
1592
1593                 /* Fetch the primary bucket page for the bucket */
1594                 buf = _hash_getbuf(rel, blkno, access, LH_BUCKET_PAGE);
1595                 page = BufferGetPage(buf);
1596                 opaque = (HashPageOpaque) PageGetSpecialPointer(page);
1597                 Assert(opaque->hasho_bucket == bucket);
1598                 Assert(opaque->hasho_prevblkno != InvalidBlockNumber);
1599
1600                 /*
1601                  * If this bucket hasn't been split, we're done.
1602                  */
1603                 if (opaque->hasho_prevblkno <= metap->hashm_maxbucket)
1604                         break;
1605
1606                 /* Drop lock on this buffer, update cached metapage, and retry. */
1607                 _hash_relbuf(rel, buf);
1608                 metap = _hash_getcachedmetap(rel, &metabuf, true);
1609                 Assert(metap != NULL);
1610         }
1611
1612         if (BufferIsValid(metabuf))
1613                 _hash_dropbuf(rel, metabuf);
1614
1615         if (cachedmetap)
1616                 *cachedmetap = metap;
1617
1618         return buf;
1619 }