]> granicus.if.org Git - postgresql/blob - src/backend/access/gin/ginxlog.c
Refactor the internal GIN B-tree interface for forming a downlink.
[postgresql] / src / backend / access / gin / ginxlog.c
1 /*-------------------------------------------------------------------------
2  *
3  * ginxlog.c
4  *        WAL replay logic for inverted index.
5  *
6  *
7  * Portions Copyright (c) 1996-2013, PostgreSQL Global Development Group
8  * Portions Copyright (c) 1994, Regents of the University of California
9  *
10  * IDENTIFICATION
11  *                       src/backend/access/gin/ginxlog.c
12  *-------------------------------------------------------------------------
13  */
14 #include "postgres.h"
15
16 #include "access/gin_private.h"
17 #include "access/xlogutils.h"
18 #include "utils/memutils.h"
19
/*
 * Memory contexts used during replay.  gin_redo() switches into opCtx for
 * each record and resets it afterwards; topCtx holds the context that was
 * current when gin_redo() was entered.
 */
static MemoryContext opCtx;             /* working memory for operations */
static MemoryContext topCtx;            /* caller's context, saved by gin_redo() */

/*
 * One page split that has been replayed but whose matching parent update
 * has not been seen yet.  Entries are added by pushIncompleteSplit() and
 * removed by forgetIncompleteSplit().
 */
typedef struct ginIncompleteSplit
{
        RelFileNode node;               /* relation the split belongs to */
        BlockNumber leftBlkno;          /* left half of the split */
        BlockNumber rightBlkno;         /* right half of the split */
        BlockNumber rootBlkno;          /* root block of the tree that was split */
} ginIncompleteSplit;

