/*-------------------------------------------------------------------------
 *
 * hash_xlog.c
 *	  WAL replay logic for hash index.
 *
 *
 * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *	  src/backend/access/hash/hash_xlog.c
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include "access/hash.h"
#include "access/hash_xlog.h"
#include "access/xlogutils.h"

/*
 * replay creation of a hash index meta page
 */
static void
hash_xlog_init_meta_page(XLogReaderState *record)
{
	XLogRecPtr	lsn = record->EndRecPtr;
	Page		page;
	Buffer		metabuf;

	xl_hash_init_meta_page *xlrec = (xl_hash_init_meta_page *) XLogRecGetData(record);

	/* create the index's metapage */
	metabuf = XLogInitBufferForRedo(record, 0);
	Assert(BufferIsValid(metabuf));
	_hash_init_metabuffer(metabuf, xlrec->num_tuples, xlrec->procid,
						  xlrec->ffactor, true);
	page = (Page) BufferGetPage(metabuf);
	PageSetLSN(page, lsn);
	MarkBufferDirty(metabuf);
	/* all done */
	UnlockReleaseBuffer(metabuf);
}

/*
 * replay creation of a hash index bitmap page
 */
static void
hash_xlog_init_bitmap_page(XLogReaderState *record)
{
	XLogRecPtr	lsn = record->EndRecPtr;
	Buffer		bitmapbuf;
	Buffer		metabuf;
	Page		page;
	HashMetaPage metap;
	uint32		num_buckets;

	xl_hash_init_bitmap_page *xlrec = (xl_hash_init_bitmap_page *) XLogRecGetData(record);

	/*
	 * Initialize bitmap page
	 */
	bitmapbuf = XLogInitBufferForRedo(record, 0);
	_hash_initbitmapbuffer(bitmapbuf, xlrec->bmsize, true);
	PageSetLSN(BufferGetPage(bitmapbuf), lsn);
	MarkBufferDirty(bitmapbuf);
	UnlockReleaseBuffer(bitmapbuf);

	/* add the new bitmap page to the metapage's list of bitmaps */
	if (XLogReadBufferForRedo(record, 1, &metabuf) == BLK_NEEDS_REDO)
	{
		/*
		 * Note: in normal operation, we'd update the metapage while still
		 * holding lock on the bitmap page.  But during replay it's not
		 * necessary to hold that lock, since nobody can see it yet; the
		 * creating transaction hasn't yet committed.
		 */
		page = BufferGetPage(metabuf);
		metap = HashPageGetMeta(page);

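		/*
		 * At index build time, block 0 is the metapage and blocks
		 * 1..num_buckets hold the primary bucket pages, so the initial
		 * bitmap page always lands at block num_buckets + 1; recompute that
		 * here rather than carrying it in the WAL record.
		 */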
		num_buckets = metap->hashm_maxbucket + 1;
		metap->hashm_mapp[metap->hashm_nmaps] = num_buckets + 1;
		metap->hashm_nmaps++;

		PageSetLSN(page, lsn);
		MarkBufferDirty(metabuf);
	}
	if (BufferIsValid(metabuf))
		UnlockReleaseBuffer(metabuf);
}

/*
 * replay a hash index insert without split
 */
static void
hash_xlog_insert(XLogReaderState *record)
{
	HashMetaPage metap;
	XLogRecPtr	lsn = record->EndRecPtr;
	xl_hash_insert *xlrec = (xl_hash_insert *) XLogRecGetData(record);
	Buffer		buffer;
	Page		page;

	if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
	{
		Size		datalen;
		char	   *datapos = XLogRecGetBlockData(record, 0, &datalen);

		page = BufferGetPage(buffer);

		if (PageAddItem(page, (Item) datapos, datalen, xlrec->offnum,
						false, false) == InvalidOffsetNumber)
			elog(PANIC, "hash_xlog_insert: failed to add item");

		PageSetLSN(page, lsn);
		MarkBufferDirty(buffer);
	}
	if (BufferIsValid(buffer))
		UnlockReleaseBuffer(buffer);

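	/* block 1 of this record is the metapage; replay the tuple-count bump */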
	if (XLogReadBufferForRedo(record, 1, &buffer) == BLK_NEEDS_REDO)
	{
		/*
		 * Note: in normal operation, we'd update the metapage while still
		 * holding lock on the page we inserted into.  But during replay it's
		 * not necessary to hold that lock, since no other index updates can
		 * be happening concurrently.
		 */
		page = BufferGetPage(buffer);
		metap = HashPageGetMeta(page);
		metap->hashm_ntuples += 1;

		PageSetLSN(page, lsn);
		MarkBufferDirty(buffer);
	}
	if (BufferIsValid(buffer))
		UnlockReleaseBuffer(buffer);
}

/*
 * replay addition of overflow page for hash index
 */
static void
hash_xlog_add_ovfl_page(XLogReaderState *record)
{
	XLogRecPtr	lsn = record->EndRecPtr;
	xl_hash_add_ovfl_page *xlrec = (xl_hash_add_ovfl_page *) XLogRecGetData(record);
	Buffer		leftbuf;
	Buffer		ovflbuf;
	Buffer		metabuf;
	BlockNumber leftblk;
	BlockNumber rightblk;
	BlockNumber newmapblk = InvalidBlockNumber;
	Page		ovflpage;
	HashPageOpaque ovflopaque;
	uint32	   *num_bucket;
	char	   *data;
	Size		datalen PG_USED_FOR_ASSERTS_ONLY;
	bool		new_bmpage = false;

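	/*
	 * Block references in this record: 0 is the new overflow page, 1 is the
	 * page it is linked after, 2 is the bitmap page in which the allocated
	 * page's bit is set (when such a bitmap update is needed), 3 is a newly
	 * added bitmap page (if any), and 4 is the metapage.
	 */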
	XLogRecGetBlockTag(record, 0, NULL, NULL, &rightblk);
	XLogRecGetBlockTag(record, 1, NULL, NULL, &leftblk);

	ovflbuf = XLogInitBufferForRedo(record, 0);
	Assert(BufferIsValid(ovflbuf));

	data = XLogRecGetBlockData(record, 0, &datalen);
	num_bucket = (uint32 *) data;
	Assert(datalen == sizeof(uint32));
	_hash_initbuf(ovflbuf, InvalidBlockNumber, *num_bucket, LH_OVERFLOW_PAGE,
				  true);
	/* update backlink */
	ovflpage = BufferGetPage(ovflbuf);
	ovflopaque = (HashPageOpaque) PageGetSpecialPointer(ovflpage);
	ovflopaque->hasho_prevblkno = leftblk;

	PageSetLSN(ovflpage, lsn);
	MarkBufferDirty(ovflbuf);

	if (XLogReadBufferForRedo(record, 1, &leftbuf) == BLK_NEEDS_REDO)
	{
		Page		leftpage;
		HashPageOpaque leftopaque;

		leftpage = BufferGetPage(leftbuf);
		leftopaque = (HashPageOpaque) PageGetSpecialPointer(leftpage);
		leftopaque->hasho_nextblkno = rightblk;

		PageSetLSN(leftpage, lsn);
		MarkBufferDirty(leftbuf);
	}

	if (BufferIsValid(leftbuf))
		UnlockReleaseBuffer(leftbuf);
	UnlockReleaseBuffer(ovflbuf);

	/*
	 * Note: in normal operation, we'd update the bitmap and meta page while
	 * still holding lock on the overflow pages.  But during replay it's not
	 * necessary to hold those locks, since no other index updates can be
	 * happening concurrently.
	 */
	if (XLogRecHasBlockRef(record, 2))
	{
		Buffer		mapbuffer;

		if (XLogReadBufferForRedo(record, 2, &mapbuffer) == BLK_NEEDS_REDO)
		{
			Page		mappage = (Page) BufferGetPage(mapbuffer);
			uint32	   *freep = NULL;
			char	   *data;
			uint32	   *bitmap_page_bit;

			freep = HashPageGetBitmap(mappage);

			data = XLogRecGetBlockData(record, 2, &datalen);
			bitmap_page_bit = (uint32 *) data;

			SETBIT(freep, *bitmap_page_bit);

			PageSetLSN(mappage, lsn);
			MarkBufferDirty(mapbuffer);
		}
		if (BufferIsValid(mapbuffer))
			UnlockReleaseBuffer(mapbuffer);
	}

	if (XLogRecHasBlockRef(record, 3))
	{
		Buffer		newmapbuf;

		newmapbuf = XLogInitBufferForRedo(record, 3);

		_hash_initbitmapbuffer(newmapbuf, xlrec->bmsize, true);

		new_bmpage = true;
		newmapblk = BufferGetBlockNumber(newmapbuf);

		MarkBufferDirty(newmapbuf);
		PageSetLSN(BufferGetPage(newmapbuf), lsn);

		UnlockReleaseBuffer(newmapbuf);
	}

	if (XLogReadBufferForRedo(record, 4, &metabuf) == BLK_NEEDS_REDO)
	{
		HashMetaPage metap;
		Page		page;
		uint32	   *firstfree_ovflpage;

		data = XLogRecGetBlockData(record, 4, &datalen);
		firstfree_ovflpage = (uint32 *) data;

		page = BufferGetPage(metabuf);
		metap = HashPageGetMeta(page);
		metap->hashm_firstfree = *firstfree_ovflpage;

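		/*
		 * If the new overflow page was not recycled from the freelist, it
		 * was physically added at the end of the index, so account for it in
		 * the spares count of the current splitpoint; a newly added bitmap
		 * page consumes one more block on top of that.
		 */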
		if (!xlrec->bmpage_found)
		{
			metap->hashm_spares[metap->hashm_ovflpoint]++;

			if (new_bmpage)
			{
				Assert(BlockNumberIsValid(newmapblk));

				metap->hashm_mapp[metap->hashm_nmaps] = newmapblk;
				metap->hashm_nmaps++;
				metap->hashm_spares[metap->hashm_ovflpoint]++;
			}
		}

		PageSetLSN(page, lsn);
		MarkBufferDirty(metabuf);
	}
	if (BufferIsValid(metabuf))
		UnlockReleaseBuffer(metabuf);
}

/*
 * replay allocation of page for split operation
 */
static void
hash_xlog_split_allocate_page(XLogReaderState *record)
{
	XLogRecPtr	lsn = record->EndRecPtr;
	xl_hash_split_allocate_page *xlrec = (xl_hash_split_allocate_page *) XLogRecGetData(record);
	Buffer		oldbuf;
	Buffer		newbuf;
	Buffer		metabuf;
	Size		datalen PG_USED_FOR_ASSERTS_ONLY;
	char	   *data;
	XLogRedoAction action;

	/*
	 * To be consistent with normal operation, here we take cleanup locks on
	 * both the old and new buckets even though there can't be any concurrent
	 * inserts.
	 */

	/* replay the record for old bucket */
	action = XLogReadBufferForRedoExtended(record, 0, RBM_NORMAL, true, &oldbuf);

	/*
	 * Note that we still update the page even if it was restored from a full
	 * page image, because the special space is not included in the image.
	 */
	if (action == BLK_NEEDS_REDO || action == BLK_RESTORED)
	{
		Page		oldpage;
		HashPageOpaque oldopaque;

		oldpage = BufferGetPage(oldbuf);
		oldopaque = (HashPageOpaque) PageGetSpecialPointer(oldpage);

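		/*
		 * For a primary bucket page, hasho_prevblkno is not a sibling link:
		 * it stores the hashm_maxbucket value in effect when the page was
		 * last updated, which here is the new bucket number.  The flags
		 * carry over the split state (normally including
		 * LH_BUCKET_BEING_SPLIT) from the original operation.
		 */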
		oldopaque->hasho_flag = xlrec->old_bucket_flag;
		oldopaque->hasho_prevblkno = xlrec->new_bucket;

		PageSetLSN(oldpage, lsn);
		MarkBufferDirty(oldbuf);
	}

	/* replay the record for new bucket */
	newbuf = XLogInitBufferForRedo(record, 1);
	_hash_initbuf(newbuf, xlrec->new_bucket, xlrec->new_bucket,
				  xlrec->new_bucket_flag, true);
	if (!IsBufferCleanupOK(newbuf))
		elog(PANIC, "hash_xlog_split_allocate_page: failed to acquire cleanup lock");
	MarkBufferDirty(newbuf);
	PageSetLSN(BufferGetPage(newbuf), lsn);

	/*
	 * We could release the lock on the old bucket earlier as well, but doing
	 * it here keeps the order consistent with normal operation.
	 */
	if (BufferIsValid(oldbuf))
		UnlockReleaseBuffer(oldbuf);
	if (BufferIsValid(newbuf))
		UnlockReleaseBuffer(newbuf);

	/*
	 * Note: in normal operation, we'd update the meta page while still
	 * holding lock on the old and new bucket pages.  But during replay it's
	 * not necessary to hold those locks, since no other bucket splits can be
	 * happening concurrently.
	 */

	/* replay the record for metapage changes */
	if (XLogReadBufferForRedo(record, 2, &metabuf) == BLK_NEEDS_REDO)
	{
		Page		page;
		HashMetaPage metap;

		page = BufferGetPage(metabuf);
		metap = HashPageGetMeta(page);
		metap->hashm_maxbucket = xlrec->new_bucket;

		data = XLogRecGetBlockData(record, 2, &datalen);

		if (xlrec->flags & XLH_SPLIT_META_UPDATE_MASKS)
		{
			uint32		lowmask;
			uint32	   *highmask;

			/* extract low and high masks. */
			memcpy(&lowmask, data, sizeof(uint32));
			highmask = (uint32 *) ((char *) data + sizeof(uint32));

			/* update metapage */
			metap->hashm_lowmask = lowmask;
			metap->hashm_highmask = *highmask;

			data += sizeof(uint32) * 2;
		}

		if (xlrec->flags & XLH_SPLIT_META_UPDATE_SPLITPOINT)
		{
			uint32		ovflpoint;
			uint32	   *ovflpages;

			/* extract information of overflow pages. */
			memcpy(&ovflpoint, data, sizeof(uint32));
			ovflpages = (uint32 *) ((char *) data + sizeof(uint32));

			/* update metapage */
			metap->hashm_spares[ovflpoint] = *ovflpages;
			metap->hashm_ovflpoint = ovflpoint;
		}

		MarkBufferDirty(metabuf);
		PageSetLSN(BufferGetPage(metabuf), lsn);
	}

	if (BufferIsValid(metabuf))
		UnlockReleaseBuffer(metabuf);
}

/*
 * replay of split operation
 */
static void
hash_xlog_split_page(XLogReaderState *record)
{
	Buffer		buf;

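	/*
	 * Split records always carry a full-page image of the page being
	 * transferred, so replay consists of simply restoring that image.
	 */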
	if (XLogReadBufferForRedo(record, 0, &buf) != BLK_RESTORED)
		elog(ERROR, "hash split record did not contain a full-page image");

	UnlockReleaseBuffer(buf);
}

/*
 * replay completion of split operation
 */
static void
hash_xlog_split_complete(XLogReaderState *record)
{
	XLogRecPtr	lsn = record->EndRecPtr;
	xl_hash_split_complete *xlrec = (xl_hash_split_complete *) XLogRecGetData(record);
	Buffer		oldbuf;
	Buffer		newbuf;
	XLogRedoAction action;

	/* replay the record for old bucket */
	action = XLogReadBufferForRedo(record, 0, &oldbuf);

	/*
	 * Note that we still update the page even if it was restored from a full
	 * page image, because the bucket flag is not included in the image.
	 */
	if (action == BLK_NEEDS_REDO || action == BLK_RESTORED)
	{
		Page		oldpage;
		HashPageOpaque oldopaque;

		oldpage = BufferGetPage(oldbuf);
		oldopaque = (HashPageOpaque) PageGetSpecialPointer(oldpage);

		oldopaque->hasho_flag = xlrec->old_bucket_flag;

		PageSetLSN(oldpage, lsn);
		MarkBufferDirty(oldbuf);
	}
	if (BufferIsValid(oldbuf))
		UnlockReleaseBuffer(oldbuf);

	/* replay the record for new bucket */
	action = XLogReadBufferForRedo(record, 1, &newbuf);

	/*
	 * Note that we still update the page even if it was restored from a full
	 * page image, because the bucket flag is not included in the image.
	 */
	if (action == BLK_NEEDS_REDO || action == BLK_RESTORED)
	{
		Page		newpage;
		HashPageOpaque nopaque;

		newpage = BufferGetPage(newbuf);
		nopaque = (HashPageOpaque) PageGetSpecialPointer(newpage);

		nopaque->hasho_flag = xlrec->new_bucket_flag;

		PageSetLSN(newpage, lsn);
		MarkBufferDirty(newbuf);
	}
	if (BufferIsValid(newbuf))
		UnlockReleaseBuffer(newbuf);
}

/*
 * replay move of page contents for squeeze operation of hash index
 */
static void
hash_xlog_move_page_contents(XLogReaderState *record)
{
	XLogRecPtr	lsn = record->EndRecPtr;
	xl_hash_move_page_contents *xldata = (xl_hash_move_page_contents *) XLogRecGetData(record);
	Buffer		bucketbuf = InvalidBuffer;
	Buffer		writebuf = InvalidBuffer;
	Buffer		deletebuf = InvalidBuffer;
	XLogRedoAction action;

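	/*
	 * Block references in this record: 0 is the primary bucket page (only
	 * when it is not itself the write target), 1 is the page receiving the
	 * moved tuples, and 2 is the page they are deleted from.
	 */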
	/*
	 * Ensure we have a cleanup lock on the primary bucket page before we
	 * start the actual replay operation.  This guarantees that no scan can
	 * start, and that no scan can already be in progress, while we replay
	 * this operation.  If scans were allowed meanwhile, they could miss some
	 * records or see the same record multiple times.
	 */
	if (xldata->is_prim_bucket_same_wrt)
		action = XLogReadBufferForRedoExtended(record, 1, RBM_NORMAL, true, &writebuf);
	else
	{
		/*
		 * We don't care about the return value; the point of reading
		 * bucketbuf is just to take a cleanup lock on the primary bucket
		 * page.
		 */
		(void) XLogReadBufferForRedoExtended(record, 0, RBM_NORMAL, true, &bucketbuf);

		action = XLogReadBufferForRedo(record, 1, &writebuf);
	}

	/* replay the record for adding entries in overflow buffer */
	if (action == BLK_NEEDS_REDO)
	{
		Page		writepage;
		char	   *begin;
		char	   *data;
		Size		datalen;
		uint16		ninserted = 0;

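		/*
		 * The block 1 payload is an array of xldata->ntups target offset
		 * numbers, followed by the tuples themselves, each MAXALIGN'd.
		 */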
		data = begin = XLogRecGetBlockData(record, 1, &datalen);

		writepage = (Page) BufferGetPage(writebuf);

		if (xldata->ntups > 0)
		{
			OffsetNumber *towrite = (OffsetNumber *) data;

			data += sizeof(OffsetNumber) * xldata->ntups;

			while (data - begin < datalen)
			{
				IndexTuple	itup = (IndexTuple) data;
				Size		itemsz;
				OffsetNumber l;

				itemsz = IndexTupleDSize(*itup);
				itemsz = MAXALIGN(itemsz);

				data += itemsz;

				l = PageAddItem(writepage, (Item) itup, itemsz, towrite[ninserted], false, false);
				if (l == InvalidOffsetNumber)
					elog(ERROR, "hash_xlog_move_page_contents: failed to add item to hash index page, size %d bytes",
						 (int) itemsz);

				ninserted++;
			}
		}

		/*
		 * The number of tuples inserted must match the count requested in
		 * the REDO record.
		 */
		Assert(ninserted == xldata->ntups);

		PageSetLSN(writepage, lsn);
		MarkBufferDirty(writebuf);
	}

	/* replay the record for deleting entries from overflow buffer */
	if (XLogReadBufferForRedo(record, 2, &deletebuf) == BLK_NEEDS_REDO)
	{
		Page		page;
		char	   *ptr;
		Size		len;

		ptr = XLogRecGetBlockData(record, 2, &len);

		page = (Page) BufferGetPage(deletebuf);

		if (len > 0)
		{
			OffsetNumber *unused;
			OffsetNumber *unend;

			unused = (OffsetNumber *) ptr;
			unend = (OffsetNumber *) ((char *) ptr + len);

			if ((unend - unused) > 0)
				PageIndexMultiDelete(page, unused, unend - unused);
		}

		PageSetLSN(page, lsn);
		MarkBufferDirty(deletebuf);
	}

	/*
	 * Replay is complete, so now we can release the buffers.  We release the
	 * locks at the end of the replay operation to ensure that we hold the
	 * lock on the primary bucket page until the whole operation is done.  We
	 * could release the lock on the write buffer as soon as we are finished
	 * with it (when it is not the primary bucket page), but that doesn't
	 * seem worth complicating the code for.
	 */
	if (BufferIsValid(deletebuf))
		UnlockReleaseBuffer(deletebuf);

	if (BufferIsValid(writebuf))
		UnlockReleaseBuffer(writebuf);

	if (BufferIsValid(bucketbuf))
		UnlockReleaseBuffer(bucketbuf);
}

/*
 * replay squeeze page operation of hash index
 */
static void
hash_xlog_squeeze_page(XLogReaderState *record)
{
	XLogRecPtr	lsn = record->EndRecPtr;
	xl_hash_squeeze_page *xldata = (xl_hash_squeeze_page *) XLogRecGetData(record);
	Buffer		bucketbuf = InvalidBuffer;
	Buffer		writebuf;
	Buffer		ovflbuf;
	Buffer		prevbuf = InvalidBuffer;
	Buffer		mapbuf;
	XLogRedoAction action;

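	/*
	 * Block references in this record: 0 is the primary bucket page (only
	 * when it is not itself the write target), 1 is the page receiving the
	 * moved tuples, 2 is the overflow page being freed, 3 is its previous
	 * page (when that is not the write page), 4 is its next page (if any),
	 * 5 is the bitmap page, and 6 is the metapage (when the freelist head
	 * changes).
	 */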
	/*
	 * Ensure we have a cleanup lock on the primary bucket page before we
	 * start the actual replay operation.  This guarantees that no scan can
	 * start, and that no scan can already be in progress, while we replay
	 * this operation.  If scans were allowed meanwhile, they could miss some
	 * records or see the same record multiple times.
	 */
	if (xldata->is_prim_bucket_same_wrt)
		action = XLogReadBufferForRedoExtended(record, 1, RBM_NORMAL, true, &writebuf);
	else
	{
		/*
		 * We don't care about the return value; the point of reading
		 * bucketbuf is just to take a cleanup lock on the primary bucket
		 * page.
		 */
		(void) XLogReadBufferForRedoExtended(record, 0, RBM_NORMAL, true, &bucketbuf);

		action = XLogReadBufferForRedo(record, 1, &writebuf);
	}

	/* replay the record for adding entries in overflow buffer */
	if (action == BLK_NEEDS_REDO)
	{
		Page		writepage;
		char	   *begin;
		char	   *data;
		Size		datalen;
		uint16		ninserted = 0;

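		/*
		 * The block 1 payload is an array of xldata->ntups target offset
		 * numbers, followed by the tuples themselves, each MAXALIGN'd.
		 */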
		data = begin = XLogRecGetBlockData(record, 1, &datalen);

		writepage = (Page) BufferGetPage(writebuf);

		if (xldata->ntups > 0)
		{
			OffsetNumber *towrite = (OffsetNumber *) data;

			data += sizeof(OffsetNumber) * xldata->ntups;

			while (data - begin < datalen)
			{
				IndexTuple	itup = (IndexTuple) data;
				Size		itemsz;
				OffsetNumber l;

				itemsz = IndexTupleDSize(*itup);
				itemsz = MAXALIGN(itemsz);

				data += itemsz;

				l = PageAddItem(writepage, (Item) itup, itemsz, towrite[ninserted], false, false);
				if (l == InvalidOffsetNumber)
					elog(ERROR, "hash_xlog_squeeze_page: failed to add item to hash index page, size %d bytes",
						 (int) itemsz);

				ninserted++;
			}
		}

		/*
		 * The number of tuples inserted must match the count requested in
		 * the REDO record.
		 */
		Assert(ninserted == xldata->ntups);

		/*
		 * If the page we are adding tuples to is the page previous to the
		 * freed overflow page, update its hasho_nextblkno as well.
		 */
		if (xldata->is_prev_bucket_same_wrt)
		{
			HashPageOpaque writeopaque = (HashPageOpaque) PageGetSpecialPointer(writepage);

			writeopaque->hasho_nextblkno = xldata->nextblkno;
		}

		PageSetLSN(writepage, lsn);
		MarkBufferDirty(writebuf);
	}

	/* replay the record for initializing overflow buffer */
	if (XLogReadBufferForRedo(record, 2, &ovflbuf) == BLK_NEEDS_REDO)
	{
		Page		ovflpage;

		ovflpage = BufferGetPage(ovflbuf);

		_hash_pageinit(ovflpage, BufferGetPageSize(ovflbuf));

		PageSetLSN(ovflpage, lsn);
		MarkBufferDirty(ovflbuf);
	}
	if (BufferIsValid(ovflbuf))
		UnlockReleaseBuffer(ovflbuf);

	/* replay the record for page previous to the freed overflow page */
	if (!xldata->is_prev_bucket_same_wrt &&
		XLogReadBufferForRedo(record, 3, &prevbuf) == BLK_NEEDS_REDO)
	{
		Page		prevpage = BufferGetPage(prevbuf);
		HashPageOpaque prevopaque = (HashPageOpaque) PageGetSpecialPointer(prevpage);

		prevopaque->hasho_nextblkno = xldata->nextblkno;

		PageSetLSN(prevpage, lsn);
		MarkBufferDirty(prevbuf);
	}
	if (BufferIsValid(prevbuf))
		UnlockReleaseBuffer(prevbuf);

	/* replay the record for page next to the freed overflow page */
	if (XLogRecHasBlockRef(record, 4))
	{
		Buffer		nextbuf;

		if (XLogReadBufferForRedo(record, 4, &nextbuf) == BLK_NEEDS_REDO)
		{
			Page		nextpage = BufferGetPage(nextbuf);
			HashPageOpaque nextopaque = (HashPageOpaque) PageGetSpecialPointer(nextpage);

			nextopaque->hasho_prevblkno = xldata->prevblkno;

			PageSetLSN(nextpage, lsn);
			MarkBufferDirty(nextbuf);
		}
		if (BufferIsValid(nextbuf))
			UnlockReleaseBuffer(nextbuf);
	}

	if (BufferIsValid(writebuf))
		UnlockReleaseBuffer(writebuf);

	if (BufferIsValid(bucketbuf))
		UnlockReleaseBuffer(bucketbuf);

	/*
	 * Note: in normal operation, we'd update the bitmap and meta page while
	 * still holding lock on the primary bucket page and overflow pages.  But
	 * during replay it's not necessary to hold those locks, since no other
	 * index updates can be happening concurrently.
	 */
	/* replay the record for bitmap page */
	if (XLogReadBufferForRedo(record, 5, &mapbuf) == BLK_NEEDS_REDO)
	{
		Page		mappage = (Page) BufferGetPage(mapbuf);
		uint32	   *freep = NULL;
		char	   *data;
		uint32	   *bitmap_page_bit;
		Size		datalen;

		freep = HashPageGetBitmap(mappage);

		data = XLogRecGetBlockData(record, 5, &datalen);
		bitmap_page_bit = (uint32 *) data;

		CLRBIT(freep, *bitmap_page_bit);

		PageSetLSN(mappage, lsn);
		MarkBufferDirty(mapbuf);
	}
	if (BufferIsValid(mapbuf))
		UnlockReleaseBuffer(mapbuf);

	/* replay the record for meta page */
	if (XLogRecHasBlockRef(record, 6))
	{
		Buffer		metabuf;

		if (XLogReadBufferForRedo(record, 6, &metabuf) == BLK_NEEDS_REDO)
		{
			HashMetaPage metap;
			Page		page;
			char	   *data;
			uint32	   *firstfree_ovflpage;
			Size		datalen;

			data = XLogRecGetBlockData(record, 6, &datalen);
			firstfree_ovflpage = (uint32 *) data;

			page = BufferGetPage(metabuf);
			metap = HashPageGetMeta(page);
			metap->hashm_firstfree = *firstfree_ovflpage;

			PageSetLSN(page, lsn);
			MarkBufferDirty(metabuf);
		}
		if (BufferIsValid(metabuf))
			UnlockReleaseBuffer(metabuf);
	}
}

/*
 * replay delete operation of hash index
 */
static void
hash_xlog_delete(XLogReaderState *record)
{
	XLogRecPtr	lsn = record->EndRecPtr;
	xl_hash_delete *xldata = (xl_hash_delete *) XLogRecGetData(record);
	Buffer		bucketbuf = InvalidBuffer;
	Buffer		deletebuf;
	Page		page;
	XLogRedoAction action;

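	/*
	 * Block references in this record: 0 is the primary bucket page (only
	 * when it is not itself the target), 1 is the page whose entries are
	 * being deleted.
	 */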
	/*
	 * Ensure we have a cleanup lock on the primary bucket page before we
	 * start the actual replay operation.  This guarantees that no scan can
	 * start, and that no scan can already be in progress, while we replay
	 * this operation.  If scans were allowed meanwhile, they could miss some
	 * records or see the same record multiple times.
	 */
	if (xldata->is_primary_bucket_page)
		action = XLogReadBufferForRedoExtended(record, 1, RBM_NORMAL, true, &deletebuf);
	else
	{
		/*
		 * We don't care about the return value; the point of reading
		 * bucketbuf is just to take a cleanup lock on the primary bucket
		 * page.
		 */
		(void) XLogReadBufferForRedoExtended(record, 0, RBM_NORMAL, true, &bucketbuf);

		action = XLogReadBufferForRedo(record, 1, &deletebuf);
	}

	/* replay the record for deleting entries in bucket page */
	if (action == BLK_NEEDS_REDO)
	{
		char	   *ptr;
		Size		len;

		ptr = XLogRecGetBlockData(record, 1, &len);

		page = (Page) BufferGetPage(deletebuf);

		if (len > 0)
		{
			OffsetNumber *unused;
			OffsetNumber *unend;

			unused = (OffsetNumber *) ptr;
			unend = (OffsetNumber *) ((char *) ptr + len);

			if ((unend - unused) > 0)
				PageIndexMultiDelete(page, unused, unend - unused);
		}

		PageSetLSN(page, lsn);
		MarkBufferDirty(deletebuf);
	}
	if (BufferIsValid(deletebuf))
		UnlockReleaseBuffer(deletebuf);

	if (BufferIsValid(bucketbuf))
		UnlockReleaseBuffer(bucketbuf);
}

/*
 * replay split cleanup flag operation for primary bucket page
 */
static void
hash_xlog_split_cleanup(XLogReaderState *record)
{
	XLogRecPtr	lsn = record->EndRecPtr;
	Buffer		buffer;
	Page		page;

	if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
	{
		HashPageOpaque bucket_opaque;

		page = (Page) BufferGetPage(buffer);

		bucket_opaque = (HashPageOpaque) PageGetSpecialPointer(page);
		bucket_opaque->hasho_flag &= ~LH_BUCKET_NEEDS_SPLIT_CLEANUP;
		PageSetLSN(page, lsn);
		MarkBufferDirty(buffer);
	}
	if (BufferIsValid(buffer))
		UnlockReleaseBuffer(buffer);
}

/*
 * replay an update of the meta page
 */
static void
hash_xlog_update_meta_page(XLogReaderState *record)
{
	HashMetaPage metap;
	XLogRecPtr	lsn = record->EndRecPtr;
	xl_hash_update_meta_page *xldata = (xl_hash_update_meta_page *) XLogRecGetData(record);
	Buffer		metabuf;
	Page		page;

	if (XLogReadBufferForRedo(record, 0, &metabuf) == BLK_NEEDS_REDO)
	{
		page = BufferGetPage(metabuf);
		metap = HashPageGetMeta(page);

		metap->hashm_ntuples = xldata->ntuples;

		PageSetLSN(page, lsn);
		MarkBufferDirty(metabuf);
	}
	if (BufferIsValid(metabuf))
		UnlockReleaseBuffer(metabuf);
}

void
hash_redo(XLogReaderState *record)
{
	uint8		info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;

	switch (info)
	{
		case XLOG_HASH_INIT_META_PAGE:
			hash_xlog_init_meta_page(record);
			break;
		case XLOG_HASH_INIT_BITMAP_PAGE:
			hash_xlog_init_bitmap_page(record);
			break;
		case XLOG_HASH_INSERT:
			hash_xlog_insert(record);
			break;
		case XLOG_HASH_ADD_OVFL_PAGE:
			hash_xlog_add_ovfl_page(record);
			break;
		case XLOG_HASH_SPLIT_ALLOCATE_PAGE:
			hash_xlog_split_allocate_page(record);
			break;
		case XLOG_HASH_SPLIT_PAGE:
			hash_xlog_split_page(record);
			break;
		case XLOG_HASH_SPLIT_COMPLETE:
			hash_xlog_split_complete(record);
			break;
		case XLOG_HASH_MOVE_PAGE_CONTENTS:
			hash_xlog_move_page_contents(record);
			break;
		case XLOG_HASH_SQUEEZE_PAGE:
			hash_xlog_squeeze_page(record);
			break;
		case XLOG_HASH_DELETE:
			hash_xlog_delete(record);
			break;
		case XLOG_HASH_SPLIT_CLEANUP:
			hash_xlog_split_cleanup(record);
			break;
		case XLOG_HASH_UPDATE_META_PAGE:
			hash_xlog_update_meta_page(record);
			break;
		default:
			elog(PANIC, "hash_redo: unknown op code %u", info);
	}
}