]> granicus.if.org Git - postgresql/blob - src/backend/access/gin/ginxlog.c
Fix assorted bugs in GIN's WAL replay logic.
[postgresql] / src / backend / access / gin / ginxlog.c
1 /*-------------------------------------------------------------------------
2  *
3  * ginxlog.c
4  *        WAL replay logic for inverted index.
5  *
6  *
7  * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group
8  * Portions Copyright (c) 1994, Regents of the University of California
9  *
10  * IDENTIFICATION
11  *                       src/backend/access/gin/ginxlog.c
12  *-------------------------------------------------------------------------
13  */
14 #include "postgres.h"
15
16 #include "access/gin.h"
17 #include "access/xlogutils.h"
18 #include "storage/bufmgr.h"
19 #include "utils/memutils.h"
20
21 static MemoryContext opCtx;             /* working memory for operations */
22 static MemoryContext topCtx;
23
24 typedef struct ginIncompleteSplit
25 {
26         RelFileNode node;
27         BlockNumber leftBlkno;
28         BlockNumber rightBlkno;
29         BlockNumber rootBlkno;
30 } ginIncompleteSplit;
31
32 static List *incomplete_splits;
33
34 static void
35 pushIncompleteSplit(RelFileNode node, BlockNumber leftBlkno, BlockNumber rightBlkno, BlockNumber rootBlkno)
36 {
37         ginIncompleteSplit *split;
38
39         MemoryContextSwitchTo(topCtx);
40
41         split = palloc(sizeof(ginIncompleteSplit));
42
43         split->node = node;
44         split->leftBlkno = leftBlkno;
45         split->rightBlkno = rightBlkno;
46         split->rootBlkno = rootBlkno;
47
48         incomplete_splits = lappend(incomplete_splits, split);
49
50         MemoryContextSwitchTo(opCtx);
51 }
52
53 static void
54 forgetIncompleteSplit(RelFileNode node, BlockNumber leftBlkno, BlockNumber updateBlkno)
55 {
56         ListCell   *l;
57
58         foreach(l, incomplete_splits)
59         {
60                 ginIncompleteSplit *split = (ginIncompleteSplit *) lfirst(l);
61
62                 if (RelFileNodeEquals(node, split->node) && leftBlkno == split->leftBlkno && updateBlkno == split->rightBlkno)
63                 {
64                         incomplete_splits = list_delete_ptr(incomplete_splits, split);
65                         break;
66                 }
67         }
68 }
69
70 static void
71 ginRedoCreateIndex(XLogRecPtr lsn, XLogRecord *record)
72 {
73         RelFileNode *node = (RelFileNode *) XLogRecGetData(record);
74         Buffer          RootBuffer,
75                                 MetaBuffer;
76         Page            page;
77
78         MetaBuffer = XLogReadBuffer(*node, GIN_METAPAGE_BLKNO, true);
79         Assert(BufferIsValid(MetaBuffer));
80         page = (Page) BufferGetPage(MetaBuffer);
81
82         GinInitMetabuffer(MetaBuffer);
83
84         PageSetLSN(page, lsn);
85         PageSetTLI(page, ThisTimeLineID);
86         MarkBufferDirty(MetaBuffer);
87
88         RootBuffer = XLogReadBuffer(*node, GIN_ROOT_BLKNO, true);
89         Assert(BufferIsValid(RootBuffer));
90         page = (Page) BufferGetPage(RootBuffer);
91
92         GinInitBuffer(RootBuffer, GIN_LEAF);
93
94         PageSetLSN(page, lsn);
95         PageSetTLI(page, ThisTimeLineID);
96         MarkBufferDirty(RootBuffer);
97
98         UnlockReleaseBuffer(RootBuffer);
99         UnlockReleaseBuffer(MetaBuffer);
100 }
101
102 static void
103 ginRedoCreatePTree(XLogRecPtr lsn, XLogRecord *record)
104 {
105         ginxlogCreatePostingTree *data = (ginxlogCreatePostingTree *) XLogRecGetData(record);
106         ItemPointerData *items = (ItemPointerData *) (XLogRecGetData(record) + sizeof(ginxlogCreatePostingTree));
107         Buffer          buffer;
108         Page            page;
109
110         buffer = XLogReadBuffer(data->node, data->blkno, true);
111         Assert(BufferIsValid(buffer));
112         page = (Page) BufferGetPage(buffer);
113
114         GinInitBuffer(buffer, GIN_DATA | GIN_LEAF);
115         memcpy(GinDataPageGetData(page), items, sizeof(ItemPointerData) * data->nitem);
116         GinPageGetOpaque(page)->maxoff = data->nitem;
117
118         PageSetLSN(page, lsn);
119         PageSetTLI(page, ThisTimeLineID);
120
121         MarkBufferDirty(buffer);
122         UnlockReleaseBuffer(buffer);
123 }
124
125 static void
126 ginRedoInsert(XLogRecPtr lsn, XLogRecord *record)
127 {
128         ginxlogInsert *data = (ginxlogInsert *) XLogRecGetData(record);
129         Buffer          buffer;
130         Page            page;
131
132         /* first, forget any incomplete split this insertion completes */
133         if (data->isData)
134         {
135                 Assert(data->isDelete == FALSE);
136                 if (!data->isLeaf && data->updateBlkno != InvalidBlockNumber)
137                 {
138                         PostingItem *pitem;
139
140                         pitem = (PostingItem *) (XLogRecGetData(record) + sizeof(ginxlogInsert));
141                         forgetIncompleteSplit(data->node,
142                                                                   PostingItemGetBlockNumber(pitem),
143                                                                   data->updateBlkno);
144                 }
145
146         }
147         else
148         {
149                 if (!data->isLeaf && data->updateBlkno != InvalidBlockNumber)
150                 {
151                         IndexTuple      itup;
152
153                         itup = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogInsert));
154                         forgetIncompleteSplit(data->node,
155                                                                   GinItemPointerGetBlockNumber(&itup->t_tid),
156                                                                   data->updateBlkno);
157                 }
158         }
159
160         /* nothing else to do if page was backed up */
161         if (record->xl_info & XLR_BKP_BLOCK_1)
162                 return;
163
164         buffer = XLogReadBuffer(data->node, data->blkno, false);
165         if (!BufferIsValid(buffer))
166                 return;                                 /* page was deleted, nothing to do */
167         page = (Page) BufferGetPage(buffer);
168
169         if (!XLByteLE(lsn, PageGetLSN(page)))
170         {
171                 if (data->isData)
172                 {
173                         Assert(GinPageIsData(page));
174
175                         if (data->isLeaf)
176                         {
177                                 OffsetNumber i;
178                                 ItemPointerData *items = (ItemPointerData *) (XLogRecGetData(record) + sizeof(ginxlogInsert));
179
180                                 Assert(GinPageIsLeaf(page));
181                                 Assert(data->updateBlkno == InvalidBlockNumber);
182
183                                 for (i = 0; i < data->nitem; i++)
184                                         GinDataPageAddItem(page, items + i, data->offset + i);
185                         }
186                         else
187                         {
188                                 PostingItem *pitem;
189
190                                 Assert(!GinPageIsLeaf(page));
191
192                                 if (data->updateBlkno != InvalidBlockNumber)
193                                 {
194                                         /* update link to right page after split */
195                                         pitem = (PostingItem *) GinDataPageGetItem(page, data->offset);
196                                         PostingItemSetBlockNumber(pitem, data->updateBlkno);
197                                 }
198
199                                 pitem = (PostingItem *) (XLogRecGetData(record) + sizeof(ginxlogInsert));
200
201                                 GinDataPageAddItem(page, pitem, data->offset);
202                         }
203                 }
204                 else
205                 {
206                         IndexTuple      itup;
207
208                         Assert(!GinPageIsData(page));
209
210                         if (data->updateBlkno != InvalidBlockNumber)
211                         {
212                                 /* update link to right page after split */
213                                 Assert(!GinPageIsLeaf(page));
214                                 Assert(data->offset >= FirstOffsetNumber && data->offset <= PageGetMaxOffsetNumber(page));
215                                 itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, data->offset));
216                                 ItemPointerSet(&itup->t_tid, data->updateBlkno, InvalidOffsetNumber);
217                         }
218
219                         if (data->isDelete)
220                         {
221                                 Assert(GinPageIsLeaf(page));
222                                 Assert(data->offset >= FirstOffsetNumber && data->offset <= PageGetMaxOffsetNumber(page));
223                                 PageIndexTupleDelete(page, data->offset);
224                         }
225
226                         itup = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogInsert));
227
228                         if (PageAddItem(page, (Item) itup, IndexTupleSize(itup), data->offset, false, false) == InvalidOffsetNumber)
229                                 elog(ERROR, "failed to add item to index page in %u/%u/%u",
230                                   data->node.spcNode, data->node.dbNode, data->node.relNode);
231                 }
232
233                 PageSetLSN(page, lsn);
234                 PageSetTLI(page, ThisTimeLineID);
235
236                 MarkBufferDirty(buffer);
237         }
238
239         UnlockReleaseBuffer(buffer);
240 }
241
242 static void
243 ginRedoSplit(XLogRecPtr lsn, XLogRecord *record)
244 {
245         ginxlogSplit *data = (ginxlogSplit *) XLogRecGetData(record);
246         Buffer          lbuffer,
247                                 rbuffer;
248         Page            lpage,
249                                 rpage;
250         uint32          flags = 0;
251
252         if (data->isLeaf)
253                 flags |= GIN_LEAF;
254         if (data->isData)
255                 flags |= GIN_DATA;
256
257         lbuffer = XLogReadBuffer(data->node, data->lblkno, true);
258         Assert(BufferIsValid(lbuffer));
259         lpage = (Page) BufferGetPage(lbuffer);
260         GinInitBuffer(lbuffer, flags);
261
262         rbuffer = XLogReadBuffer(data->node, data->rblkno, true);
263         Assert(BufferIsValid(rbuffer));
264         rpage = (Page) BufferGetPage(rbuffer);
265         GinInitBuffer(rbuffer, flags);
266
267         GinPageGetOpaque(lpage)->rightlink = BufferGetBlockNumber(rbuffer);
268         GinPageGetOpaque(rpage)->rightlink = data->rrlink;
269
270         if (data->isData)
271         {
272                 char       *ptr = XLogRecGetData(record) + sizeof(ginxlogSplit);
273                 Size            sizeofitem = GinSizeOfItem(lpage);
274                 OffsetNumber i;
275                 ItemPointer bound;
276
277                 for (i = 0; i < data->separator; i++)
278                 {
279                         GinDataPageAddItem(lpage, ptr, InvalidOffsetNumber);
280                         ptr += sizeofitem;
281                 }
282
283                 for (i = data->separator; i < data->nitem; i++)
284                 {
285                         GinDataPageAddItem(rpage, ptr, InvalidOffsetNumber);
286                         ptr += sizeofitem;
287                 }
288
289                 /* set up right key */
290                 bound = GinDataPageGetRightBound(lpage);
291                 if (data->isLeaf)
292                         *bound = *(ItemPointerData *) GinDataPageGetItem(lpage, GinPageGetOpaque(lpage)->maxoff);
293                 else
294                         *bound = ((PostingItem *) GinDataPageGetItem(lpage, GinPageGetOpaque(lpage)->maxoff))->key;
295
296                 bound = GinDataPageGetRightBound(rpage);
297                 *bound = data->rightbound;
298         }
299         else
300         {
301                 IndexTuple      itup = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogSplit));
302                 OffsetNumber i;
303
304                 for (i = 0; i < data->separator; i++)
305                 {
306                         if (PageAddItem(lpage, (Item) itup, IndexTupleSize(itup), InvalidOffsetNumber, false, false) == InvalidOffsetNumber)
307                                 elog(ERROR, "failed to add item to index page in %u/%u/%u",
308                                   data->node.spcNode, data->node.dbNode, data->node.relNode);
309                         itup = (IndexTuple) (((char *) itup) + MAXALIGN(IndexTupleSize(itup)));
310                 }
311
312                 for (i = data->separator; i < data->nitem; i++)
313                 {
314                         if (PageAddItem(rpage, (Item) itup, IndexTupleSize(itup), InvalidOffsetNumber, false, false) == InvalidOffsetNumber)
315                                 elog(ERROR, "failed to add item to index page in %u/%u/%u",
316                                   data->node.spcNode, data->node.dbNode, data->node.relNode);
317                         itup = (IndexTuple) (((char *) itup) + MAXALIGN(IndexTupleSize(itup)));
318                 }
319         }
320
321         PageSetLSN(rpage, lsn);
322         PageSetTLI(rpage, ThisTimeLineID);
323         MarkBufferDirty(rbuffer);
324
325         PageSetLSN(lpage, lsn);
326         PageSetTLI(lpage, ThisTimeLineID);
327         MarkBufferDirty(lbuffer);
328
329         if (!data->isLeaf && data->updateBlkno != InvalidBlockNumber)
330                 forgetIncompleteSplit(data->node, data->leftChildBlkno, data->updateBlkno);
331
332         if (data->isRootSplit)
333         {
334                 Buffer          rootBuf = XLogReadBuffer(data->node, data->rootBlkno, true);
335                 Page            rootPage = BufferGetPage(rootBuf);
336
337                 GinInitBuffer(rootBuf, flags & ~GIN_LEAF);
338
339                 if (data->isData)
340                 {
341                         Assert(data->rootBlkno != GIN_ROOT_BLKNO);
342                         dataFillRoot(NULL, rootBuf, lbuffer, rbuffer);
343                 }
344                 else
345                 {
346                         Assert(data->rootBlkno == GIN_ROOT_BLKNO);
347                         entryFillRoot(NULL, rootBuf, lbuffer, rbuffer);
348                 }
349
350                 PageSetLSN(rootPage, lsn);
351                 PageSetTLI(rootPage, ThisTimeLineID);
352
353                 MarkBufferDirty(rootBuf);
354                 UnlockReleaseBuffer(rootBuf);
355         }
356         else
357                 pushIncompleteSplit(data->node, data->lblkno, data->rblkno, data->rootBlkno);
358
359         UnlockReleaseBuffer(rbuffer);
360         UnlockReleaseBuffer(lbuffer);
361 }
362
363 static void
364 ginRedoVacuumPage(XLogRecPtr lsn, XLogRecord *record)
365 {
366         ginxlogVacuumPage *data = (ginxlogVacuumPage *) XLogRecGetData(record);
367         Buffer          buffer;
368         Page            page;
369
370         /* nothing to do if page was backed up (and no info to do it with) */
371         if (record->xl_info & XLR_BKP_BLOCK_1)
372                 return;
373
374         buffer = XLogReadBuffer(data->node, data->blkno, false);
375         if (!BufferIsValid(buffer))
376                 return;
377         page = (Page) BufferGetPage(buffer);
378
379         if (!XLByteLE(lsn, PageGetLSN(page)))
380         {
381                 if (GinPageIsData(page))
382                 {
383                         memcpy(GinDataPageGetData(page), XLogRecGetData(record) + sizeof(ginxlogVacuumPage),
384                                    GinSizeOfItem(page) *data->nitem);
385                         GinPageGetOpaque(page)->maxoff = data->nitem;
386                 }
387                 else
388                 {
389                         OffsetNumber i,
390                                 *tod;
391                         IndexTuple      itup = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogVacuumPage));
392
393                         tod = (OffsetNumber *) palloc(sizeof(OffsetNumber) * PageGetMaxOffsetNumber(page));
394                         for (i = FirstOffsetNumber; i <= PageGetMaxOffsetNumber(page); i++)
395                                 tod[i - 1] = i;
396
397                         PageIndexMultiDelete(page, tod, PageGetMaxOffsetNumber(page));
398
399                         for (i = 0; i < data->nitem; i++)
400                         {
401                                 if (PageAddItem(page, (Item) itup, IndexTupleSize(itup), InvalidOffsetNumber, false, false) == InvalidOffsetNumber)
402                                         elog(ERROR, "failed to add item to index page in %u/%u/%u",
403                                                  data->node.spcNode, data->node.dbNode, data->node.relNode);
404                                 itup = (IndexTuple) (((char *) itup) + MAXALIGN(IndexTupleSize(itup)));
405                         }
406                 }
407
408                 PageSetLSN(page, lsn);
409                 PageSetTLI(page, ThisTimeLineID);
410                 MarkBufferDirty(buffer);
411         }
412
413         UnlockReleaseBuffer(buffer);
414 }
415
416 static void
417 ginRedoDeletePage(XLogRecPtr lsn, XLogRecord *record)
418 {
419         ginxlogDeletePage *data = (ginxlogDeletePage *) XLogRecGetData(record);
420         Buffer          buffer;
421         Page            page;
422
423         if (!(record->xl_info & XLR_BKP_BLOCK_1))
424         {
425                 buffer = XLogReadBuffer(data->node, data->blkno, false);
426                 if (BufferIsValid(buffer))
427                 {
428                         page = BufferGetPage(buffer);
429                         if (!XLByteLE(lsn, PageGetLSN(page)))
430                         {
431                                 Assert(GinPageIsData(page));
432                                 GinPageGetOpaque(page)->flags = GIN_DELETED;
433                                 PageSetLSN(page, lsn);
434                                 PageSetTLI(page, ThisTimeLineID);
435                                 MarkBufferDirty(buffer);
436                         }
437                         UnlockReleaseBuffer(buffer);
438                 }
439         }
440
441         if (!(record->xl_info & XLR_BKP_BLOCK_2))
442         {
443                 buffer = XLogReadBuffer(data->node, data->parentBlkno, false);
444                 if (BufferIsValid(buffer))
445                 {
446                         page = BufferGetPage(buffer);
447                         if (!XLByteLE(lsn, PageGetLSN(page)))
448                         {
449                                 Assert(GinPageIsData(page));
450                                 Assert(!GinPageIsLeaf(page));
451                                 PageDeletePostingItem(page, data->parentOffset);
452                                 PageSetLSN(page, lsn);
453                                 PageSetTLI(page, ThisTimeLineID);
454                                 MarkBufferDirty(buffer);
455                         }
456                         UnlockReleaseBuffer(buffer);
457                 }
458         }
459
460         if (!(record->xl_info & XLR_BKP_BLOCK_3) && data->leftBlkno != InvalidBlockNumber)
461         {
462                 buffer = XLogReadBuffer(data->node, data->leftBlkno, false);
463                 if (BufferIsValid(buffer))
464                 {
465                         page = BufferGetPage(buffer);
466                         if (!XLByteLE(lsn, PageGetLSN(page)))
467                         {
468                                 Assert(GinPageIsData(page));
469                                 GinPageGetOpaque(page)->rightlink = data->rightLink;
470                                 PageSetLSN(page, lsn);
471                                 PageSetTLI(page, ThisTimeLineID);
472                                 MarkBufferDirty(buffer);
473                         }
474                         UnlockReleaseBuffer(buffer);
475                 }
476         }
477 }
478
479 static void
480 ginRedoUpdateMetapage(XLogRecPtr lsn, XLogRecord *record)
481 {
482         ginxlogUpdateMeta *data = (ginxlogUpdateMeta *) XLogRecGetData(record);
483         Buffer          metabuffer;
484         Page            metapage;
485         Buffer          buffer;
486
487         metabuffer = XLogReadBuffer(data->node, GIN_METAPAGE_BLKNO, false);
488         if (!BufferIsValid(metabuffer))
489                 elog(PANIC, "GIN metapage disappeared");
490         metapage = BufferGetPage(metabuffer);
491
492         if (!XLByteLE(lsn, PageGetLSN(metapage)))
493         {
494                 memcpy(GinPageGetMeta(metapage), &data->metadata, sizeof(GinMetaPageData));
495                 PageSetLSN(metapage, lsn);
496                 PageSetTLI(metapage, ThisTimeLineID);
497                 MarkBufferDirty(metabuffer);
498         }
499
500         if (data->ntuples > 0)
501         {
502                 /*
503                  * insert into tail page
504                  */
505                 if (!(record->xl_info & XLR_BKP_BLOCK_1))
506                 {
507                         buffer = XLogReadBuffer(data->node, data->metadata.tail, false);
508                         if (BufferIsValid(buffer))
509                         {
510                                 Page            page = BufferGetPage(buffer);
511
512                                 if (!XLByteLE(lsn, PageGetLSN(page)))
513                                 {
514                                         OffsetNumber l,
515                                                 off = (PageIsEmpty(page)) ? FirstOffsetNumber :
516                                                 OffsetNumberNext(PageGetMaxOffsetNumber(page));
517                                         int                     i,
518                                                 tupsize;
519                                         IndexTuple      tuples = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogUpdateMeta));
520
521                                         for (i = 0; i < data->ntuples; i++)
522                                         {
523                                                 tupsize = IndexTupleSize(tuples);
524
525                                                 l = PageAddItem(page, (Item) tuples, tupsize, off, false, false);
526
527                                                 if (l == InvalidOffsetNumber)
528                                                         elog(ERROR, "failed to add item to index page");
529
530                                                 tuples = (IndexTuple) (((char *) tuples) + tupsize);
531                                         }
532
533                                         /*
534                                          * Increase counter of heap tuples
535                                          */
536                                         GinPageGetOpaque(page)->maxoff++;
537
538                                         PageSetLSN(page, lsn);
539                                         PageSetTLI(page, ThisTimeLineID);
540                                         MarkBufferDirty(buffer);
541                                 }
542                                 UnlockReleaseBuffer(buffer);
543                         }
544                 }
545         }
546         else if (data->prevTail != InvalidBlockNumber)
547         {
548                 /*
549                  * New tail
550                  */
551                 buffer = XLogReadBuffer(data->node, data->prevTail, false);
552                 if (BufferIsValid(buffer))
553                 {
554                         Page            page = BufferGetPage(buffer);
555
556                         if (!XLByteLE(lsn, PageGetLSN(page)))
557                         {
558                                 GinPageGetOpaque(page)->rightlink = data->newRightlink;
559
560                                 PageSetLSN(page, lsn);
561                                 PageSetTLI(page, ThisTimeLineID);
562                                 MarkBufferDirty(buffer);
563                         }
564                         UnlockReleaseBuffer(buffer);
565                 }
566         }
567
568         UnlockReleaseBuffer(metabuffer);
569 }
570
571 static void
572 ginRedoInsertListPage(XLogRecPtr lsn, XLogRecord *record)
573 {
574         ginxlogInsertListPage *data = (ginxlogInsertListPage *) XLogRecGetData(record);
575         Buffer          buffer;
576         Page            page;
577         OffsetNumber l,
578                                 off = FirstOffsetNumber;
579         int                     i,
580                                 tupsize;
581         IndexTuple      tuples = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogInsertListPage));
582
583         if (record->xl_info & XLR_BKP_BLOCK_1)
584                 return;
585
586         buffer = XLogReadBuffer(data->node, data->blkno, true);
587         Assert(BufferIsValid(buffer));
588         page = BufferGetPage(buffer);
589
590         GinInitBuffer(buffer, GIN_LIST);
591         GinPageGetOpaque(page)->rightlink = data->rightlink;
592         if (data->rightlink == InvalidBlockNumber)
593         {
594                 /* tail of sublist */
595                 GinPageSetFullRow(page);
596                 GinPageGetOpaque(page)->maxoff = 1;
597         }
598         else
599         {
600                 GinPageGetOpaque(page)->maxoff = 0;
601         }
602
603         for (i = 0; i < data->ntuples; i++)
604         {
605                 tupsize = IndexTupleSize(tuples);
606
607                 l = PageAddItem(page, (Item) tuples, tupsize, off, false, false);
608
609                 if (l == InvalidOffsetNumber)
610                         elog(ERROR, "failed to add item to index page");
611
612                 tuples = (IndexTuple) (((char *) tuples) + tupsize);
613         }
614
615         PageSetLSN(page, lsn);
616         PageSetTLI(page, ThisTimeLineID);
617         MarkBufferDirty(buffer);
618
619         UnlockReleaseBuffer(buffer);
620 }
621
622 static void
623 ginRedoDeleteListPages(XLogRecPtr lsn, XLogRecord *record)
624 {
625         ginxlogDeleteListPages *data = (ginxlogDeleteListPages *) XLogRecGetData(record);
626         Buffer          metabuffer;
627         Page            metapage;
628         int                     i;
629
630         metabuffer = XLogReadBuffer(data->node, GIN_METAPAGE_BLKNO, false);
631         if (!BufferIsValid(metabuffer))
632                 elog(PANIC, "GIN metapage disappeared");
633         metapage = BufferGetPage(metabuffer);
634
635         if (!XLByteLE(lsn, PageGetLSN(metapage)))
636         {
637                 memcpy(GinPageGetMeta(metapage), &data->metadata, sizeof(GinMetaPageData));
638                 PageSetLSN(metapage, lsn);
639                 PageSetTLI(metapage, ThisTimeLineID);
640                 MarkBufferDirty(metabuffer);
641         }
642
643         for (i = 0; i < data->ndeleted; i++)
644         {
645                 Buffer          buffer = XLogReadBuffer(data->node, data->toDelete[i], false);
646
647                 if (BufferIsValid(buffer))
648                 {
649                         Page            page = BufferGetPage(buffer);
650
651                         if (!XLByteLE(lsn, PageGetLSN(page)))
652                         {
653                                 GinPageGetOpaque(page)->flags = GIN_DELETED;
654
655                                 PageSetLSN(page, lsn);
656                                 PageSetTLI(page, ThisTimeLineID);
657                                 MarkBufferDirty(buffer);
658                         }
659
660                         UnlockReleaseBuffer(buffer);
661                 }
662         }
663         UnlockReleaseBuffer(metabuffer);
664 }
665
666 void
667 gin_redo(XLogRecPtr lsn, XLogRecord *record)
668 {
669         uint8           info = record->xl_info & ~XLR_INFO_MASK;
670
671         /*
672          * GIN indexes do not require any conflict processing.
673          */
674
675         RestoreBkpBlocks(lsn, record, false);
676
677         topCtx = MemoryContextSwitchTo(opCtx);
678         switch (info)
679         {
680                 case XLOG_GIN_CREATE_INDEX:
681                         ginRedoCreateIndex(lsn, record);
682                         break;
683                 case XLOG_GIN_CREATE_PTREE:
684                         ginRedoCreatePTree(lsn, record);
685                         break;
686                 case XLOG_GIN_INSERT:
687                         ginRedoInsert(lsn, record);
688                         break;
689                 case XLOG_GIN_SPLIT:
690                         ginRedoSplit(lsn, record);
691                         break;
692                 case XLOG_GIN_VACUUM_PAGE:
693                         ginRedoVacuumPage(lsn, record);
694                         break;
695                 case XLOG_GIN_DELETE_PAGE:
696                         ginRedoDeletePage(lsn, record);
697                         break;
698                 case XLOG_GIN_UPDATE_META_PAGE:
699                         ginRedoUpdateMetapage(lsn, record);
700                         break;
701                 case XLOG_GIN_INSERT_LISTPAGE:
702                         ginRedoInsertListPage(lsn, record);
703                         break;
704                 case XLOG_GIN_DELETE_LISTPAGE:
705                         ginRedoDeleteListPages(lsn, record);
706                         break;
707                 default:
708                         elog(PANIC, "gin_redo: unknown op code %u", info);
709         }
710         MemoryContextSwitchTo(topCtx);
711         MemoryContextReset(opCtx);
712 }
713
714 static void
715 desc_node(StringInfo buf, RelFileNode node, BlockNumber blkno)
716 {
717         appendStringInfo(buf, "node: %u/%u/%u blkno: %u",
718                                          node.spcNode, node.dbNode, node.relNode, blkno);
719 }
720
721 void
722 gin_desc(StringInfo buf, uint8 xl_info, char *rec)
723 {
724         uint8           info = xl_info & ~XLR_INFO_MASK;
725
726         switch (info)
727         {
728                 case XLOG_GIN_CREATE_INDEX:
729                         appendStringInfo(buf, "Create index, ");
730                         desc_node(buf, *(RelFileNode *) rec, GIN_ROOT_BLKNO);
731                         break;
732                 case XLOG_GIN_CREATE_PTREE:
733                         appendStringInfo(buf, "Create posting tree, ");
734                         desc_node(buf, ((ginxlogCreatePostingTree *) rec)->node, ((ginxlogCreatePostingTree *) rec)->blkno);
735                         break;
736                 case XLOG_GIN_INSERT:
737                         appendStringInfo(buf, "Insert item, ");
738                         desc_node(buf, ((ginxlogInsert *) rec)->node, ((ginxlogInsert *) rec)->blkno);
739                         appendStringInfo(buf, " offset: %u nitem: %u isdata: %c isleaf %c isdelete %c updateBlkno:%u",
740                                                          ((ginxlogInsert *) rec)->offset,
741                                                          ((ginxlogInsert *) rec)->nitem,
742                                                          (((ginxlogInsert *) rec)->isData) ? 'T' : 'F',
743                                                          (((ginxlogInsert *) rec)->isLeaf) ? 'T' : 'F',
744                                                          (((ginxlogInsert *) rec)->isDelete) ? 'T' : 'F',
745                                                          ((ginxlogInsert *) rec)->updateBlkno
746                                 );
747
748                         break;
749                 case XLOG_GIN_SPLIT:
750                         appendStringInfo(buf, "Page split, ");
751                         desc_node(buf, ((ginxlogSplit *) rec)->node, ((ginxlogSplit *) rec)->lblkno);
752                         appendStringInfo(buf, " isrootsplit: %c", (((ginxlogSplit *) rec)->isRootSplit) ? 'T' : 'F');
753                         break;
754                 case XLOG_GIN_VACUUM_PAGE:
755                         appendStringInfo(buf, "Vacuum page, ");
756                         desc_node(buf, ((ginxlogVacuumPage *) rec)->node, ((ginxlogVacuumPage *) rec)->blkno);
757                         break;
758                 case XLOG_GIN_DELETE_PAGE:
759                         appendStringInfo(buf, "Delete page, ");
760                         desc_node(buf, ((ginxlogDeletePage *) rec)->node, ((ginxlogDeletePage *) rec)->blkno);
761                         break;
762                 case XLOG_GIN_UPDATE_META_PAGE:
763                         appendStringInfo(buf, "Update metapage, ");
764                         desc_node(buf, ((ginxlogUpdateMeta *) rec)->node, ((ginxlogUpdateMeta *) rec)->metadata.tail);
765                         break;
766                 case XLOG_GIN_INSERT_LISTPAGE:
767                         appendStringInfo(buf, "Insert new list page, ");
768                         desc_node(buf, ((ginxlogInsertListPage *) rec)->node, ((ginxlogInsertListPage *) rec)->blkno);
769                         break;
770                 case XLOG_GIN_DELETE_LISTPAGE:
771                         appendStringInfo(buf, "Delete list pages (%d), ", ((ginxlogDeleteListPages *) rec)->ndeleted);
772                         desc_node(buf, ((ginxlogDeleteListPages *) rec)->node, ((ginxlogDeleteListPages *) rec)->metadata.head);
773                         break;
774                 default:
775                         elog(PANIC, "gin_desc: unknown op code %u", info);
776         }
777 }
778
779 void
780 gin_xlog_startup(void)
781 {
782         incomplete_splits = NIL;
783
784         opCtx = AllocSetContextCreate(CurrentMemoryContext,
785                                                                   "GIN recovery temporary context",
786                                                                   ALLOCSET_DEFAULT_MINSIZE,
787                                                                   ALLOCSET_DEFAULT_INITSIZE,
788                                                                   ALLOCSET_DEFAULT_MAXSIZE);
789 }
790
791 static void
792 ginContinueSplit(ginIncompleteSplit *split)
793 {
794         GinBtreeData btree;
795         Relation        reln;
796         Buffer          buffer;
797         GinBtreeStack stack;
798
799         /*
800          * elog(NOTICE,"ginContinueSplit root:%u l:%u r:%u",  split->rootBlkno,
801          * split->leftBlkno, split->rightBlkno);
802          */
803         buffer = XLogReadBuffer(split->node, split->leftBlkno, false);
804
805         /*
806          * Failure should be impossible here, because we wrote the page earlier.
807          */
808         if (!BufferIsValid(buffer))
809                 elog(PANIC, "ginContinueSplit: left block %u not found",
810                          split->leftBlkno);
811
812         reln = CreateFakeRelcacheEntry(split->node);
813
814         if (split->rootBlkno == GIN_ROOT_BLKNO)
815         {
816                 prepareEntryScan(&btree, reln, InvalidOffsetNumber, (Datum) 0, NULL);
817                 btree.entry = ginPageGetLinkItup(buffer);
818         }
819         else
820         {
821                 Page            page = BufferGetPage(buffer);
822
823                 prepareDataScan(&btree, reln);
824
825                 PostingItemSetBlockNumber(&(btree.pitem), split->leftBlkno);
826                 if (GinPageIsLeaf(page))
827                         btree.pitem.key = *(ItemPointerData *) GinDataPageGetItem(page,
828                                                                                          GinPageGetOpaque(page)->maxoff);
829                 else
830                         btree.pitem.key = ((PostingItem *) GinDataPageGetItem(page,
831                                                                            GinPageGetOpaque(page)->maxoff))->key;
832         }
833
834         btree.rightblkno = split->rightBlkno;
835
836         stack.blkno = split->leftBlkno;
837         stack.buffer = buffer;
838         stack.off = InvalidOffsetNumber;
839         stack.parent = NULL;
840
841         findParents(&btree, &stack, split->rootBlkno);
842         ginInsertValue(&btree, stack.parent);
843
844         FreeFakeRelcacheEntry(reln);
845
846         UnlockReleaseBuffer(buffer);
847 }
848
849 void
850 gin_xlog_cleanup(void)
851 {
852         ListCell   *l;
853         MemoryContext topCtx;
854
855         topCtx = MemoryContextSwitchTo(opCtx);
856
857         foreach(l, incomplete_splits)
858         {
859                 ginIncompleteSplit *split = (ginIncompleteSplit *) lfirst(l);
860
861                 ginContinueSplit(split);
862                 MemoryContextReset(opCtx);
863         }
864
865         MemoryContextSwitchTo(topCtx);
866         MemoryContextDelete(opCtx);
867         incomplete_splits = NIL;
868 }
869
870 bool
871 gin_safe_restartpoint(void)
872 {
873         if (incomplete_splits)
874                 return false;
875         return true;
876 }