]> granicus.if.org Git - postgresql/blob - src/backend/access/gin/ginxlog.c
Rewrite the way GIN posting lists are packed on a page, to reduce WAL volume.
[postgresql] / src / backend / access / gin / ginxlog.c
1 /*-------------------------------------------------------------------------
2  *
3  * ginxlog.c
4  *        WAL replay logic for inverted index.
5  *
6  *
7  * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
8  * Portions Copyright (c) 1994, Regents of the University of California
9  *
10  * IDENTIFICATION
11  *                       src/backend/access/gin/ginxlog.c
12  *-------------------------------------------------------------------------
13  */
14 #include "postgres.h"
15
16 #include "access/gin_private.h"
17 #include "access/xlogutils.h"
18 #include "utils/memutils.h"
19
20 static MemoryContext opCtx;             /* working memory for operations */
21
22 static void
23 ginRedoClearIncompleteSplit(XLogRecPtr lsn, RelFileNode node, BlockNumber blkno)
24 {
25         Buffer          buffer;
26         Page            page;
27
28         buffer = XLogReadBuffer(node, blkno, false);
29         if (!BufferIsValid(buffer))
30                 return;                                 /* page was deleted, nothing to do */
31         page = (Page) BufferGetPage(buffer);
32
33         if (lsn > PageGetLSN(page))
34         {
35                 GinPageGetOpaque(page)->flags &= ~GIN_INCOMPLETE_SPLIT;
36
37                 PageSetLSN(page, lsn);
38                 MarkBufferDirty(buffer);
39         }
40
41         UnlockReleaseBuffer(buffer);
42 }
43
44 static void
45 ginRedoCreateIndex(XLogRecPtr lsn, XLogRecord *record)
46 {
47         RelFileNode *node = (RelFileNode *) XLogRecGetData(record);
48         Buffer          RootBuffer,
49                                 MetaBuffer;
50         Page            page;
51
52         /* Backup blocks are not used in create_index records */
53         Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
54
55         MetaBuffer = XLogReadBuffer(*node, GIN_METAPAGE_BLKNO, true);
56         Assert(BufferIsValid(MetaBuffer));
57         page = (Page) BufferGetPage(MetaBuffer);
58
59         GinInitMetabuffer(MetaBuffer);
60
61         PageSetLSN(page, lsn);
62         MarkBufferDirty(MetaBuffer);
63
64         RootBuffer = XLogReadBuffer(*node, GIN_ROOT_BLKNO, true);
65         Assert(BufferIsValid(RootBuffer));
66         page = (Page) BufferGetPage(RootBuffer);
67
68         GinInitBuffer(RootBuffer, GIN_LEAF);
69
70         PageSetLSN(page, lsn);
71         MarkBufferDirty(RootBuffer);
72
73         UnlockReleaseBuffer(RootBuffer);
74         UnlockReleaseBuffer(MetaBuffer);
75 }
76
77 static void
78 ginRedoCreatePTree(XLogRecPtr lsn, XLogRecord *record)
79 {
80         ginxlogCreatePostingTree *data = (ginxlogCreatePostingTree *) XLogRecGetData(record);
81         char       *ptr;
82         Buffer          buffer;
83         Page            page;
84
85         /* Backup blocks are not used in create_ptree records */
86         Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
87
88         buffer = XLogReadBuffer(data->node, data->blkno, true);
89         Assert(BufferIsValid(buffer));
90         page = (Page) BufferGetPage(buffer);
91
92         GinInitBuffer(buffer, GIN_DATA | GIN_LEAF | GIN_COMPRESSED);
93
94         ptr = XLogRecGetData(record) + sizeof(ginxlogCreatePostingTree);
95
96         /* Place page data */
97         memcpy(GinDataLeafPageGetPostingList(page), ptr, data->size);
98
99         GinDataLeafPageSetPostingListSize(page, data->size);
100
101         PageSetLSN(page, lsn);
102
103         MarkBufferDirty(buffer);
104         UnlockReleaseBuffer(buffer);
105 }
106
107 static void
108 ginRedoInsertEntry(Buffer buffer, bool isLeaf, BlockNumber rightblkno, void *rdata)
109 {
110         Page            page = BufferGetPage(buffer);
111         ginxlogInsertEntry *data = (ginxlogInsertEntry *) rdata;
112         OffsetNumber offset = data->offset;
113         IndexTuple      itup;
114
115         if (rightblkno != InvalidBlockNumber)
116         {
117                 /* update link to right page after split */
118                 Assert(!GinPageIsLeaf(page));
119                 Assert(offset >= FirstOffsetNumber && offset <= PageGetMaxOffsetNumber(page));
120                 itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, offset));
121                 GinSetDownlink(itup, rightblkno);
122         }
123
124         if (data->isDelete)
125         {
126                 Assert(GinPageIsLeaf(page));
127                 Assert(offset >= FirstOffsetNumber && offset <= PageGetMaxOffsetNumber(page));
128                 PageIndexTupleDelete(page, offset);
129         }
130
131         itup = &data->tuple;
132
133         if (PageAddItem(page, (Item) itup, IndexTupleSize(itup), offset, false, false) == InvalidOffsetNumber)
134         {
135                 RelFileNode node;
136                 ForkNumber forknum;
137                 BlockNumber blknum;
138
139                 BufferGetTag(buffer, &node, &forknum, &blknum);
140                 elog(ERROR, "failed to add item to index page in %u/%u/%u",
141                          node.spcNode, node.dbNode, node.relNode);
142         }
143 }
144
/*
 * Replay a "recompress" record against a posting tree leaf page.
 *
 * 'data' is the fixed-size ginxlogRecompressDataLeaf header; data->nactions
 * action entries follow it directly in memory.  Each entry starts with two
 * bytes (segment number, action code) and, depending on the action, also
 * carries either a complete posting list segment (INSERT, REPLACE) or a
 * counted array of item pointers (ADDITEMS).  The actions are applied in
 * place to the page's posting list segments, and the page's posting list
 * size is recomputed at the end.
 *
 * This function only modifies page contents; it does not look at or set the
 * page LSN and does not touch the buffer.
 */
