]> granicus.if.org Git - postgresql/blob - src/backend/access/gin/ginxlog.c
c13e01a3c6a55ce63cd9ca7ab999fee6a9355968
[postgresql] / src / backend / access / gin / ginxlog.c
1 /*-------------------------------------------------------------------------
2  *
3  * ginxlog.c
4  *        WAL replay logic for inverted index.
5  *
6  *
7  * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
8  * Portions Copyright (c) 1994, Regents of the University of California
9  *
10  * IDENTIFICATION
11  *                       src/backend/access/gin/ginxlog.c
12  *-------------------------------------------------------------------------
13  */
14 #include "postgres.h"
15
16 #include "access/gin_private.h"
17 #include "access/xlogutils.h"
18 #include "utils/memutils.h"
19
/*
 * Working memory for redo operations.  Created by gin_xlog_startup(), made
 * the current context and then reset by gin_redo() for each record, and
 * deleted by gin_xlog_cleanup().
 */
static MemoryContext opCtx;
21
22 static void
23 ginRedoClearIncompleteSplit(XLogRecPtr lsn, RelFileNode node, BlockNumber blkno)
24 {
25         Buffer          buffer;
26         Page            page;
27
28         buffer = XLogReadBuffer(node, blkno, false);
29         if (!BufferIsValid(buffer))
30                 return;                                 /* page was deleted, nothing to do */
31         page = (Page) BufferGetPage(buffer);
32
33         if (lsn > PageGetLSN(page))
34         {
35                 GinPageGetOpaque(page)->flags &= ~GIN_INCOMPLETE_SPLIT;
36
37                 PageSetLSN(page, lsn);
38                 MarkBufferDirty(buffer);
39         }
40
41         UnlockReleaseBuffer(buffer);
42 }
43
44 static void
45 ginRedoCreateIndex(XLogRecPtr lsn, XLogRecord *record)
46 {
47         RelFileNode *node = (RelFileNode *) XLogRecGetData(record);
48         Buffer          RootBuffer,
49                                 MetaBuffer;
50         Page            page;
51
52         /* Backup blocks are not used in create_index records */
53         Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
54
55         MetaBuffer = XLogReadBuffer(*node, GIN_METAPAGE_BLKNO, true);
56         Assert(BufferIsValid(MetaBuffer));
57         page = (Page) BufferGetPage(MetaBuffer);
58
59         GinInitMetabuffer(MetaBuffer);
60
61         PageSetLSN(page, lsn);
62         MarkBufferDirty(MetaBuffer);
63
64         RootBuffer = XLogReadBuffer(*node, GIN_ROOT_BLKNO, true);
65         Assert(BufferIsValid(RootBuffer));
66         page = (Page) BufferGetPage(RootBuffer);
67
68         GinInitBuffer(RootBuffer, GIN_LEAF);
69
70         PageSetLSN(page, lsn);
71         MarkBufferDirty(RootBuffer);
72
73         UnlockReleaseBuffer(RootBuffer);
74         UnlockReleaseBuffer(MetaBuffer);
75 }
76
77 static void
78 ginRedoCreatePTree(XLogRecPtr lsn, XLogRecord *record)
79 {
80         ginxlogCreatePostingTree *data = (ginxlogCreatePostingTree *) XLogRecGetData(record);
81         ItemPointerData *items = (ItemPointerData *) (XLogRecGetData(record) + sizeof(ginxlogCreatePostingTree));
82         Buffer          buffer;
83         Page            page;
84
85         /* Backup blocks are not used in create_ptree records */
86         Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
87
88         buffer = XLogReadBuffer(data->node, data->blkno, true);
89         Assert(BufferIsValid(buffer));
90         page = (Page) BufferGetPage(buffer);
91
92         GinInitBuffer(buffer, GIN_DATA | GIN_LEAF);
93         memcpy(GinDataPageGetData(page), items, sizeof(ItemPointerData) * data->nitem);
94         GinPageGetOpaque(page)->maxoff = data->nitem;
95
96         PageSetLSN(page, lsn);
97
98         MarkBufferDirty(buffer);
99         UnlockReleaseBuffer(buffer);
100 }
101
102 static void
103 ginRedoInsertEntry(Buffer buffer, OffsetNumber offset, BlockNumber rightblkno,
104                                    void *rdata)
105 {
106         Page            page = BufferGetPage(buffer);
107         ginxlogInsertEntry *data = (ginxlogInsertEntry *) rdata;
108         IndexTuple      itup;
109
110         if (rightblkno != InvalidBlockNumber)
111         {
112                 /* update link to right page after split */
113                 Assert(!GinPageIsLeaf(page));
114                 Assert(offset >= FirstOffsetNumber && offset <= PageGetMaxOffsetNumber(page));
115                 itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, offset));
116                 GinSetDownlink(itup, rightblkno);
117         }
118
119         if (data->isDelete)
120         {
121                 Assert(GinPageIsLeaf(page));
122                 Assert(offset >= FirstOffsetNumber && offset <= PageGetMaxOffsetNumber(page));
123                 PageIndexTupleDelete(page, offset);
124         }
125
126         itup = &data->tuple;
127
128         if (PageAddItem(page, (Item) itup, IndexTupleSize(itup), offset, false, false) == InvalidOffsetNumber)
129         {
130                 RelFileNode node;
131                 ForkNumber forknum;
132                 BlockNumber blknum;
133
134                 BufferGetTag(buffer, &node, &forknum, &blknum);
135                 elog(ERROR, "failed to add item to index page in %u/%u/%u",
136                          node.spcNode, node.dbNode, node.relNode);
137         }
138 }
139
140 static void
141 ginRedoInsertData(Buffer buffer, OffsetNumber offset, BlockNumber rightblkno,
142                                   void *rdata)
143 {
144         Page            page = BufferGetPage(buffer);
145
146         if (GinPageIsLeaf(page))
147         {
148                 ginxlogInsertDataLeaf *data = (ginxlogInsertDataLeaf *) rdata;
149                 ItemPointerData *items = data->items;
150                 OffsetNumber i;
151
152                 for (i = 0; i < data->nitem; i++)
153                         GinDataPageAddItemPointer(page, &items[i], offset + i);
154         }
155         else
156         {
157                 PostingItem *pitem = (PostingItem *) rdata;
158                 PostingItem *oldpitem;
159
160                 /* update link to right page after split */
161                 oldpitem = GinDataPageGetPostingItem(page, offset);
162                 PostingItemSetBlockNumber(oldpitem, rightblkno);
163
164                 GinDataPageAddPostingItem(page, pitem, offset);
165         }
166 }
167
168 static void
169 ginRedoInsert(XLogRecPtr lsn, XLogRecord *record)
170 {
171         ginxlogInsert *data = (ginxlogInsert *) XLogRecGetData(record);
172         Buffer          buffer;
173         Page            page;
174         char       *payload;
175         BlockNumber leftChildBlkno = InvalidBlockNumber;
176         BlockNumber rightChildBlkno = InvalidBlockNumber;
177         bool            isLeaf = (data->flags & GIN_INSERT_ISLEAF) != 0;
178
179         payload = XLogRecGetData(record) + sizeof(ginxlogInsert);
180
181         /*
182          * First clear incomplete-split flag on child page if this finishes
183          * a split.
184          */
185         if (!isLeaf)
186         {
187                 leftChildBlkno = BlockIdGetBlockNumber((BlockId) payload);
188                 payload += sizeof(BlockIdData);
189                 rightChildBlkno = BlockIdGetBlockNumber((BlockId) payload);
190                 payload += sizeof(BlockIdData);
191
192                 if (record->xl_info & XLR_BKP_BLOCK(0))
193                         (void) RestoreBackupBlock(lsn, record, 0, false, false);
194                 else
195                         ginRedoClearIncompleteSplit(lsn, data->node, leftChildBlkno);
196         }
197
198         /* If we have a full-page image, restore it and we're done */
199         if (record->xl_info & XLR_BKP_BLOCK(isLeaf ? 0 : 1))
200         {
201                 (void) RestoreBackupBlock(lsn, record, isLeaf ? 0 : 1, false, false);
202                 return;
203         }
204
205         buffer = XLogReadBuffer(data->node, data->blkno, false);
206         if (!BufferIsValid(buffer))
207                 return;                                 /* page was deleted, nothing to do */
208         page = (Page) BufferGetPage(buffer);
209
210         if (lsn > PageGetLSN(page))
211         {
212                 /* How to insert the payload is tree-type specific */
213                 if (data->flags & GIN_INSERT_ISDATA)
214                 {
215                         Assert(GinPageIsData(page));
216                         ginRedoInsertData(buffer, data->offset, rightChildBlkno, payload);
217                 }
218                 else
219                 {
220                         Assert(!GinPageIsData(page));
221                         ginRedoInsertEntry(buffer, data->offset, rightChildBlkno, payload);
222                 }
223
224                 PageSetLSN(page, lsn);
225                 MarkBufferDirty(buffer);
226         }
227
228         UnlockReleaseBuffer(buffer);
229 }
230
231 static void
232 ginRedoSplitEntry(Page lpage, Page rpage, void *rdata)
233 {
234         ginxlogSplitEntry *data = (ginxlogSplitEntry *) rdata;
235         IndexTuple      itup = (IndexTuple) ((char *) rdata + sizeof(ginxlogSplitEntry));
236         OffsetNumber i;
237
238         for (i = 0; i < data->separator; i++)
239         {
240                 if (PageAddItem(lpage, (Item) itup, IndexTupleSize(itup), InvalidOffsetNumber, false, false) == InvalidOffsetNumber)
241                         elog(ERROR, "failed to add item to gin index page");
242                 itup = (IndexTuple) (((char *) itup) + MAXALIGN(IndexTupleSize(itup)));
243         }
244
245         for (i = data->separator; i < data->nitem; i++)
246         {
247                 if (PageAddItem(rpage, (Item) itup, IndexTupleSize(itup), InvalidOffsetNumber, false, false) == InvalidOffsetNumber)
248                         elog(ERROR, "failed to add item to gin index page");
249                 itup = (IndexTuple) (((char *) itup) + MAXALIGN(IndexTupleSize(itup)));
250         }
251 }
252
253 static void
254 ginRedoSplitData(Page lpage, Page rpage, void *rdata)
255 {
256         ginxlogSplitData *data = (ginxlogSplitData *) rdata;
257         bool            isleaf = GinPageIsLeaf(lpage);
258         char       *ptr = (char *) rdata + sizeof(ginxlogSplitData);
259         OffsetNumber i;
260         ItemPointer bound;
261
262         if (isleaf)
263         {
264                 ItemPointer items = (ItemPointer) ptr;
265                 for (i = 0; i < data->separator; i++)
266                         GinDataPageAddItemPointer(lpage, &items[i], InvalidOffsetNumber);
267                 for (i = data->separator; i < data->nitem; i++)
268                         GinDataPageAddItemPointer(rpage, &items[i], InvalidOffsetNumber);
269         }
270         else
271         {
272                 PostingItem *items = (PostingItem *) ptr;
273                 for (i = 0; i < data->separator; i++)
274                         GinDataPageAddPostingItem(lpage, &items[i], InvalidOffsetNumber);
275                 for (i = data->separator; i < data->nitem; i++)
276                         GinDataPageAddPostingItem(rpage, &items[i], InvalidOffsetNumber);
277         }
278
279         /* set up right key */
280         bound = GinDataPageGetRightBound(lpage);
281         if (isleaf)
282                 *bound = *GinDataPageGetItemPointer(lpage, GinPageGetOpaque(lpage)->maxoff);
283         else
284                 *bound = GinDataPageGetPostingItem(lpage, GinPageGetOpaque(lpage)->maxoff)->key;
285
286         bound = GinDataPageGetRightBound(rpage);
287         *bound = data->rightbound;
288 }
289
290 static void
291 ginRedoSplit(XLogRecPtr lsn, XLogRecord *record)
292 {
293         ginxlogSplit *data = (ginxlogSplit *) XLogRecGetData(record);
294         Buffer          lbuffer,
295                                 rbuffer;
296         Page            lpage,
297                                 rpage;
298         uint32          flags = 0;
299         char       *payload;
300         bool            isLeaf = (data->flags & GIN_INSERT_ISLEAF) != 0;
301         bool            isData = (data->flags & GIN_INSERT_ISDATA) != 0;
302         bool            isRoot = (data->flags & GIN_SPLIT_ROOT) != 0;
303
304         payload = XLogRecGetData(record) + sizeof(ginxlogSplit);
305
306         /*
307          * First clear incomplete-split flag on child page if this finishes
308          * a split
309          */
310         if (!isLeaf)
311         {
312                 if (record->xl_info & XLR_BKP_BLOCK(0))
313                         (void) RestoreBackupBlock(lsn, record, 0, false, false);
314                 else
315                         ginRedoClearIncompleteSplit(lsn, data->node, data->leftChildBlkno);
316         }
317
318         if (isLeaf)
319                 flags |= GIN_LEAF;
320
321         if (isData)
322                 flags |= GIN_DATA;
323
324         lbuffer = XLogReadBuffer(data->node, data->lblkno, true);
325         Assert(BufferIsValid(lbuffer));
326         lpage = (Page) BufferGetPage(lbuffer);
327         GinInitBuffer(lbuffer, flags);
328
329         rbuffer = XLogReadBuffer(data->node, data->rblkno, true);
330         Assert(BufferIsValid(rbuffer));
331         rpage = (Page) BufferGetPage(rbuffer);
332         GinInitBuffer(rbuffer, flags);
333
334         GinPageGetOpaque(lpage)->rightlink = BufferGetBlockNumber(rbuffer);
335         GinPageGetOpaque(rpage)->rightlink = isRoot ? InvalidBlockNumber : data->rrlink;
336
337         /* Do the tree-type specific portion to restore the page contents */
338         if (isData)
339                 ginRedoSplitData(lpage, rpage, payload);
340         else
341                 ginRedoSplitEntry(lpage, rpage, payload);
342
343         PageSetLSN(rpage, lsn);
344         MarkBufferDirty(rbuffer);
345
346         PageSetLSN(lpage, lsn);
347         MarkBufferDirty(lbuffer);
348
349         if (isRoot)
350         {
351                 BlockNumber     rootBlkno = data->rrlink;
352                 Buffer          rootBuf = XLogReadBuffer(data->node, rootBlkno, true);
353                 Page            rootPage = BufferGetPage(rootBuf);
354
355                 GinInitBuffer(rootBuf, flags & ~GIN_LEAF);
356
357                 if (isData)
358                 {
359                         Assert(rootBlkno != GIN_ROOT_BLKNO);
360                         ginDataFillRoot(NULL, BufferGetPage(rootBuf),
361                                                         BufferGetBlockNumber(lbuffer),
362                                                         BufferGetPage(lbuffer),
363                                                         BufferGetBlockNumber(rbuffer),
364                                                         BufferGetPage(rbuffer));
365                 }
366                 else
367                 {
368                         Assert(rootBlkno == GIN_ROOT_BLKNO);
369                         ginEntryFillRoot(NULL, BufferGetPage(rootBuf),
370                                                          BufferGetBlockNumber(lbuffer),
371                                                          BufferGetPage(lbuffer),
372                                                          BufferGetBlockNumber(rbuffer),
373                                                          BufferGetPage(rbuffer));
374                 }
375
376                 PageSetLSN(rootPage, lsn);
377
378                 MarkBufferDirty(rootBuf);
379                 UnlockReleaseBuffer(rootBuf);
380         }
381
382         UnlockReleaseBuffer(rbuffer);
383         UnlockReleaseBuffer(lbuffer);
384 }
385
386 static void
387 ginRedoVacuumPage(XLogRecPtr lsn, XLogRecord *record)
388 {
389         ginxlogVacuumPage *data = (ginxlogVacuumPage *) XLogRecGetData(record);
390         Buffer          buffer;
391         Page            page;
392
393         /* If we have a full-page image, restore it and we're done */
394         if (record->xl_info & XLR_BKP_BLOCK(0))
395         {
396                 (void) RestoreBackupBlock(lsn, record, 0, false, false);
397                 return;
398         }
399
400         buffer = XLogReadBuffer(data->node, data->blkno, false);
401         if (!BufferIsValid(buffer))
402                 return;
403         page = (Page) BufferGetPage(buffer);
404
405         if (lsn > PageGetLSN(page))
406         {
407                 if (GinPageIsData(page))
408                 {
409                         memcpy(GinDataPageGetData(page),
410                                    XLogRecGetData(record) + sizeof(ginxlogVacuumPage),
411                                    data->nitem * GinSizeOfDataPageItem(page));
412                         GinPageGetOpaque(page)->maxoff = data->nitem;
413                 }
414                 else
415                 {
416                         OffsetNumber i,
417                                            *tod;
418                         IndexTuple      itup = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogVacuumPage));
419
420                         tod = (OffsetNumber *) palloc(sizeof(OffsetNumber) * PageGetMaxOffsetNumber(page));
421                         for (i = FirstOffsetNumber; i <= PageGetMaxOffsetNumber(page); i++)
422                                 tod[i - 1] = i;
423
424                         PageIndexMultiDelete(page, tod, PageGetMaxOffsetNumber(page));
425
426                         for (i = 0; i < data->nitem; i++)
427                         {
428                                 if (PageAddItem(page, (Item) itup, IndexTupleSize(itup), InvalidOffsetNumber, false, false) == InvalidOffsetNumber)
429                                         elog(ERROR, "failed to add item to index page in %u/%u/%u",
430                                                  data->node.spcNode, data->node.dbNode, data->node.relNode);
431                                 itup = (IndexTuple) (((char *) itup) + MAXALIGN(IndexTupleSize(itup)));
432                         }
433                 }
434
435                 PageSetLSN(page, lsn);
436                 MarkBufferDirty(buffer);
437         }
438
439         UnlockReleaseBuffer(buffer);
440 }
441
442 static void
443 ginRedoDeletePage(XLogRecPtr lsn, XLogRecord *record)
444 {
445         ginxlogDeletePage *data = (ginxlogDeletePage *) XLogRecGetData(record);
446         Buffer          dbuffer;
447         Buffer          pbuffer;
448         Buffer          lbuffer;
449         Page            page;
450
451         if (record->xl_info & XLR_BKP_BLOCK(0))
452                 dbuffer = RestoreBackupBlock(lsn, record, 0, false, true);
453         else
454         {
455                 dbuffer = XLogReadBuffer(data->node, data->blkno, false);
456                 if (BufferIsValid(dbuffer))
457                 {
458                         page = BufferGetPage(dbuffer);
459                         if (lsn > PageGetLSN(page))
460                         {
461                                 Assert(GinPageIsData(page));
462                                 GinPageGetOpaque(page)->flags = GIN_DELETED;
463                                 PageSetLSN(page, lsn);
464                                 MarkBufferDirty(dbuffer);
465                         }
466                 }
467         }
468
469         if (record->xl_info & XLR_BKP_BLOCK(1))
470                 pbuffer = RestoreBackupBlock(lsn, record, 1, false, true);
471         else
472         {
473                 pbuffer = XLogReadBuffer(data->node, data->parentBlkno, false);
474                 if (BufferIsValid(pbuffer))
475                 {
476                         page = BufferGetPage(pbuffer);
477                         if (lsn > PageGetLSN(page))
478                         {
479                                 Assert(GinPageIsData(page));
480                                 Assert(!GinPageIsLeaf(page));
481                                 GinPageDeletePostingItem(page, data->parentOffset);
482                                 PageSetLSN(page, lsn);
483                                 MarkBufferDirty(pbuffer);
484                         }
485                 }
486         }
487
488         if (record->xl_info & XLR_BKP_BLOCK(2))
489                 (void) RestoreBackupBlock(lsn, record, 2, false, false);
490         else if (data->leftBlkno != InvalidBlockNumber)
491         {
492                 lbuffer = XLogReadBuffer(data->node, data->leftBlkno, false);
493                 if (BufferIsValid(lbuffer))
494                 {
495                         page = BufferGetPage(lbuffer);
496                         if (lsn > PageGetLSN(page))
497                         {
498                                 Assert(GinPageIsData(page));
499                                 GinPageGetOpaque(page)->rightlink = data->rightLink;
500                                 PageSetLSN(page, lsn);
501                                 MarkBufferDirty(lbuffer);
502                         }
503                         UnlockReleaseBuffer(lbuffer);
504                 }
505         }
506
507         if (BufferIsValid(pbuffer))
508                 UnlockReleaseBuffer(pbuffer);
509         if (BufferIsValid(dbuffer))
510                 UnlockReleaseBuffer(dbuffer);
511 }
512
513 static void
514 ginRedoUpdateMetapage(XLogRecPtr lsn, XLogRecord *record)
515 {
516         ginxlogUpdateMeta *data = (ginxlogUpdateMeta *) XLogRecGetData(record);
517         Buffer          metabuffer;
518         Page            metapage;
519         Buffer          buffer;
520
521         metabuffer = XLogReadBuffer(data->node, GIN_METAPAGE_BLKNO, false);
522         if (!BufferIsValid(metabuffer))
523                 return;                                 /* assume index was deleted, nothing to do */
524         metapage = BufferGetPage(metabuffer);
525
526         if (lsn > PageGetLSN(metapage))
527         {
528                 memcpy(GinPageGetMeta(metapage), &data->metadata, sizeof(GinMetaPageData));
529                 PageSetLSN(metapage, lsn);
530                 MarkBufferDirty(metabuffer);
531         }
532
533         if (data->ntuples > 0)
534         {
535                 /*
536                  * insert into tail page
537                  */
538                 if (record->xl_info & XLR_BKP_BLOCK(0))
539                         (void) RestoreBackupBlock(lsn, record, 0, false, false);
540                 else
541                 {
542                         buffer = XLogReadBuffer(data->node, data->metadata.tail, false);
543                         if (BufferIsValid(buffer))
544                         {
545                                 Page            page = BufferGetPage(buffer);
546
547                                 if (lsn > PageGetLSN(page))
548                                 {
549                                         OffsetNumber l,
550                                                                 off = (PageIsEmpty(page)) ? FirstOffsetNumber :
551                                         OffsetNumberNext(PageGetMaxOffsetNumber(page));
552                                         int                     i,
553                                                                 tupsize;
554                                         IndexTuple      tuples = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogUpdateMeta));
555
556                                         for (i = 0; i < data->ntuples; i++)
557                                         {
558                                                 tupsize = IndexTupleSize(tuples);
559
560                                                 l = PageAddItem(page, (Item) tuples, tupsize, off, false, false);
561
562                                                 if (l == InvalidOffsetNumber)
563                                                         elog(ERROR, "failed to add item to index page");
564
565                                                 tuples = (IndexTuple) (((char *) tuples) + tupsize);
566
567                                                 off++;
568                                         }
569
570                                         /*
571                                          * Increase counter of heap tuples
572                                          */
573                                         GinPageGetOpaque(page)->maxoff++;
574
575                                         PageSetLSN(page, lsn);
576                                         MarkBufferDirty(buffer);
577                                 }
578                                 UnlockReleaseBuffer(buffer);
579                         }
580                 }
581         }
582         else if (data->prevTail != InvalidBlockNumber)
583         {
584                 /*
585                  * New tail
586                  */
587                 if (record->xl_info & XLR_BKP_BLOCK(0))
588                         (void) RestoreBackupBlock(lsn, record, 0, false, false);
589                 else
590                 {
591                         buffer = XLogReadBuffer(data->node, data->prevTail, false);
592                         if (BufferIsValid(buffer))
593                         {
594                                 Page            page = BufferGetPage(buffer);
595
596                                 if (lsn > PageGetLSN(page))
597                                 {
598                                         GinPageGetOpaque(page)->rightlink = data->newRightlink;
599
600                                         PageSetLSN(page, lsn);
601                                         MarkBufferDirty(buffer);
602                                 }
603                                 UnlockReleaseBuffer(buffer);
604                         }
605                 }
606         }
607
608         UnlockReleaseBuffer(metabuffer);
609 }
610
611 static void
612 ginRedoInsertListPage(XLogRecPtr lsn, XLogRecord *record)
613 {
614         ginxlogInsertListPage *data = (ginxlogInsertListPage *) XLogRecGetData(record);
615         Buffer          buffer;
616         Page            page;
617         OffsetNumber l,
618                                 off = FirstOffsetNumber;
619         int                     i,
620                                 tupsize;
621         IndexTuple      tuples = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogInsertListPage));
622
623         /* If we have a full-page image, restore it and we're done */
624         if (record->xl_info & XLR_BKP_BLOCK(0))
625         {
626                 (void) RestoreBackupBlock(lsn, record, 0, false, false);
627                 return;
628         }
629
630         buffer = XLogReadBuffer(data->node, data->blkno, true);
631         Assert(BufferIsValid(buffer));
632         page = BufferGetPage(buffer);
633
634         GinInitBuffer(buffer, GIN_LIST);
635         GinPageGetOpaque(page)->rightlink = data->rightlink;
636         if (data->rightlink == InvalidBlockNumber)
637         {
638                 /* tail of sublist */
639                 GinPageSetFullRow(page);
640                 GinPageGetOpaque(page)->maxoff = 1;
641         }
642         else
643         {
644                 GinPageGetOpaque(page)->maxoff = 0;
645         }
646
647         for (i = 0; i < data->ntuples; i++)
648         {
649                 tupsize = IndexTupleSize(tuples);
650
651                 l = PageAddItem(page, (Item) tuples, tupsize, off, false, false);
652
653                 if (l == InvalidOffsetNumber)
654                         elog(ERROR, "failed to add item to index page");
655
656                 tuples = (IndexTuple) (((char *) tuples) + tupsize);
657         }
658
659         PageSetLSN(page, lsn);
660         MarkBufferDirty(buffer);
661
662         UnlockReleaseBuffer(buffer);
663 }
664
665 static void
666 ginRedoDeleteListPages(XLogRecPtr lsn, XLogRecord *record)
667 {
668         ginxlogDeleteListPages *data = (ginxlogDeleteListPages *) XLogRecGetData(record);
669         Buffer          metabuffer;
670         Page            metapage;
671         int                     i;
672
673         /* Backup blocks are not used in delete_listpage records */
674         Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
675
676         metabuffer = XLogReadBuffer(data->node, GIN_METAPAGE_BLKNO, false);
677         if (!BufferIsValid(metabuffer))
678                 return;                                 /* assume index was deleted, nothing to do */
679         metapage = BufferGetPage(metabuffer);
680
681         if (lsn > PageGetLSN(metapage))
682         {
683                 memcpy(GinPageGetMeta(metapage), &data->metadata, sizeof(GinMetaPageData));
684                 PageSetLSN(metapage, lsn);
685                 MarkBufferDirty(metabuffer);
686         }
687
688         /*
689          * In normal operation, shiftList() takes exclusive lock on all the
690          * pages-to-be-deleted simultaneously.  During replay, however, it should
691          * be all right to lock them one at a time.  This is dependent on the fact
692          * that we are deleting pages from the head of the list, and that readers
693          * share-lock the next page before releasing the one they are on. So we
694          * cannot get past a reader that is on, or due to visit, any page we are
695          * going to delete.  New incoming readers will block behind our metapage
696          * lock and then see a fully updated page list.
697          */
698         for (i = 0; i < data->ndeleted; i++)
699         {
700                 Buffer          buffer = XLogReadBuffer(data->node, data->toDelete[i], false);
701
702                 if (BufferIsValid(buffer))
703                 {
704                         Page            page = BufferGetPage(buffer);
705
706                         if (lsn > PageGetLSN(page))
707                         {
708                                 GinPageGetOpaque(page)->flags = GIN_DELETED;
709
710                                 PageSetLSN(page, lsn);
711                                 MarkBufferDirty(buffer);
712                         }
713
714                         UnlockReleaseBuffer(buffer);
715                 }
716         }
717         UnlockReleaseBuffer(metabuffer);
718 }
719
720 void
721 gin_redo(XLogRecPtr lsn, XLogRecord *record)
722 {
723         uint8           info = record->xl_info & ~XLR_INFO_MASK;
724         MemoryContext oldCtx;
725
726         /*
727          * GIN indexes do not require any conflict processing. NB: If we ever
728          * implement a similar optimization as we have in b-tree, and remove
729          * killed tuples outside VACUUM, we'll need to handle that here.
730          */
731
732         oldCtx = MemoryContextSwitchTo(opCtx);
733         switch (info)
734         {
735                 case XLOG_GIN_CREATE_INDEX:
736                         ginRedoCreateIndex(lsn, record);
737                         break;
738                 case XLOG_GIN_CREATE_PTREE:
739                         ginRedoCreatePTree(lsn, record);
740                         break;
741                 case XLOG_GIN_INSERT:
742                         ginRedoInsert(lsn, record);
743                         break;
744                 case XLOG_GIN_SPLIT:
745                         ginRedoSplit(lsn, record);
746                         break;
747                 case XLOG_GIN_VACUUM_PAGE:
748                         ginRedoVacuumPage(lsn, record);
749                         break;
750                 case XLOG_GIN_DELETE_PAGE:
751                         ginRedoDeletePage(lsn, record);
752                         break;
753                 case XLOG_GIN_UPDATE_META_PAGE:
754                         ginRedoUpdateMetapage(lsn, record);
755                         break;
756                 case XLOG_GIN_INSERT_LISTPAGE:
757                         ginRedoInsertListPage(lsn, record);
758                         break;
759                 case XLOG_GIN_DELETE_LISTPAGE:
760                         ginRedoDeleteListPages(lsn, record);
761                         break;
762                 default:
763                         elog(PANIC, "gin_redo: unknown op code %u", info);
764         }
765         MemoryContextSwitchTo(oldCtx);
766         MemoryContextReset(opCtx);
767 }
768
769 void
770 gin_xlog_startup(void)
771 {
772         opCtx = AllocSetContextCreate(CurrentMemoryContext,
773                                                                   "GIN recovery temporary context",
774                                                                   ALLOCSET_DEFAULT_MINSIZE,
775                                                                   ALLOCSET_DEFAULT_INITSIZE,
776                                                                   ALLOCSET_DEFAULT_MAXSIZE);
777 }
778
779 void
780 gin_xlog_cleanup(void)
781 {
782         MemoryContextDelete(opCtx);
783 }