/* all splits still awaiting completion; NIL when there are none */
static List *incomplete_splits;
32
33 static void
34 pushIncompleteSplit(RelFileNode node, BlockNumber leftBlkno, BlockNumber rightBlkno, BlockNumber rootBlkno)
35 {
36         ginIncompleteSplit *split;
37
38         MemoryContextSwitchTo(topCtx);
39
40         split = palloc(sizeof(ginIncompleteSplit));
41
42         split->node = node;
43         split->leftBlkno = leftBlkno;
44         split->rightBlkno = rightBlkno;
45         split->rootBlkno = rootBlkno;
46
47         incomplete_splits = lappend(incomplete_splits, split);
48
49         MemoryContextSwitchTo(opCtx);
50 }
51
52 static void
53 forgetIncompleteSplit(RelFileNode node, BlockNumber leftBlkno, BlockNumber updateBlkno)
54 {
55         ListCell   *l;
56
57         foreach(l, incomplete_splits)
58         {
59                 ginIncompleteSplit *split = (ginIncompleteSplit *) lfirst(l);
60
61                 if (RelFileNodeEquals(node, split->node) &&
62                         leftBlkno == split->leftBlkno &&
63                         updateBlkno == split->rightBlkno)
64                 {
65                         incomplete_splits = list_delete_ptr(incomplete_splits, split);
66                         pfree(split);
67                         break;
68                 }
69         }
70 }
71
72 static void
73 ginRedoCreateIndex(XLogRecPtr lsn, XLogRecord *record)
74 {
75         RelFileNode *node = (RelFileNode *) XLogRecGetData(record);
76         Buffer          RootBuffer,
77                                 MetaBuffer;
78         Page            page;
79
80         /* Backup blocks are not used in create_index records */
81         Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
82
83         MetaBuffer = XLogReadBuffer(*node, GIN_METAPAGE_BLKNO, true);
84         Assert(BufferIsValid(MetaBuffer));
85         page = (Page) BufferGetPage(MetaBuffer);
86
87         GinInitMetabuffer(MetaBuffer);
88
89         PageSetLSN(page, lsn);
90         MarkBufferDirty(MetaBuffer);
91
92         RootBuffer = XLogReadBuffer(*node, GIN_ROOT_BLKNO, true);
93         Assert(BufferIsValid(RootBuffer));
94         page = (Page) BufferGetPage(RootBuffer);
95
96         GinInitBuffer(RootBuffer, GIN_LEAF);
97
98         PageSetLSN(page, lsn);
99         MarkBufferDirty(RootBuffer);
100
101         UnlockReleaseBuffer(RootBuffer);
102         UnlockReleaseBuffer(MetaBuffer);
103 }
104
105 static void
106 ginRedoCreatePTree(XLogRecPtr lsn, XLogRecord *record)
107 {
108         ginxlogCreatePostingTree *data = (ginxlogCreatePostingTree *) XLogRecGetData(record);
109         ItemPointerData *items = (ItemPointerData *) (XLogRecGetData(record) + sizeof(ginxlogCreatePostingTree));
110         Buffer          buffer;
111         Page            page;
112
113         /* Backup blocks are not used in create_ptree records */
114         Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
115
116         buffer = XLogReadBuffer(data->node, data->blkno, true);
117         Assert(BufferIsValid(buffer));
118         page = (Page) BufferGetPage(buffer);
119
120         GinInitBuffer(buffer, GIN_DATA | GIN_LEAF);
121         memcpy(GinDataPageGetData(page), items, sizeof(ItemPointerData) * data->nitem);
122         GinPageGetOpaque(page)->maxoff = data->nitem;
123
124         PageSetLSN(page, lsn);
125
126         MarkBufferDirty(buffer);
127         UnlockReleaseBuffer(buffer);
128 }
129
130 static void
131 ginRedoInsert(XLogRecPtr lsn, XLogRecord *record)
132 {
133         ginxlogInsert *data = (ginxlogInsert *) XLogRecGetData(record);
134         Buffer          buffer;
135         Page            page;
136
137         /* first, forget any incomplete split this insertion completes */
138         if (data->isData)
139         {
140                 Assert(data->isDelete == FALSE);
141                 if (!data->isLeaf && data->updateBlkno != InvalidBlockNumber)
142                 {
143                         PostingItem *pitem;
144
145                         pitem = (PostingItem *) (XLogRecGetData(record) + sizeof(ginxlogInsert));
146                         forgetIncompleteSplit(data->node,
147                                                                   PostingItemGetBlockNumber(pitem),
148                                                                   data->updateBlkno);
149                 }
150
151         }
152         else
153         {
154                 if (!data->isLeaf && data->updateBlkno != InvalidBlockNumber)
155                 {
156                         IndexTuple      itup;
157
158                         itup = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogInsert));
159                         forgetIncompleteSplit(data->node,
160                                                                   GinGetDownlink(itup),
161                                                                   data->updateBlkno);
162                 }
163         }
164
165         /* If we have a full-page image, restore it and we're done */
166         if (record->xl_info & XLR_BKP_BLOCK(0))
167         {
168                 (void) RestoreBackupBlock(lsn, record, 0, false, false);
169                 return;
170         }
171
172         buffer = XLogReadBuffer(data->node, data->blkno, false);
173         if (!BufferIsValid(buffer))
174                 return;                                 /* page was deleted, nothing to do */
175         page = (Page) BufferGetPage(buffer);
176
177         if (lsn > PageGetLSN(page))
178         {
179                 if (data->isData)
180                 {
181                         Assert(GinPageIsData(page));
182
183                         if (data->isLeaf)
184                         {
185                                 OffsetNumber i;
186                                 ItemPointerData *items = (ItemPointerData *) (XLogRecGetData(record) + sizeof(ginxlogInsert));
187
188                                 Assert(GinPageIsLeaf(page));
189                                 Assert(data->updateBlkno == InvalidBlockNumber);
190
191                                 for (i = 0; i < data->nitem; i++)
192                                         GinDataPageAddItemPointer(page, &items[i], data->offset + i);
193                         }
194                         else
195                         {
196                                 PostingItem *pitem;
197
198                                 Assert(!GinPageIsLeaf(page));
199
200                                 if (data->updateBlkno != InvalidBlockNumber)
201                                 {
202                                         /* update link to right page after split */
203                                         pitem = GinDataPageGetPostingItem(page, data->offset);
204                                         PostingItemSetBlockNumber(pitem, data->updateBlkno);
205                                 }
206
207                                 pitem = (PostingItem *) (XLogRecGetData(record) + sizeof(ginxlogInsert));
208
209                                 GinDataPageAddPostingItem(page, pitem, data->offset);
210                         }
211                 }
212                 else
213                 {
214                         IndexTuple      itup;
215
216                         Assert(!GinPageIsData(page));
217
218                         if (data->updateBlkno != InvalidBlockNumber)
219                         {
220                                 /* update link to right page after split */
221                                 Assert(!GinPageIsLeaf(page));
222                                 Assert(data->offset >= FirstOffsetNumber && data->offset <= PageGetMaxOffsetNumber(page));
223                                 itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, data->offset));
224                                 GinSetDownlink(itup, data->updateBlkno);
225                         }
226
227                         if (data->isDelete)
228                         {
229                                 Assert(GinPageIsLeaf(page));
230                                 Assert(data->offset >= FirstOffsetNumber && data->offset <= PageGetMaxOffsetNumber(page));
231                                 PageIndexTupleDelete(page, data->offset);
232                         }
233
234                         itup = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogInsert));
235
236                         if (PageAddItem(page, (Item) itup, IndexTupleSize(itup), data->offset, false, false) == InvalidOffsetNumber)
237                                 elog(ERROR, "failed to add item to index page in %u/%u/%u",
238                                   data->node.spcNode, data->node.dbNode, data->node.relNode);
239                 }
240
241                 PageSetLSN(page, lsn);
242
243                 MarkBufferDirty(buffer);
244         }
245
246         UnlockReleaseBuffer(buffer);
247 }
248
/*
 * Replay a page split.
 *
 * Both halves are rebuilt from scratch out of the items carried in the
 * record: the first data->separator items go to the left page, the rest to
 * the right page.  For a root split a new root page is filled in as well;
 * otherwise the split is remembered as incomplete until the matching
 * parent update is replayed.
 */