static void
ginRedoRecompress(Page page, ginxlogRecompressDataLeaf *data)
{
        int                     actionno;
        int                     segno;
        GinPostingList *oldseg;
        Pointer         segmentend;
        char       *walbuf;
        int                     totalsize;

        /*
         * If the page is in pre-9.4 format, convert to new format first.
         */
        if (!GinPageIsCompressed(page))
        {
                ItemPointer uncompressed = (ItemPointer) GinDataPageGetData(page);
                int                     nuncompressed = GinPageGetOpaque(page)->maxoff;
                int                     npacked;
                GinPostingList *plist;

                /* Pack every item on the page into one posting list */
                plist = ginCompressPostingList(uncompressed, nuncompressed,
                                                                           BLCKSZ, &npacked);
                Assert(npacked == nuncompressed);

                totalsize = SizeOfGinPostingList(plist);

                memcpy(GinDataLeafPageGetPostingList(page), plist, totalsize);
                GinDataLeafPageSetPostingListSize(page, totalsize);
                GinPageSetCompressed(page);
                GinPageGetOpaque(page)->maxoff = InvalidOffsetNumber;
        }

        /*
         * oldseg walks the segments on the page; segmentend always points just
         * past the last byte of posting list data and is adjusted as segments
         * are inserted, replaced or deleted.
         */
        oldseg = GinDataLeafPageGetPostingList(page);
        segmentend = (Pointer) oldseg + GinDataLeafPageGetPostingListSize(page);
        segno = 0;

        /* The action entries start right after the fixed-size header */
        walbuf = ((char *) data) + sizeof(ginxlogRecompressDataLeaf);
        for (actionno = 0; actionno < data->nactions; actionno++)
        {
                uint8           a_segno = *((uint8 *) (walbuf++));
                uint8           a_action = *((uint8 *) (walbuf++));
                GinPostingList *newseg = NULL;
                int                     newsegsize = 0;
                ItemPointerData *items = NULL;
                uint16          nitems = 0;
                ItemPointerData *olditems;
                int                     nolditems;
                ItemPointerData *newitems;
                int                     nnewitems;
                int                     segsize;
                Pointer         segptr;
                int                     szleft;

                /* Extract all the information we need from the WAL record */
                if (a_action == GIN_SEGMENT_INSERT ||
                        a_action == GIN_SEGMENT_REPLACE)
                {
                        /* The new segment is used directly from the record */
                        newseg = (GinPostingList *) walbuf;
                        newsegsize = SizeOfGinPostingList(newseg);
                        walbuf += SHORTALIGN(newsegsize);
                }

                if (a_action == GIN_SEGMENT_ADDITEMS)
                {
                        /* A uint16 item count, followed by that many item pointers */
                        memcpy(&nitems, walbuf, sizeof(uint16));
                        walbuf += sizeof(uint16);
                        items = (ItemPointerData *) walbuf;
                        walbuf += nitems * sizeof(ItemPointerData);
                }

                /* Skip to the segment that this action concerns */
                Assert(segno <= a_segno);
                while (segno < a_segno)
                {
                        oldseg = GinNextPostingListSegment(oldseg);
                        segno++;
                }

                /*
                 * ADDITEMS action is handled like REPLACE, but the new segment to
                 * replace the old one is reconstructed using the old segment from
                 * disk and the new items from the WAL record.
                 */
                if (a_action == GIN_SEGMENT_ADDITEMS)
                {
                        int                     npacked;

                        olditems = ginPostingListDecode(oldseg, &nolditems);

                        newitems = ginMergeItemPointers(items, nitems,
                                                                                        olditems, nolditems,
                                                                                        &nnewitems);
                        Assert(nnewitems == nolditems + nitems);

                        newseg = ginCompressPostingList(newitems, nnewitems,
                                                                                        BLCKSZ, &npacked);
                        Assert(npacked == nnewitems);

                        newsegsize = SizeOfGinPostingList(newseg);
                        a_action = GIN_SEGMENT_REPLACE;
                }

                segptr = (Pointer) oldseg;
                if (segptr != segmentend)
                        segsize = SizeOfGinPostingList(oldseg);
                else
                {
                        /*
                         * Positioned after the last existing segment. Only INSERTs
                         * expected here.
                         */
                        Assert(a_action == GIN_SEGMENT_INSERT);
                        segsize = 0;
                }
                /* Bytes from the start of the current segment to the end of data */
                szleft = segmentend - segptr;

                switch (a_action)
                {
                        case GIN_SEGMENT_DELETE:
                                /* Close the gap by pulling the following segments down */
                                memmove(segptr, segptr + segsize, szleft - segsize);
                                segmentend -= segsize;

                                /*
                                 * segptr now addresses what used to be the next segment,
                                 * so the segment counter moves on with it.
                                 */
                                segno++;
                                break;

                        case GIN_SEGMENT_INSERT:
                                /* make room for the new segment */
                                memmove(segptr + newsegsize, segptr, szleft);
                                /* copy the new segment in place */
                                memcpy(segptr, newseg, newsegsize);
                                segmentend += newsegsize;
                                /*
                                 * Step over the inserted segment.  segno is not advanced:
                                 * the segment previously at this position still follows.
                                 */
                                segptr += newsegsize;
                                break;

                        case GIN_SEGMENT_REPLACE:
                                /* shift the segments that follow */
                                memmove(segptr + newsegsize,
                                                segptr + segsize,
                                                szleft - segsize);
                                /* copy the replacement segment in place */
                                memcpy(segptr, newseg, newsegsize);
                                segmentend -= segsize;
                                segmentend += newsegsize;
                                segptr += newsegsize;
                                segno++;
                                break;

                        default:
                                elog(ERROR, "unexpected GIN leaf action: %u", a_action);
                }
                /* Continue from wherever the action left the cursor */
                oldseg = (GinPostingList *) segptr;
        }

        /* Record the new total size of the posting list data on the page */
        totalsize = segmentend - (Pointer) GinDataLeafPageGetPostingList(page);
        GinDataLeafPageSetPostingListSize(page, totalsize);
}
301
302 static void
303 ginRedoInsertData(Buffer buffer, bool isLeaf, BlockNumber rightblkno, void *rdata)
304 {
305         Page            page = BufferGetPage(buffer);
306
307         if (isLeaf)
308         {
309                 ginxlogRecompressDataLeaf *data = (ginxlogRecompressDataLeaf *) rdata;
310
311                 Assert(GinPageIsLeaf(page));
312
313                 ginRedoRecompress(page, data);
314         }
315         else
316         {
317                 ginxlogInsertDataInternal *data = (ginxlogInsertDataInternal *) rdata;
318                 PostingItem *oldpitem;
319
320                 Assert(!GinPageIsLeaf(page));
321
322                 /* update link to right page after split */
323                 oldpitem = GinDataPageGetPostingItem(page, data->offset);
324                 PostingItemSetBlockNumber(oldpitem, rightblkno);
325
326                 GinDataPageAddPostingItem(page, &data->newitem, data->offset);
327         }
328 }
329
330 static void
331 ginRedoInsert(XLogRecPtr lsn, XLogRecord *record)
332 {
333         ginxlogInsert *data = (ginxlogInsert *) XLogRecGetData(record);
334         Buffer          buffer;
335         Page            page;
336         char       *payload;
337         BlockNumber leftChildBlkno = InvalidBlockNumber;
338         BlockNumber rightChildBlkno = InvalidBlockNumber;
339         bool            isLeaf = (data->flags & GIN_INSERT_ISLEAF) != 0;
340
341         payload = XLogRecGetData(record) + sizeof(ginxlogInsert);
342
343         /*
344          * First clear incomplete-split flag on child page if this finishes
345          * a split.
346          */
347         if (!isLeaf)
348         {
349                 leftChildBlkno = BlockIdGetBlockNumber((BlockId) payload);
350                 payload += sizeof(BlockIdData);
351                 rightChildBlkno = BlockIdGetBlockNumber((BlockId) payload);
352                 payload += sizeof(BlockIdData);
353
354                 if (record->xl_info & XLR_BKP_BLOCK(0))
355                         (void) RestoreBackupBlock(lsn, record, 0, false, false);
356                 else
357                         ginRedoClearIncompleteSplit(lsn, data->node, leftChildBlkno);
358         }
359
360         /* If we have a full-page image, restore it and we're done */
361         if (record->xl_info & XLR_BKP_BLOCK(isLeaf ? 0 : 1))
362         {
363                 (void) RestoreBackupBlock(lsn, record, isLeaf ? 0 : 1, false, false);
364                 return;
365         }
366
367         buffer = XLogReadBuffer(data->node, data->blkno, false);
368         if (!BufferIsValid(buffer))
369                 return;                                 /* page was deleted, nothing to do */
370         page = (Page) BufferGetPage(buffer);
371
372         if (lsn > PageGetLSN(page))
373         {
374                 /* How to insert the payload is tree-type specific */
375                 if (data->flags & GIN_INSERT_ISDATA)
376                 {
377                         Assert(GinPageIsData(page));
378                         ginRedoInsertData(buffer, isLeaf, rightChildBlkno, payload);
379                 }
380                 else
381                 {
382                         Assert(!GinPageIsData(page));
383                         ginRedoInsertEntry(buffer, isLeaf, rightChildBlkno, payload);
384                 }
385
386                 PageSetLSN(page, lsn);
387                 MarkBufferDirty(buffer);
388         }
389
390         UnlockReleaseBuffer(buffer);
391 }
392
393 static void
394 ginRedoSplitEntry(Page lpage, Page rpage, void *rdata)
395 {
396         ginxlogSplitEntry *data = (ginxlogSplitEntry *) rdata;
397         IndexTuple      itup = (IndexTuple) ((char *) rdata + sizeof(ginxlogSplitEntry));
398         OffsetNumber i;
399
400         for (i = 0; i < data->separator; i++)
401         {
402                 if (PageAddItem(lpage, (Item) itup, IndexTupleSize(itup), InvalidOffsetNumber, false, false) == InvalidOffsetNumber)
403                         elog(ERROR, "failed to add item to gin index page");
404                 itup = (IndexTuple) (((char *) itup) + MAXALIGN(IndexTupleSize(itup)));
405         }
406
407         for (i = data->separator; i < data->nitem; i++)
408         {
409                 if (PageAddItem(rpage, (Item) itup, IndexTupleSize(itup), InvalidOffsetNumber, false, false) == InvalidOffsetNumber)
410                         elog(ERROR, "failed to add item to gin index page");
411                 itup = (IndexTuple) (((char *) itup) + MAXALIGN(IndexTupleSize(itup)));
412         }
413 }
414
415 static void
416 ginRedoSplitData(Page lpage, Page rpage, void *rdata)
417 {
418         bool            isleaf = GinPageIsLeaf(lpage);
419
420         if (isleaf)
421         {
422                 ginxlogSplitDataLeaf *data = (ginxlogSplitDataLeaf *) rdata;
423                 Pointer         lptr = (Pointer) rdata + sizeof(ginxlogSplitDataLeaf);
424                 Pointer         rptr = lptr + data->lsize;
425
426                 Assert(data->lsize > 0 && data->lsize <= GinDataLeafMaxContentSize);
427                 Assert(data->rsize > 0 && data->rsize <= GinDataLeafMaxContentSize);
428
429                 memcpy(GinDataLeafPageGetPostingList(lpage), lptr, data->lsize);
430                 memcpy(GinDataLeafPageGetPostingList(rpage), rptr, data->rsize);
431
432                 GinDataLeafPageSetPostingListSize(lpage, data->lsize);
433                 GinDataLeafPageSetPostingListSize(rpage, data->rsize);
434                 *GinDataPageGetRightBound(lpage) = data->lrightbound;
435                 *GinDataPageGetRightBound(rpage) = data->rrightbound;
436         }
437         else
438         {
439                 ginxlogSplitDataInternal *data = (ginxlogSplitDataInternal *) rdata;
440                 PostingItem *items = (PostingItem *) ((char *) rdata + sizeof(ginxlogSplitDataInternal));
441                 OffsetNumber i;
442                 OffsetNumber maxoff;
443
444                 for (i = 0; i < data->separator; i++)
445                         GinDataPageAddPostingItem(lpage, &items[i], InvalidOffsetNumber);
446                 for (i = data->separator; i < data->nitem; i++)
447                         GinDataPageAddPostingItem(rpage, &items[i], InvalidOffsetNumber);
448
449                 /* set up right key */
450                 maxoff = GinPageGetOpaque(lpage)->maxoff;
451                 *GinDataPageGetRightBound(lpage) = GinDataPageGetPostingItem(lpage, maxoff)->key;
452                 *GinDataPageGetRightBound(rpage) = data->rightbound;
453         }
454 }
455
456 static void
457 ginRedoSplit(XLogRecPtr lsn, XLogRecord *record)
458 {
459         ginxlogSplit *data = (ginxlogSplit *) XLogRecGetData(record);
460         Buffer          lbuffer,
461                                 rbuffer;
462         Page            lpage,
463                                 rpage;
464         uint32          flags = 0;
465         char       *payload;
466         bool            isLeaf = (data->flags & GIN_INSERT_ISLEAF) != 0;
467         bool            isData = (data->flags & GIN_INSERT_ISDATA) != 0;
468         bool            isRoot = (data->flags & GIN_SPLIT_ROOT) != 0;
469
470         payload = XLogRecGetData(record) + sizeof(ginxlogSplit);
471
472         /*
473          * First clear incomplete-split flag on child page if this finishes
474          * a split
475          */
476         if (!isLeaf)
477         {
478                 if (record->xl_info & XLR_BKP_BLOCK(0))
479                         (void) RestoreBackupBlock(lsn, record, 0, false, false);
480                 else
481                         ginRedoClearIncompleteSplit(lsn, data->node, data->leftChildBlkno);
482         }
483
484         if (isLeaf)
485                 flags |= GIN_LEAF;
486         if (isData)
487                 flags |= GIN_DATA;
488         if (isLeaf && isData)
489                 flags |= GIN_COMPRESSED;
490
491         lbuffer = XLogReadBuffer(data->node, data->lblkno, true);
492         Assert(BufferIsValid(lbuffer));
493         lpage = (Page) BufferGetPage(lbuffer);
494         GinInitBuffer(lbuffer, flags);
495
496         rbuffer = XLogReadBuffer(data->node, data->rblkno, true);
497         Assert(BufferIsValid(rbuffer));
498         rpage = (Page) BufferGetPage(rbuffer);
499         GinInitBuffer(rbuffer, flags);
500
501         GinPageGetOpaque(lpage)->rightlink = BufferGetBlockNumber(rbuffer);
502         GinPageGetOpaque(rpage)->rightlink = isRoot ? InvalidBlockNumber : data->rrlink;
503
504         /* Do the tree-type specific portion to restore the page contents */
505         if (isData)
506                 ginRedoSplitData(lpage, rpage, payload);
507         else
508                 ginRedoSplitEntry(lpage, rpage, payload);
509
510         PageSetLSN(rpage, lsn);
511         MarkBufferDirty(rbuffer);
512
513         PageSetLSN(lpage, lsn);
514         MarkBufferDirty(lbuffer);
515
516         if (isRoot)
517         {
518                 BlockNumber     rootBlkno = data->rrlink;
519                 Buffer          rootBuf = XLogReadBuffer(data->node, rootBlkno, true);
520                 Page            rootPage = BufferGetPage(rootBuf);
521
522                 GinInitBuffer(rootBuf, flags & ~GIN_LEAF & ~GIN_COMPRESSED);
523
524                 if (isData)
525                 {
526                         Assert(rootBlkno != GIN_ROOT_BLKNO);
527                         ginDataFillRoot(NULL, BufferGetPage(rootBuf),
528                                                         BufferGetBlockNumber(lbuffer),
529                                                         BufferGetPage(lbuffer),
530                                                         BufferGetBlockNumber(rbuffer),
531                                                         BufferGetPage(rbuffer));
532                 }
533                 else
534                 {
535                         Assert(rootBlkno == GIN_ROOT_BLKNO);
536                         ginEntryFillRoot(NULL, BufferGetPage(rootBuf),
537                                                          BufferGetBlockNumber(lbuffer),
538                                                          BufferGetPage(lbuffer),
539                                                          BufferGetBlockNumber(rbuffer),
540                                                          BufferGetPage(rbuffer));
541                 }
542
543                 PageSetLSN(rootPage, lsn);
544
545                 MarkBufferDirty(rootBuf);
546                 UnlockReleaseBuffer(rootBuf);
547         }
548
549         UnlockReleaseBuffer(rbuffer);
550         UnlockReleaseBuffer(lbuffer);
551 }
552
553 /*
554  * This is functionally the same as heap_xlog_newpage.
555  */
556 static void
557 ginRedoVacuumPage(XLogRecPtr lsn, XLogRecord *record)
558 {
559         ginxlogVacuumPage *xlrec = (ginxlogVacuumPage *) XLogRecGetData(record);
560         char       *blk = ((char *) xlrec) + sizeof(ginxlogVacuumPage);
561         Buffer          buffer;
562         Page            page;
563
564         Assert(xlrec->hole_offset < BLCKSZ);
565         Assert(xlrec->hole_length < BLCKSZ);
566
567         /* If we have a full-page image, restore it and we're done */
568         if (record->xl_info & XLR_BKP_BLOCK(0))
569         {
570                 (void) RestoreBackupBlock(lsn, record, 0, false, false);
571                 return;
572         }
573
574         buffer = XLogReadBuffer(xlrec->node, xlrec->blkno, true);
575         if (!BufferIsValid(buffer))
576                 return;
577         page = (Page) BufferGetPage(buffer);
578
579         if (xlrec->hole_length == 0)
580         {
581                 memcpy((char *) page, blk, BLCKSZ);
582         }
583         else
584         {
585                 memcpy((char *) page, blk, xlrec->hole_offset);
586                 /* must zero-fill the hole */
587                 MemSet((char *) page + xlrec->hole_offset, 0, xlrec->hole_length);
588                 memcpy((char *) page + (xlrec->hole_offset + xlrec->hole_length),
589                            blk + xlrec->hole_offset,
590                            BLCKSZ - (xlrec->hole_offset + xlrec->hole_length));
591         }
592
593         PageSetLSN(page, lsn);
594
595         MarkBufferDirty(buffer);
596         UnlockReleaseBuffer(buffer);
597 }
598
599 static void
600 ginRedoVacuumDataLeafPage(XLogRecPtr lsn, XLogRecord *record)
601 {
602         ginxlogVacuumDataLeafPage *xlrec = (ginxlogVacuumDataLeafPage *) XLogRecGetData(record);
603         Buffer          buffer;
604         Page            page;
605
606         /* If we have a full-page image, restore it and we're done */
607         if (record->xl_info & XLR_BKP_BLOCK(0))
608         {
609                 (void) RestoreBackupBlock(lsn, record, 0, false, false);
610                 return;
611         }
612
613         buffer = XLogReadBuffer(xlrec->node, xlrec->blkno, false);
614         if (!BufferIsValid(buffer))
615                 return;
616         page = (Page) BufferGetPage(buffer);
617
618         Assert(GinPageIsLeaf(page));
619         Assert(GinPageIsData(page));
620
621         if (lsn > PageGetLSN(page))
622         {
623                 ginRedoRecompress(page, &xlrec->data);
624                 PageSetLSN(page, lsn);
625                 MarkBufferDirty(buffer);
626         }
627
628         UnlockReleaseBuffer(buffer);
629 }
630
631 static void
632 ginRedoDeletePage(XLogRecPtr lsn, XLogRecord *record)
633 {
634         ginxlogDeletePage *data = (ginxlogDeletePage *) XLogRecGetData(record);
635         Buffer          dbuffer;
636         Buffer          pbuffer;
637         Buffer          lbuffer;
638         Page            page;
639
640         if (record->xl_info & XLR_BKP_BLOCK(0))
641                 dbuffer = RestoreBackupBlock(lsn, record, 0, false, true);
642         else
643         {
644                 dbuffer = XLogReadBuffer(data->node, data->blkno, false);
645                 if (BufferIsValid(dbuffer))
646                 {
647                         page = BufferGetPage(dbuffer);
648                         if (lsn > PageGetLSN(page))
649                         {
650                                 Assert(GinPageIsData(page));
651                                 GinPageGetOpaque(page)->flags = GIN_DELETED;
652                                 PageSetLSN(page, lsn);
653                                 MarkBufferDirty(dbuffer);
654                         }
655                 }
656         }
657
658         if (record->xl_info & XLR_BKP_BLOCK(1))
659                 pbuffer = RestoreBackupBlock(lsn, record, 1, false, true);
660         else
661         {
662                 pbuffer = XLogReadBuffer(data->node, data->parentBlkno, false);
663                 if (BufferIsValid(pbuffer))
664                 {
665                         page = BufferGetPage(pbuffer);
666                         if (lsn > PageGetLSN(page))
667                         {
668                                 Assert(GinPageIsData(page));
669                                 Assert(!GinPageIsLeaf(page));
670                                 GinPageDeletePostingItem(page, data->parentOffset);
671                                 PageSetLSN(page, lsn);
672                                 MarkBufferDirty(pbuffer);
673                         }
674                 }
675         }
676
677         if (record->xl_info & XLR_BKP_BLOCK(2))
678                 (void) RestoreBackupBlock(lsn, record, 2, false, false);
679         else if (data->leftBlkno != InvalidBlockNumber)
680         {
681                 lbuffer = XLogReadBuffer(data->node, data->leftBlkno, false);
682                 if (BufferIsValid(lbuffer))
683                 {
684                         page = BufferGetPage(lbuffer);
685                         if (lsn > PageGetLSN(page))
686                         {
687                                 Assert(GinPageIsData(page));
688                                 GinPageGetOpaque(page)->rightlink = data->rightLink;
689                                 PageSetLSN(page, lsn);
690                                 MarkBufferDirty(lbuffer);
691                         }
692                         UnlockReleaseBuffer(lbuffer);
693                 }
694         }
695
696         if (BufferIsValid(pbuffer))
697                 UnlockReleaseBuffer(pbuffer);
698         if (BufferIsValid(dbuffer))
699                 UnlockReleaseBuffer(dbuffer);
700 }
701
702 static void
703 ginRedoUpdateMetapage(XLogRecPtr lsn, XLogRecord *record)
704 {
705         ginxlogUpdateMeta *data = (ginxlogUpdateMeta *) XLogRecGetData(record);
706         Buffer          metabuffer;
707         Page            metapage;
708         Buffer          buffer;
709
710         /*
711          * Restore the metapage. This is essentially the same as a full-page image,
712          * so restore the metapage unconditionally without looking at the LSN, to
713          * avoid torn page hazards.
714          */
715         metabuffer = XLogReadBuffer(data->node, GIN_METAPAGE_BLKNO, false);
716         if (!BufferIsValid(metabuffer))
717                 return;                                 /* assume index was deleted, nothing to do */
718         metapage = BufferGetPage(metabuffer);
719
720         memcpy(GinPageGetMeta(metapage), &data->metadata, sizeof(GinMetaPageData));
721         PageSetLSN(metapage, lsn);
722         MarkBufferDirty(metabuffer);
723
724         if (data->ntuples > 0)
725         {
726                 /*
727                  * insert into tail page
728                  */
729                 if (record->xl_info & XLR_BKP_BLOCK(0))
730                         (void) RestoreBackupBlock(lsn, record, 0, false, false);
731                 else
732                 {
733                         buffer = XLogReadBuffer(data->node, data->metadata.tail, false);
734                         if (BufferIsValid(buffer))
735                         {
736                                 Page            page = BufferGetPage(buffer);
737
738                                 if (lsn > PageGetLSN(page))
739                                 {
740                                         OffsetNumber l,
741                                                                 off = (PageIsEmpty(page)) ? FirstOffsetNumber :
742                                         OffsetNumberNext(PageGetMaxOffsetNumber(page));
743                                         int                     i,
744                                                                 tupsize;
745                                         IndexTuple      tuples = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogUpdateMeta));
746
747                                         for (i = 0; i < data->ntuples; i++)
748                                         {
749                                                 tupsize = IndexTupleSize(tuples);
750
751                                                 l = PageAddItem(page, (Item) tuples, tupsize, off, false, false);
752
753                                                 if (l == InvalidOffsetNumber)
754                                                         elog(ERROR, "failed to add item to index page");
755
756                                                 tuples = (IndexTuple) (((char *) tuples) + tupsize);
757
758                                                 off++;
759                                         }
760
761                                         /*
762                                          * Increase counter of heap tuples
763                                          */
764                                         GinPageGetOpaque(page)->maxoff++;
765
766                                         PageSetLSN(page, lsn);
767                                         MarkBufferDirty(buffer);
768                                 }
769                                 UnlockReleaseBuffer(buffer);
770                         }
771                 }
772         }
773         else if (data->prevTail != InvalidBlockNumber)
774         {
775                 /*
776                  * New tail
777                  */
778                 if (record->xl_info & XLR_BKP_BLOCK(0))
779                         (void) RestoreBackupBlock(lsn, record, 0, false, false);
780                 else
781                 {
782                         buffer = XLogReadBuffer(data->node, data->prevTail, false);
783                         if (BufferIsValid(buffer))
784                         {
785                                 Page            page = BufferGetPage(buffer);
786
787                                 if (lsn > PageGetLSN(page))
788                                 {
789                                         GinPageGetOpaque(page)->rightlink = data->newRightlink;
790
791                                         PageSetLSN(page, lsn);
792                                         MarkBufferDirty(buffer);
793                                 }
794                                 UnlockReleaseBuffer(buffer);
795                         }
796                 }
797         }
798
799         UnlockReleaseBuffer(metabuffer);
800 }
801
802 static void
803 ginRedoInsertListPage(XLogRecPtr lsn, XLogRecord *record)
804 {
805         ginxlogInsertListPage *data = (ginxlogInsertListPage *) XLogRecGetData(record);
806         Buffer          buffer;
807         Page            page;
808         OffsetNumber l,
809                                 off = FirstOffsetNumber;
810         int                     i,
811                                 tupsize;
812         IndexTuple      tuples = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogInsertListPage));
813
814         /* If we have a full-page image, restore it and we're done */
815         if (record->xl_info & XLR_BKP_BLOCK(0))
816         {
817                 (void) RestoreBackupBlock(lsn, record, 0, false, false);
818                 return;
819         }
820
821         buffer = XLogReadBuffer(data->node, data->blkno, true);
822         Assert(BufferIsValid(buffer));
823         page = BufferGetPage(buffer);
824
825         GinInitBuffer(buffer, GIN_LIST);
826         GinPageGetOpaque(page)->rightlink = data->rightlink;
827         if (data->rightlink == InvalidBlockNumber)
828         {
829                 /* tail of sublist */
830                 GinPageSetFullRow(page);
831                 GinPageGetOpaque(page)->maxoff = 1;
832         }
833         else
834         {
835                 GinPageGetOpaque(page)->maxoff = 0;
836         }
837
838         for (i = 0; i < data->ntuples; i++)
839         {
840                 tupsize = IndexTupleSize(tuples);
841
842                 l = PageAddItem(page, (Item) tuples, tupsize, off, false, false);
843
844                 if (l == InvalidOffsetNumber)
845                         elog(ERROR, "failed to add item to index page");
846
847                 tuples = (IndexTuple) (((char *) tuples) + tupsize);
848         }
849
850         PageSetLSN(page, lsn);
851         MarkBufferDirty(buffer);
852
853         UnlockReleaseBuffer(buffer);
854 }
855
856 static void
857 ginRedoDeleteListPages(XLogRecPtr lsn, XLogRecord *record)
858 {
859         ginxlogDeleteListPages *data = (ginxlogDeleteListPages *) XLogRecGetData(record);
860         Buffer          metabuffer;
861         Page            metapage;
862         int                     i;
863
864         /* Backup blocks are not used in delete_listpage records */
865         Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
866
867         metabuffer = XLogReadBuffer(data->node, GIN_METAPAGE_BLKNO, false);
868         if (!BufferIsValid(metabuffer))
869                 return;                                 /* assume index was deleted, nothing to do */
870         metapage = BufferGetPage(metabuffer);
871
872         memcpy(GinPageGetMeta(metapage), &data->metadata, sizeof(GinMetaPageData));
873         PageSetLSN(metapage, lsn);
874         MarkBufferDirty(metabuffer);
875
876         /*
877          * In normal operation, shiftList() takes exclusive lock on all the
878          * pages-to-be-deleted simultaneously.  During replay, however, it should
879          * be all right to lock them one at a time.  This is dependent on the fact
880          * that we are deleting pages from the head of the list, and that readers
881          * share-lock the next page before releasing the one they are on. So we
882          * cannot get past a reader that is on, or due to visit, any page we are
883          * going to delete.  New incoming readers will block behind our metapage
884          * lock and then see a fully updated page list.
885          */
886         for (i = 0; i < data->ndeleted; i++)
887         {
888                 Buffer          buffer = XLogReadBuffer(data->node, data->toDelete[i], false);
889
890                 if (BufferIsValid(buffer))
891                 {
892                         Page            page = BufferGetPage(buffer);
893
894                         if (lsn > PageGetLSN(page))
895                         {
896                                 GinPageGetOpaque(page)->flags = GIN_DELETED;
897
898                                 PageSetLSN(page, lsn);
899                                 MarkBufferDirty(buffer);
900                         }
901
902                         UnlockReleaseBuffer(buffer);
903                 }
904         }
905         UnlockReleaseBuffer(metabuffer);
906 }
907
908 void
909 gin_redo(XLogRecPtr lsn, XLogRecord *record)
910 {
911         uint8           info = record->xl_info & ~XLR_INFO_MASK;
912         MemoryContext oldCtx;
913
914         /*
915          * GIN indexes do not require any conflict processing. NB: If we ever
916          * implement a similar optimization as we have in b-tree, and remove
917          * killed tuples outside VACUUM, we'll need to handle that here.
918          */
919
920         oldCtx = MemoryContextSwitchTo(opCtx);
921         switch (info)
922         {
923                 case XLOG_GIN_CREATE_INDEX:
924                         ginRedoCreateIndex(lsn, record);
925                         break;
926                 case XLOG_GIN_CREATE_PTREE:
927                         ginRedoCreatePTree(lsn, record);
928                         break;
929                 case XLOG_GIN_INSERT:
930                         ginRedoInsert(lsn, record);
931                         break;
932                 case XLOG_GIN_SPLIT:
933                         ginRedoSplit(lsn, record);
934                         break;
935                 case XLOG_GIN_VACUUM_PAGE:
936                         ginRedoVacuumPage(lsn, record);
937                         break;
938                 case XLOG_GIN_VACUUM_DATA_LEAF_PAGE:
939                         ginRedoVacuumDataLeafPage(lsn, record);
940                         break;
941                 case XLOG_GIN_DELETE_PAGE:
942                         ginRedoDeletePage(lsn, record);
943                         break;
944                 case XLOG_GIN_UPDATE_META_PAGE:
945                         ginRedoUpdateMetapage(lsn, record);
946                         break;
947                 case XLOG_GIN_INSERT_LISTPAGE:
948                         ginRedoInsertListPage(lsn, record);
949                         break;
950                 case XLOG_GIN_DELETE_LISTPAGE:
951                         ginRedoDeleteListPages(lsn, record);
952                         break;
953                 default:
954                         elog(PANIC, "gin_redo: unknown op code %u", info);
955         }
956         MemoryContextSwitchTo(oldCtx);
957         MemoryContextReset(opCtx);
958 }
959
960 void
961 gin_xlog_startup(void)
962 {
963         opCtx = AllocSetContextCreate(CurrentMemoryContext,
964                                                                   "GIN recovery temporary context",
965                                                                   ALLOCSET_DEFAULT_MINSIZE,
966                                                                   ALLOCSET_DEFAULT_INITSIZE,
967                                                                   ALLOCSET_DEFAULT_MAXSIZE);
968 }
969
970 void
971 gin_xlog_cleanup(void)
972 {
973         MemoryContextDelete(opCtx);
974 }