static void
ginRedoSplit(XLogRecPtr lsn, XLogRecord *record)
{
        ginxlogSplit *data = (ginxlogSplit *) XLogRecGetData(record);
        Buffer          lbuffer,
                                rbuffer;
        Page            lpage,
                                rpage;
        uint32          flags = 0;

        /* both halves get the same page type as recorded for the split */
        if (data->isLeaf)
                flags |= GIN_LEAF;
        if (data->isData)
                flags |= GIN_DATA;

        /* Backup blocks are not used in split records */
        Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));

        lbuffer = XLogReadBuffer(data->node, data->lblkno, true);
        Assert(BufferIsValid(lbuffer));
        lpage = (Page) BufferGetPage(lbuffer);
        GinInitBuffer(lbuffer, flags);

        rbuffer = XLogReadBuffer(data->node, data->rblkno, true);
        Assert(BufferIsValid(rbuffer));
        rpage = (Page) BufferGetPage(rbuffer);
        GinInitBuffer(rbuffer, flags);

        /* chain left -> right -> whatever followed the original page */
        GinPageGetOpaque(lpage)->rightlink = BufferGetBlockNumber(rbuffer);
        GinPageGetOpaque(rpage)->rightlink = data->rrlink;

        if (data->isData)
        {
                /* posting tree: fixed-size items packed after the record header */
                char       *ptr = XLogRecGetData(record) + sizeof(ginxlogSplit);
                Size            sizeofitem = GinSizeOfDataPageItem(lpage);
                OffsetNumber i;
                ItemPointer bound;

                for (i = 0; i < data->separator; i++)
                {
                        if (data->isLeaf)
                                GinDataPageAddItemPointer(lpage, (ItemPointer) ptr, InvalidOffsetNumber);
                        else
                                GinDataPageAddPostingItem(lpage, (PostingItem *) ptr, InvalidOffsetNumber);
                        ptr += sizeofitem;
                }

                for (i = data->separator; i < data->nitem; i++)
                {
                        if (data->isLeaf)
                                GinDataPageAddItemPointer(rpage, (ItemPointer) ptr, InvalidOffsetNumber);
                        else
                                GinDataPageAddPostingItem(rpage, (PostingItem *) ptr, InvalidOffsetNumber);
                        ptr += sizeofitem;
                }

                /* set up right key */
                /* left page: bound is taken from its last item */
                bound = GinDataPageGetRightBound(lpage);
                if (data->isLeaf)
                        *bound = *GinDataPageGetItemPointer(lpage, GinPageGetOpaque(lpage)->maxoff);
                else
                        *bound = GinDataPageGetPostingItem(lpage, GinPageGetOpaque(lpage)->maxoff)->key;

                /* right page: bound comes straight from the record */
                bound = GinDataPageGetRightBound(rpage);
                *bound = data->rightbound;
        }
        else
        {
                /* entry tree: variable-size tuples, each advanced by its MAXALIGN'd size */
                IndexTuple      itup = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogSplit));
                OffsetNumber i;

                for (i = 0; i < data->separator; i++)
                {
                        if (PageAddItem(lpage, (Item) itup, IndexTupleSize(itup), InvalidOffsetNumber, false, false) == InvalidOffsetNumber)
                                elog(ERROR, "failed to add item to index page in %u/%u/%u",
                                  data->node.spcNode, data->node.dbNode, data->node.relNode);
                        itup = (IndexTuple) (((char *) itup) + MAXALIGN(IndexTupleSize(itup)));
                }

                for (i = data->separator; i < data->nitem; i++)
                {
                        if (PageAddItem(rpage, (Item) itup, IndexTupleSize(itup), InvalidOffsetNumber, false, false) == InvalidOffsetNumber)
                                elog(ERROR, "failed to add item to index page in %u/%u/%u",
                                  data->node.spcNode, data->node.dbNode, data->node.relNode);
                        itup = (IndexTuple) (((char *) itup) + MAXALIGN(IndexTupleSize(itup)));
                }
        }

        PageSetLSN(rpage, lsn);
        MarkBufferDirty(rbuffer);

        PageSetLSN(lpage, lsn);
        MarkBufferDirty(lbuffer);

        /*
         * A split of an internal page that carries updateBlkno also finishes
         * the split of one of its children, so forget that one.
         */
        if (!data->isLeaf && data->updateBlkno != InvalidBlockNumber)
                forgetIncompleteSplit(data->node, data->leftChildBlkno, data->updateBlkno);

        if (data->isRootSplit)
        {
                Buffer          rootBuf = XLogReadBuffer(data->node, data->rootBlkno, true);
                Page            rootPage = BufferGetPage(rootBuf);

                /* the new root is initialized without the leaf flag */
                GinInitBuffer(rootBuf, flags & ~GIN_LEAF);

                if (data->isData)
                {
                        Assert(data->rootBlkno != GIN_ROOT_BLKNO);
                        ginDataFillRoot(NULL, rootBuf, lbuffer, rbuffer);
                }
                else
                {
                        Assert(data->rootBlkno == GIN_ROOT_BLKNO);
                        ginEntryFillRoot(NULL, rootBuf, lbuffer, rbuffer);
                }

                PageSetLSN(rootPage, lsn);

                MarkBufferDirty(rootBuf);
                UnlockReleaseBuffer(rootBuf);
        }
        else
                /* not a root split: remember it until forgetIncompleteSplit() matches it */
                pushIncompleteSplit(data->node, data->lblkno, data->rblkno, data->rootBlkno);

        UnlockReleaseBuffer(rbuffer);
        UnlockReleaseBuffer(lbuffer);
}
375
376 static void
377 ginRedoVacuumPage(XLogRecPtr lsn, XLogRecord *record)
378 {
379         ginxlogVacuumPage *data = (ginxlogVacuumPage *) XLogRecGetData(record);
380         Buffer          buffer;
381         Page            page;
382
383         /* If we have a full-page image, restore it and we're done */
384         if (record->xl_info & XLR_BKP_BLOCK(0))
385         {
386                 (void) RestoreBackupBlock(lsn, record, 0, false, false);
387                 return;
388         }
389
390         buffer = XLogReadBuffer(data->node, data->blkno, false);
391         if (!BufferIsValid(buffer))
392                 return;
393         page = (Page) BufferGetPage(buffer);
394
395         if (lsn > PageGetLSN(page))
396         {
397                 if (GinPageIsData(page))
398                 {
399                         memcpy(GinDataPageGetData(page),
400                                    XLogRecGetData(record) + sizeof(ginxlogVacuumPage),
401                                    data->nitem * GinSizeOfDataPageItem(page));
402                         GinPageGetOpaque(page)->maxoff = data->nitem;
403                 }
404                 else
405                 {
406                         OffsetNumber i,
407                                            *tod;
408                         IndexTuple      itup = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogVacuumPage));
409
410                         tod = (OffsetNumber *) palloc(sizeof(OffsetNumber) * PageGetMaxOffsetNumber(page));
411                         for (i = FirstOffsetNumber; i <= PageGetMaxOffsetNumber(page); i++)
412                                 tod[i - 1] = i;
413
414                         PageIndexMultiDelete(page, tod, PageGetMaxOffsetNumber(page));
415
416                         for (i = 0; i < data->nitem; i++)
417                         {
418                                 if (PageAddItem(page, (Item) itup, IndexTupleSize(itup), InvalidOffsetNumber, false, false) == InvalidOffsetNumber)
419                                         elog(ERROR, "failed to add item to index page in %u/%u/%u",
420                                                  data->node.spcNode, data->node.dbNode, data->node.relNode);
421                                 itup = (IndexTuple) (((char *) itup) + MAXALIGN(IndexTupleSize(itup)));
422                         }
423                 }
424
425                 PageSetLSN(page, lsn);
426                 MarkBufferDirty(buffer);
427         }
428
429         UnlockReleaseBuffer(buffer);
430 }
431
432 static void
433 ginRedoDeletePage(XLogRecPtr lsn, XLogRecord *record)
434 {
435         ginxlogDeletePage *data = (ginxlogDeletePage *) XLogRecGetData(record);
436         Buffer          dbuffer;
437         Buffer          pbuffer;
438         Buffer          lbuffer;
439         Page            page;
440
441         if (record->xl_info & XLR_BKP_BLOCK(0))
442                 dbuffer = RestoreBackupBlock(lsn, record, 0, false, true);
443         else
444         {
445                 dbuffer = XLogReadBuffer(data->node, data->blkno, false);
446                 if (BufferIsValid(dbuffer))
447                 {
448                         page = BufferGetPage(dbuffer);
449                         if (lsn > PageGetLSN(page))
450                         {
451                                 Assert(GinPageIsData(page));
452                                 GinPageGetOpaque(page)->flags = GIN_DELETED;
453                                 PageSetLSN(page, lsn);
454                                 MarkBufferDirty(dbuffer);
455                         }
456                 }
457         }
458
459         if (record->xl_info & XLR_BKP_BLOCK(1))
460                 pbuffer = RestoreBackupBlock(lsn, record, 1, false, true);
461         else
462         {
463                 pbuffer = XLogReadBuffer(data->node, data->parentBlkno, false);
464                 if (BufferIsValid(pbuffer))
465                 {
466                         page = BufferGetPage(pbuffer);
467                         if (lsn > PageGetLSN(page))
468                         {
469                                 Assert(GinPageIsData(page));
470                                 Assert(!GinPageIsLeaf(page));
471                                 GinPageDeletePostingItem(page, data->parentOffset);
472                                 PageSetLSN(page, lsn);
473                                 MarkBufferDirty(pbuffer);
474                         }
475                 }
476         }
477
478         if (record->xl_info & XLR_BKP_BLOCK(2))
479                 (void) RestoreBackupBlock(lsn, record, 2, false, false);
480         else if (data->leftBlkno != InvalidBlockNumber)
481         {
482                 lbuffer = XLogReadBuffer(data->node, data->leftBlkno, false);
483                 if (BufferIsValid(lbuffer))
484                 {
485                         page = BufferGetPage(lbuffer);
486                         if (lsn > PageGetLSN(page))
487                         {
488                                 Assert(GinPageIsData(page));
489                                 GinPageGetOpaque(page)->rightlink = data->rightLink;
490                                 PageSetLSN(page, lsn);
491                                 MarkBufferDirty(lbuffer);
492                         }
493                         UnlockReleaseBuffer(lbuffer);
494                 }
495         }
496
497         if (BufferIsValid(pbuffer))
498                 UnlockReleaseBuffer(pbuffer);
499         if (BufferIsValid(dbuffer))
500                 UnlockReleaseBuffer(dbuffer);
501 }
502
503 static void
504 ginRedoUpdateMetapage(XLogRecPtr lsn, XLogRecord *record)
505 {
506         ginxlogUpdateMeta *data = (ginxlogUpdateMeta *) XLogRecGetData(record);
507         Buffer          metabuffer;
508         Page            metapage;
509         Buffer          buffer;
510
511         metabuffer = XLogReadBuffer(data->node, GIN_METAPAGE_BLKNO, false);
512         if (!BufferIsValid(metabuffer))
513                 return;                                 /* assume index was deleted, nothing to do */
514         metapage = BufferGetPage(metabuffer);
515
516         if (lsn > PageGetLSN(metapage))
517         {
518                 memcpy(GinPageGetMeta(metapage), &data->metadata, sizeof(GinMetaPageData));
519                 PageSetLSN(metapage, lsn);
520                 MarkBufferDirty(metabuffer);
521         }
522
523         if (data->ntuples > 0)
524         {
525                 /*
526                  * insert into tail page
527                  */
528                 if (record->xl_info & XLR_BKP_BLOCK(0))
529                         (void) RestoreBackupBlock(lsn, record, 0, false, false);
530                 else
531                 {
532                         buffer = XLogReadBuffer(data->node, data->metadata.tail, false);
533                         if (BufferIsValid(buffer))
534                         {
535                                 Page            page = BufferGetPage(buffer);
536
537                                 if (lsn > PageGetLSN(page))
538                                 {
539                                         OffsetNumber l,
540                                                                 off = (PageIsEmpty(page)) ? FirstOffsetNumber :
541                                         OffsetNumberNext(PageGetMaxOffsetNumber(page));
542                                         int                     i,
543                                                                 tupsize;
544                                         IndexTuple      tuples = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogUpdateMeta));
545
546                                         for (i = 0; i < data->ntuples; i++)
547                                         {
548                                                 tupsize = IndexTupleSize(tuples);
549
550                                                 l = PageAddItem(page, (Item) tuples, tupsize, off, false, false);
551
552                                                 if (l == InvalidOffsetNumber)
553                                                         elog(ERROR, "failed to add item to index page");
554
555                                                 tuples = (IndexTuple) (((char *) tuples) + tupsize);
556
557                                                 off++;
558                                         }
559
560                                         /*
561                                          * Increase counter of heap tuples
562                                          */
563                                         GinPageGetOpaque(page)->maxoff++;
564
565                                         PageSetLSN(page, lsn);
566                                         MarkBufferDirty(buffer);
567                                 }
568                                 UnlockReleaseBuffer(buffer);
569                         }
570                 }
571         }
572         else if (data->prevTail != InvalidBlockNumber)
573         {
574                 /*
575                  * New tail
576                  */
577                 if (record->xl_info & XLR_BKP_BLOCK(0))
578                         (void) RestoreBackupBlock(lsn, record, 0, false, false);
579                 else
580                 {
581                         buffer = XLogReadBuffer(data->node, data->prevTail, false);
582                         if (BufferIsValid(buffer))
583                         {
584                                 Page            page = BufferGetPage(buffer);
585
586                                 if (lsn > PageGetLSN(page))
587                                 {
588                                         GinPageGetOpaque(page)->rightlink = data->newRightlink;
589
590                                         PageSetLSN(page, lsn);
591                                         MarkBufferDirty(buffer);
592                                 }
593                                 UnlockReleaseBuffer(buffer);
594                         }
595                 }
596         }
597
598         UnlockReleaseBuffer(metabuffer);
599 }
600
601 static void
602 ginRedoInsertListPage(XLogRecPtr lsn, XLogRecord *record)
603 {
604         ginxlogInsertListPage *data = (ginxlogInsertListPage *) XLogRecGetData(record);
605         Buffer          buffer;
606         Page            page;
607         OffsetNumber l,
608                                 off = FirstOffsetNumber;
609         int                     i,
610                                 tupsize;
611         IndexTuple      tuples = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogInsertListPage));
612
613         /* If we have a full-page image, restore it and we're done */
614         if (record->xl_info & XLR_BKP_BLOCK(0))
615         {
616                 (void) RestoreBackupBlock(lsn, record, 0, false, false);
617                 return;
618         }
619
620         buffer = XLogReadBuffer(data->node, data->blkno, true);
621         Assert(BufferIsValid(buffer));
622         page = BufferGetPage(buffer);
623
624         GinInitBuffer(buffer, GIN_LIST);
625         GinPageGetOpaque(page)->rightlink = data->rightlink;
626         if (data->rightlink == InvalidBlockNumber)
627         {
628                 /* tail of sublist */
629                 GinPageSetFullRow(page);
630                 GinPageGetOpaque(page)->maxoff = 1;
631         }
632         else
633         {
634                 GinPageGetOpaque(page)->maxoff = 0;
635         }
636
637         for (i = 0; i < data->ntuples; i++)
638         {
639                 tupsize = IndexTupleSize(tuples);
640
641                 l = PageAddItem(page, (Item) tuples, tupsize, off, false, false);
642
643                 if (l == InvalidOffsetNumber)
644                         elog(ERROR, "failed to add item to index page");
645
646                 tuples = (IndexTuple) (((char *) tuples) + tupsize);
647         }
648
649         PageSetLSN(page, lsn);
650         MarkBufferDirty(buffer);
651
652         UnlockReleaseBuffer(buffer);
653 }
654
655 static void
656 ginRedoDeleteListPages(XLogRecPtr lsn, XLogRecord *record)
657 {
658         ginxlogDeleteListPages *data = (ginxlogDeleteListPages *) XLogRecGetData(record);
659         Buffer          metabuffer;
660         Page            metapage;
661         int                     i;
662
663         /* Backup blocks are not used in delete_listpage records */
664         Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
665
666         metabuffer = XLogReadBuffer(data->node, GIN_METAPAGE_BLKNO, false);
667         if (!BufferIsValid(metabuffer))
668                 return;                                 /* assume index was deleted, nothing to do */
669         metapage = BufferGetPage(metabuffer);
670
671         if (lsn > PageGetLSN(metapage))
672         {
673                 memcpy(GinPageGetMeta(metapage), &data->metadata, sizeof(GinMetaPageData));
674                 PageSetLSN(metapage, lsn);
675                 MarkBufferDirty(metabuffer);
676         }
677
678         /*
679          * In normal operation, shiftList() takes exclusive lock on all the
680          * pages-to-be-deleted simultaneously.  During replay, however, it should
681          * be all right to lock them one at a time.  This is dependent on the fact
682          * that we are deleting pages from the head of the list, and that readers
683          * share-lock the next page before releasing the one they are on. So we
684          * cannot get past a reader that is on, or due to visit, any page we are
685          * going to delete.  New incoming readers will block behind our metapage
686          * lock and then see a fully updated page list.
687          */
688         for (i = 0; i < data->ndeleted; i++)
689         {
690                 Buffer          buffer = XLogReadBuffer(data->node, data->toDelete[i], false);
691
692                 if (BufferIsValid(buffer))
693                 {
694                         Page            page = BufferGetPage(buffer);
695
696                         if (lsn > PageGetLSN(page))
697                         {
698                                 GinPageGetOpaque(page)->flags = GIN_DELETED;
699
700                                 PageSetLSN(page, lsn);
701                                 MarkBufferDirty(buffer);
702                         }
703
704                         UnlockReleaseBuffer(buffer);
705                 }
706         }
707         UnlockReleaseBuffer(metabuffer);
708 }
709
710 void
711 gin_redo(XLogRecPtr lsn, XLogRecord *record)
712 {
713         uint8           info = record->xl_info & ~XLR_INFO_MASK;
714
715         /*
716          * GIN indexes do not require any conflict processing. NB: If we ever
717          * implement a similar optimization as we have in b-tree, and remove
718          * killed tuples outside VACUUM, we'll need to handle that here.
719          */
720
721         topCtx = MemoryContextSwitchTo(opCtx);
722         switch (info)
723         {
724                 case XLOG_GIN_CREATE_INDEX:
725                         ginRedoCreateIndex(lsn, record);
726                         break;
727                 case XLOG_GIN_CREATE_PTREE:
728                         ginRedoCreatePTree(lsn, record);
729                         break;
730                 case XLOG_GIN_INSERT:
731                         ginRedoInsert(lsn, record);
732                         break;
733                 case XLOG_GIN_SPLIT:
734                         ginRedoSplit(lsn, record);
735                         break;
736                 case XLOG_GIN_VACUUM_PAGE:
737                         ginRedoVacuumPage(lsn, record);
738                         break;
739                 case XLOG_GIN_DELETE_PAGE:
740                         ginRedoDeletePage(lsn, record);
741                         break;
742                 case XLOG_GIN_UPDATE_META_PAGE:
743                         ginRedoUpdateMetapage(lsn, record);
744                         break;
745                 case XLOG_GIN_INSERT_LISTPAGE:
746                         ginRedoInsertListPage(lsn, record);
747                         break;
748                 case XLOG_GIN_DELETE_LISTPAGE:
749                         ginRedoDeleteListPages(lsn, record);
750                         break;
751                 default:
752                         elog(PANIC, "gin_redo: unknown op code %u", info);
753         }
754         MemoryContextSwitchTo(topCtx);
755         MemoryContextReset(opCtx);
756 }
757
758 void
759 gin_xlog_startup(void)
760 {
761         incomplete_splits = NIL;
762
763         opCtx = AllocSetContextCreate(CurrentMemoryContext,
764                                                                   "GIN recovery temporary context",
765                                                                   ALLOCSET_DEFAULT_MINSIZE,
766                                                                   ALLOCSET_DEFAULT_INITSIZE,
767                                                                   ALLOCSET_DEFAULT_MAXSIZE);
768 }
769
770 static void
771 ginContinueSplit(ginIncompleteSplit *split)
772 {
773         GinBtreeData btree;
774         GinState        ginstate;
775         Relation        reln;
776         Buffer          buffer;
777         GinBtreeStack stack;
778
779         /*
780          * elog(NOTICE,"ginContinueSplit root:%u l:%u r:%u",  split->rootBlkno,
781          * split->leftBlkno, split->rightBlkno);
782          */
783         buffer = XLogReadBuffer(split->node, split->leftBlkno, false);
784
785         /*
786          * Failure should be impossible here, because we wrote the page earlier.
787          */
788         if (!BufferIsValid(buffer))
789                 elog(PANIC, "ginContinueSplit: left block %u not found",
790                          split->leftBlkno);
791
792         reln = CreateFakeRelcacheEntry(split->node);
793
794         if (split->rootBlkno == GIN_ROOT_BLKNO)
795         {
796                 MemSet(&ginstate, 0, sizeof(ginstate));
797                 ginstate.index = reln;
798
799                 ginPrepareEntryScan(&btree,
800                                                         InvalidOffsetNumber, (Datum) 0, GIN_CAT_NULL_KEY,
801                                                         &ginstate);
802         }
803         else
804         {
805                 ginPrepareDataScan(&btree, reln);
806         }
807
808         stack.blkno = split->leftBlkno;
809         stack.buffer = buffer;
810         stack.off = InvalidOffsetNumber;
811         stack.parent = NULL;
812
813         ginFindParents(&btree, &stack, split->rootBlkno);
814
815         btree.prepareDownlink(&btree, buffer);
816         ginInsertValue(&btree, stack.parent, NULL);
817
818         FreeFakeRelcacheEntry(reln);
819
820         UnlockReleaseBuffer(buffer);
821 }
822
823 void
824 gin_xlog_cleanup(void)
825 {
826         ListCell   *l;
827         MemoryContext topCtx;
828
829         topCtx = MemoryContextSwitchTo(opCtx);
830
831         foreach(l, incomplete_splits)
832         {
833                 ginIncompleteSplit *split = (ginIncompleteSplit *) lfirst(l);
834
835                 ginContinueSplit(split);
836                 MemoryContextReset(opCtx);
837         }
838
839         MemoryContextSwitchTo(topCtx);
840         MemoryContextDelete(opCtx);
841         incomplete_splits = NIL;
842 }
843
844 bool
845 gin_safe_restartpoint(void)
846 {
847         if (incomplete_splits)
848                 return false;
849         return true;
850 }