]> granicus.if.org Git - postgresql/blob - src/backend/access/gist/gistxlog.c
Update copyright for the year 2010.
[postgresql] / src / backend / access / gist / gistxlog.c
1 /*-------------------------------------------------------------------------
2  *
3  * gistxlog.c
4  *        WAL replay logic for GiST.
5  *
6  *
7  * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group
8  * Portions Copyright (c) 1994, Regents of the University of California
9  *
10  * IDENTIFICATION
11  *                       $PostgreSQL: pgsql/src/backend/access/gist/gistxlog.c,v 1.35 2010/01/02 16:57:34 momjian Exp $
12  *-------------------------------------------------------------------------
13  */
14 #include "postgres.h"
15
16 #include "access/gist_private.h"
17 #include "access/xlogutils.h"
18 #include "miscadmin.h"
19 #include "storage/bufmgr.h"
20 #include "utils/memutils.h"
21 #include "utils/rel.h"
22
23
/*
 * Decoded view of a page-update WAL record, filled in by
 * decodePageUpdateRecord().  The pointers refer into the WAL record's data
 * area; only the itup[] array itself is palloc'd.
 */
typedef struct
{
        gistxlogPageUpdate *data;       /* fixed header at start of the record data */
        int                     len;    /* number of entries in itup[] */
        IndexTuple *itup;       /* array of pointers to the tuples to add */
        OffsetNumber *todelete; /* offsets to delete, or NULL if ntodelete is 0 */
} PageUpdateRecord;
31
/*
 * One page described by a page-split WAL record: its per-page header and
 * the tuples that follow it in the record.
 */
typedef struct
{
        gistxlogPage *header;   /* per-page header (supplies blkno and num) */
        IndexTuple *itup;       /* palloc'd array of header->num tuple pointers */
} NewPage;
37
/*
 * Decoded view of a page-split WAL record, filled in by
 * decodePageSplitRecord().
 */
typedef struct
{
        gistxlogPageSplit *data;        /* fixed header at start of the record data */
        NewPage    *page;       /* palloc'd array of data->npage page descriptions */
} PageSplitRecord;
43
/*
 * Tracking of incomplete inserts; the idea was taken from nbtxlog.c.
 *
 * One entry is kept per insertion that replay has seen start but not yet
 * finish.  Entries are created by pushIncompleteInsert() and removed by
 * forgetIncompleteInsert(), which matches on (node, key).
 */

typedef struct gistIncompleteInsert
{
        RelFileNode node;       /* index the insertion belongs to */
        BlockNumber origblkno;          /* for splits */
        ItemPointerData key;    /* identifies the insertion */
        int                     lenblk; /* number of entries in blkno[] */
        BlockNumber *blkno;     /* palloc'd array of affected block numbers */
        XLogRecPtr      lsn;    /* LSN passed in when the entry was pushed */
        BlockNumber *path;      /* set by gistxlogFindPath(); not set at push time */
        int                     pathlen;        /* number of entries in path[] */
} gistIncompleteInsert;
57
58
/* Switched to for each gist_redo() call and reset when the call finishes. */
static MemoryContext opCtx;             /* working memory for operations */
static MemoryContext insertCtx; /* holds incomplete_inserts list */
/* List of gistIncompleteInsert entries; new entries are added at the front. */
static List *incomplete_inserts;
62
63
/*
 * True when the two item pointers (passed as pointers) have the same offset
 * number and the same block number.  Each argument is expanded twice, so
 * callers should pass expressions without side effects.
 */
#define ItemPointerEQ(a, b) \
        ( ItemPointerGetOffsetNumber(a) == ItemPointerGetOffsetNumber(b) && \
          ItemPointerGetBlockNumber (a) == ItemPointerGetBlockNumber(b) )
67
68
69 static void
70 pushIncompleteInsert(RelFileNode node, XLogRecPtr lsn, ItemPointerData key,
71                                          BlockNumber *blkno, int lenblk,
72                                          PageSplitRecord *xlinfo /* to extract blkno info */ )
73 {
74         MemoryContext oldCxt;
75         gistIncompleteInsert *ninsert;
76
77         if (!ItemPointerIsValid(&key))
78
79                 /*
80                  * if key is null then we should not store insertion as incomplete,
81                  * because it's a vacuum operation..
82                  */
83                 return;
84
85         oldCxt = MemoryContextSwitchTo(insertCtx);
86         ninsert = (gistIncompleteInsert *) palloc(sizeof(gistIncompleteInsert));
87
88         ninsert->node = node;
89         ninsert->key = key;
90         ninsert->lsn = lsn;
91
92         if (lenblk && blkno)
93         {
94                 ninsert->lenblk = lenblk;
95                 ninsert->blkno = (BlockNumber *) palloc(sizeof(BlockNumber) * ninsert->lenblk);
96                 memcpy(ninsert->blkno, blkno, sizeof(BlockNumber) * ninsert->lenblk);
97                 ninsert->origblkno = *blkno;
98         }
99         else
100         {
101                 int                     i;
102
103                 Assert(xlinfo);
104                 ninsert->lenblk = xlinfo->data->npage;
105                 ninsert->blkno = (BlockNumber *) palloc(sizeof(BlockNumber) * ninsert->lenblk);
106                 for (i = 0; i < ninsert->lenblk; i++)
107                         ninsert->blkno[i] = xlinfo->page[i].header->blkno;
108                 ninsert->origblkno = xlinfo->data->origblkno;
109         }
110         Assert(ninsert->lenblk > 0);
111
112         /*
113          * Stick the new incomplete insert onto the front of the list, not the
114          * back.  This is so that gist_xlog_cleanup will process incompletions in
115          * last-in-first-out order.
116          */
117         incomplete_inserts = lcons(ninsert, incomplete_inserts);
118
119         MemoryContextSwitchTo(oldCxt);
120 }
121
122 static void
123 forgetIncompleteInsert(RelFileNode node, ItemPointerData key)
124 {
125         ListCell   *l;
126
127         if (!ItemPointerIsValid(&key))
128                 return;
129
130         if (incomplete_inserts == NIL)
131                 return;
132
133         foreach(l, incomplete_inserts)
134         {
135                 gistIncompleteInsert *insert = (gistIncompleteInsert *) lfirst(l);
136
137                 if (RelFileNodeEquals(node, insert->node) && ItemPointerEQ(&(insert->key), &(key)))
138                 {
139                         /* found */
140                         incomplete_inserts = list_delete_ptr(incomplete_inserts, insert);
141                         pfree(insert->blkno);
142                         pfree(insert);
143                         break;
144                 }
145         }
146 }
147
148 static void
149 decodePageUpdateRecord(PageUpdateRecord *decoded, XLogRecord *record)
150 {
151         char       *begin = XLogRecGetData(record),
152                            *ptr;
153         int                     i = 0,
154                                 addpath = 0;
155
156         decoded->data = (gistxlogPageUpdate *) begin;
157
158         if (decoded->data->ntodelete)
159         {
160                 decoded->todelete = (OffsetNumber *) (begin + sizeof(gistxlogPageUpdate) + addpath);
161                 addpath = MAXALIGN(sizeof(OffsetNumber) * decoded->data->ntodelete);
162         }
163         else
164                 decoded->todelete = NULL;
165
166         decoded->len = 0;
167         ptr = begin + sizeof(gistxlogPageUpdate) + addpath;
168         while (ptr - begin < record->xl_len)
169         {
170                 decoded->len++;
171                 ptr += IndexTupleSize((IndexTuple) ptr);
172         }
173
174         decoded->itup = (IndexTuple *) palloc(sizeof(IndexTuple) * decoded->len);
175
176         ptr = begin + sizeof(gistxlogPageUpdate) + addpath;
177         while (ptr - begin < record->xl_len)
178         {
179                 decoded->itup[i] = (IndexTuple) ptr;
180                 ptr += IndexTupleSize(decoded->itup[i]);
181                 i++;
182         }
183 }
184
185 /*
186  * redo any page update (except page split)
187  */
188 static void
189 gistRedoPageUpdateRecord(XLogRecPtr lsn, XLogRecord *record, bool isnewroot)
190 {
191         gistxlogPageUpdate *xldata = (gistxlogPageUpdate *) XLogRecGetData(record);
192         PageUpdateRecord xlrec;
193         Buffer          buffer;
194         Page            page;
195
196         /* we must fix incomplete_inserts list even if XLR_BKP_BLOCK_1 is set */
197         forgetIncompleteInsert(xldata->node, xldata->key);
198
199         if (!isnewroot && xldata->blkno != GIST_ROOT_BLKNO)
200                 /* operation with root always finalizes insertion */
201                 pushIncompleteInsert(xldata->node, lsn, xldata->key,
202                                                          &(xldata->blkno), 1,
203                                                          NULL);
204
205         /* nothing else to do if page was backed up (and no info to do it with) */
206         if (record->xl_info & XLR_BKP_BLOCK_1)
207                 return;
208
209         decodePageUpdateRecord(&xlrec, record);
210
211         buffer = XLogReadBuffer(xlrec.data->node, xlrec.data->blkno, false);
212         if (!BufferIsValid(buffer))
213                 return;
214         page = (Page) BufferGetPage(buffer);
215
216         if (XLByteLE(lsn, PageGetLSN(page)))
217         {
218                 UnlockReleaseBuffer(buffer);
219                 return;
220         }
221
222         if (isnewroot)
223                 GISTInitBuffer(buffer, 0);
224         else if (xlrec.data->ntodelete)
225         {
226                 int                     i;
227
228                 for (i = 0; i < xlrec.data->ntodelete; i++)
229                         PageIndexTupleDelete(page, xlrec.todelete[i]);
230                 if (GistPageIsLeaf(page))
231                         GistMarkTuplesDeleted(page);
232         }
233
234         /* add tuples */
235         if (xlrec.len > 0)
236                 gistfillbuffer(page, xlrec.itup, xlrec.len, InvalidOffsetNumber);
237
238         /*
239          * special case: leafpage, nothing to insert, nothing to delete, then
240          * vacuum marks page
241          */
242         if (GistPageIsLeaf(page) && xlrec.len == 0 && xlrec.data->ntodelete == 0)
243                 GistClearTuplesDeleted(page);
244
245         if (!GistPageIsLeaf(page) && PageGetMaxOffsetNumber(page) == InvalidOffsetNumber && xldata->blkno == GIST_ROOT_BLKNO)
246
247                 /*
248                  * all links on non-leaf root page was deleted by vacuum full, so root
249                  * page becomes a leaf
250                  */
251                 GistPageSetLeaf(page);
252
253         GistPageGetOpaque(page)->rightlink = InvalidBlockNumber;
254         PageSetLSN(page, lsn);
255         PageSetTLI(page, ThisTimeLineID);
256         MarkBufferDirty(buffer);
257         UnlockReleaseBuffer(buffer);
258 }
259
260 static void
261 gistRedoPageDeleteRecord(XLogRecPtr lsn, XLogRecord *record)
262 {
263         gistxlogPageDelete *xldata = (gistxlogPageDelete *) XLogRecGetData(record);
264         Buffer          buffer;
265         Page            page;
266
267         /* nothing else to do if page was backed up (and no info to do it with) */
268         if (record->xl_info & XLR_BKP_BLOCK_1)
269                 return;
270
271         buffer = XLogReadBuffer(xldata->node, xldata->blkno, false);
272         if (!BufferIsValid(buffer))
273                 return;
274
275         page = (Page) BufferGetPage(buffer);
276         GistPageSetDeleted(page);
277
278         PageSetLSN(page, lsn);
279         PageSetTLI(page, ThisTimeLineID);
280         MarkBufferDirty(buffer);
281         UnlockReleaseBuffer(buffer);
282 }
283
284 static void
285 decodePageSplitRecord(PageSplitRecord *decoded, XLogRecord *record)
286 {
287         char       *begin = XLogRecGetData(record),
288                            *ptr;
289         int                     j,
290                                 i = 0;
291
292         decoded->data = (gistxlogPageSplit *) begin;
293         decoded->page = (NewPage *) palloc(sizeof(NewPage) * decoded->data->npage);
294
295         ptr = begin + sizeof(gistxlogPageSplit);
296         for (i = 0; i < decoded->data->npage; i++)
297         {
298                 Assert(ptr - begin < record->xl_len);
299                 decoded->page[i].header = (gistxlogPage *) ptr;
300                 ptr += sizeof(gistxlogPage);
301
302                 decoded->page[i].itup = (IndexTuple *)
303                         palloc(sizeof(IndexTuple) * decoded->page[i].header->num);
304                 j = 0;
305                 while (j < decoded->page[i].header->num)
306                 {
307                         Assert(ptr - begin < record->xl_len);
308                         decoded->page[i].itup[j] = (IndexTuple) ptr;
309                         ptr += IndexTupleSize((IndexTuple) ptr);
310                         j++;
311                 }
312         }
313 }
314
315 static void
316 gistRedoPageSplitRecord(XLogRecPtr lsn, XLogRecord *record)
317 {
318         PageSplitRecord xlrec;
319         Buffer          buffer;
320         Page            page;
321         int                     i;
322         int                     flags;
323
324         decodePageSplitRecord(&xlrec, record);
325         flags = xlrec.data->origleaf ? F_LEAF : 0;
326
327         /* loop around all pages */
328         for (i = 0; i < xlrec.data->npage; i++)
329         {
330                 NewPage    *newpage = xlrec.page + i;
331
332                 buffer = XLogReadBuffer(xlrec.data->node, newpage->header->blkno, true);
333                 Assert(BufferIsValid(buffer));
334                 page = (Page) BufferGetPage(buffer);
335
336                 /* ok, clear buffer */
337                 GISTInitBuffer(buffer, flags);
338
339                 /* and fill it */
340                 gistfillbuffer(page, newpage->itup, newpage->header->num, FirstOffsetNumber);
341
342                 PageSetLSN(page, lsn);
343                 PageSetTLI(page, ThisTimeLineID);
344                 MarkBufferDirty(buffer);
345                 UnlockReleaseBuffer(buffer);
346         }
347
348         forgetIncompleteInsert(xlrec.data->node, xlrec.data->key);
349
350         pushIncompleteInsert(xlrec.data->node, lsn, xlrec.data->key,
351                                                  NULL, 0,
352                                                  &xlrec);
353 }
354
355 static void
356 gistRedoCreateIndex(XLogRecPtr lsn, XLogRecord *record)
357 {
358         RelFileNode *node = (RelFileNode *) XLogRecGetData(record);
359         Buffer          buffer;
360         Page            page;
361
362         buffer = XLogReadBuffer(*node, GIST_ROOT_BLKNO, true);
363         Assert(BufferIsValid(buffer));
364         page = (Page) BufferGetPage(buffer);
365
366         GISTInitBuffer(buffer, F_LEAF);
367
368         PageSetLSN(page, lsn);
369         PageSetTLI(page, ThisTimeLineID);
370
371         MarkBufferDirty(buffer);
372         UnlockReleaseBuffer(buffer);
373 }
374
375 static void
376 gistRedoCompleteInsert(XLogRecPtr lsn, XLogRecord *record)
377 {
378         char       *begin = XLogRecGetData(record),
379                            *ptr;
380         gistxlogInsertComplete *xlrec;
381
382         xlrec = (gistxlogInsertComplete *) begin;
383
384         ptr = begin + sizeof(gistxlogInsertComplete);
385         while (ptr - begin < record->xl_len)
386         {
387                 Assert(record->xl_len - (ptr - begin) >= sizeof(ItemPointerData));
388                 forgetIncompleteInsert(xlrec->node, *((ItemPointerData *) ptr));
389                 ptr += sizeof(ItemPointerData);
390         }
391 }
392
393 void
394 gist_redo(XLogRecPtr lsn, XLogRecord *record)
395 {
396         uint8           info = record->xl_info & ~XLR_INFO_MASK;
397         MemoryContext oldCxt;
398
399         /*
400          * GIST indexes do not require any conflict processing. NB: If we ever
401          * implement a similar optimization we have in b-tree, and remove killed
402          * tuples outside VACUUM, we'll need to handle that here.
403          */
404
405         RestoreBkpBlocks(lsn, record, false);
406
407         oldCxt = MemoryContextSwitchTo(opCtx);
408         switch (info)
409         {
410                 case XLOG_GIST_PAGE_UPDATE:
411                         gistRedoPageUpdateRecord(lsn, record, false);
412                         break;
413                 case XLOG_GIST_PAGE_DELETE:
414                         gistRedoPageDeleteRecord(lsn, record);
415                         break;
416                 case XLOG_GIST_NEW_ROOT:
417                         gistRedoPageUpdateRecord(lsn, record, true);
418                         break;
419                 case XLOG_GIST_PAGE_SPLIT:
420                         gistRedoPageSplitRecord(lsn, record);
421                         break;
422                 case XLOG_GIST_CREATE_INDEX:
423                         gistRedoCreateIndex(lsn, record);
424                         break;
425                 case XLOG_GIST_INSERT_COMPLETE:
426                         gistRedoCompleteInsert(lsn, record);
427                         break;
428                 default:
429                         elog(PANIC, "gist_redo: unknown op code %u", info);
430         }
431
432         MemoryContextSwitchTo(oldCxt);
433         MemoryContextReset(opCtx);
434 }
435
436 static void
437 out_target(StringInfo buf, RelFileNode node, ItemPointerData key)
438 {
439         appendStringInfo(buf, "rel %u/%u/%u",
440                                          node.spcNode, node.dbNode, node.relNode);
441         if (ItemPointerIsValid(&key))
442                 appendStringInfo(buf, "; tid %u/%u",
443                                                  ItemPointerGetBlockNumber(&key),
444                                                  ItemPointerGetOffsetNumber(&key));
445 }
446
447 static void
448 out_gistxlogPageUpdate(StringInfo buf, gistxlogPageUpdate *xlrec)
449 {
450         out_target(buf, xlrec->node, xlrec->key);
451         appendStringInfo(buf, "; block number %u", xlrec->blkno);
452 }
453
454 static void
455 out_gistxlogPageDelete(StringInfo buf, gistxlogPageDelete *xlrec)
456 {
457         appendStringInfo(buf, "page_delete: rel %u/%u/%u; blkno %u",
458                                 xlrec->node.spcNode, xlrec->node.dbNode, xlrec->node.relNode,
459                                          xlrec->blkno);
460 }
461
462 static void
463 out_gistxlogPageSplit(StringInfo buf, gistxlogPageSplit *xlrec)
464 {
465         appendStringInfo(buf, "page_split: ");
466         out_target(buf, xlrec->node, xlrec->key);
467         appendStringInfo(buf, "; block number %u splits to %d pages",
468                                          xlrec->origblkno, xlrec->npage);
469 }
470
471 void
472 gist_desc(StringInfo buf, uint8 xl_info, char *rec)
473 {
474         uint8           info = xl_info & ~XLR_INFO_MASK;
475
476         switch (info)
477         {
478                 case XLOG_GIST_PAGE_UPDATE:
479                         appendStringInfo(buf, "page_update: ");
480                         out_gistxlogPageUpdate(buf, (gistxlogPageUpdate *) rec);
481                         break;
482                 case XLOG_GIST_PAGE_DELETE:
483                         out_gistxlogPageDelete(buf, (gistxlogPageDelete *) rec);
484                         break;
485                 case XLOG_GIST_NEW_ROOT:
486                         appendStringInfo(buf, "new_root: ");
487                         out_target(buf, ((gistxlogPageUpdate *) rec)->node, ((gistxlogPageUpdate *) rec)->key);
488                         break;
489                 case XLOG_GIST_PAGE_SPLIT:
490                         out_gistxlogPageSplit(buf, (gistxlogPageSplit *) rec);
491                         break;
492                 case XLOG_GIST_CREATE_INDEX:
493                         appendStringInfo(buf, "create_index: rel %u/%u/%u",
494                                                          ((RelFileNode *) rec)->spcNode,
495                                                          ((RelFileNode *) rec)->dbNode,
496                                                          ((RelFileNode *) rec)->relNode);
497                         break;
498                 case XLOG_GIST_INSERT_COMPLETE:
499                         appendStringInfo(buf, "complete_insert: rel %u/%u/%u",
500                                                          ((gistxlogInsertComplete *) rec)->node.spcNode,
501                                                          ((gistxlogInsertComplete *) rec)->node.dbNode,
502                                                          ((gistxlogInsertComplete *) rec)->node.relNode);
503                         break;
504                 default:
505                         appendStringInfo(buf, "unknown gist op code %u", info);
506                         break;
507         }
508 }
509
510 IndexTuple
511 gist_form_invalid_tuple(BlockNumber blkno)
512 {
513         /*
514          * we don't alloc space for null's bitmap, this is invalid tuple, be
515          * carefull in read and write code
516          */
517         Size            size = IndexInfoFindDataOffset(0);
518         IndexTuple      tuple = (IndexTuple) palloc0(size);
519
520         tuple->t_info |= size;
521
522         ItemPointerSetBlockNumber(&(tuple->t_tid), blkno);
523         GistTupleSetInvalid(tuple);
524
525         return tuple;
526 }
527
528
529 static void
530 gistxlogFindPath(Relation index, gistIncompleteInsert *insert)
531 {
532         GISTInsertStack *top;
533
534         insert->pathlen = 0;
535         insert->path = NULL;
536
537         if ((top = gistFindPath(index, insert->origblkno)) != NULL)
538         {
539                 int                     i;
540                 GISTInsertStack *ptr;
541
542                 for (ptr = top; ptr; ptr = ptr->parent)
543                         insert->pathlen++;
544
545                 insert->path = (BlockNumber *) palloc(sizeof(BlockNumber) * insert->pathlen);
546
547                 i = 0;
548                 for (ptr = top; ptr; ptr = ptr->parent)
549                         insert->path[i++] = ptr->blkno;
550         }
551         else
552                 elog(ERROR, "lost parent for block %u", insert->origblkno);
553 }
554
555 static SplitedPageLayout *
556 gistMakePageLayout(Buffer *buffers, int nbuffers)
557 {
558         SplitedPageLayout *res = NULL,
559                            *resptr;
560
561         while (nbuffers-- > 0)
562         {
563                 Page            page = BufferGetPage(buffers[nbuffers]);
564                 IndexTuple *vec;
565                 int                     veclen;
566
567                 resptr = (SplitedPageLayout *) palloc0(sizeof(SplitedPageLayout));
568
569                 resptr->block.blkno = BufferGetBlockNumber(buffers[nbuffers]);
570                 resptr->block.num = PageGetMaxOffsetNumber(page);
571
572                 vec = gistextractpage(page, &veclen);
573                 resptr->list = gistfillitupvec(vec, veclen, &(resptr->lenlist));
574
575                 resptr->next = res;
576                 res = resptr;
577         }
578
579         return res;
580 }
581
/*
 * Continue insert after crash.  In normal situations, there aren't any
 * incomplete inserts, but if a crash occurs partway through an insertion
 * sequence, we'll need to finish making the index valid at the end of WAL
 * replay.
 *
 * Note that we assume the index is now in a valid state, except for the
 * unfinished insertion.  In particular it's safe to invoke gistFindPath();
 * there shouldn't be any garbage pages for it to run into.
 *
 * To complete the insert we can't use the basic insertion algorithm, because
 * at this point we can't call the user-defined support functions of the
 * opclass.  So we insert 'invalid' tuples without a real key, using this
 * separate algorithm.  The 'invalid' tuples should later be updated by
 * vacuum full.
 *
 * On completion a LOG message is emitted telling the user that the index
 * needs VACUUM FULL or REINDEX.
 */
static void
gistContinueInsert(gistIncompleteInsert *insert)
{
        IndexTuple *itup;
        int                     i,
                                lenitup;
        Relation        index;

        index = CreateFakeRelcacheEntry(insert->node);

        /*
         * The itup vector never needs more than the initial lenblk + 2
         * entries, because the number of index tuples can only get smaller
         * during this processing.
         */
        lenitup = insert->lenblk;
        itup = (IndexTuple *) palloc(sizeof(IndexTuple) * (lenitup + 2 /* guarantee root split */ ));

        /* start with one invalid tuple per block touched by the insertion */
        for (i = 0; i < insert->lenblk; i++)
                itup[i] = gist_form_invalid_tuple(insert->blkno[i]);

        /*
         * NOTE(review): the original comment here read "any insertion of
         * itup[] should make LOG message about"; presumably it means every
         * insertion of itup[] below has to be logged -- confirm.
         */

        if (insert->origblkno == GIST_ROOT_BLKNO)
        {
                /*
                 * The root was split, so we only need to make a new root.  This
                 * can't be a simple insert into the root: the whole content of
                 * the root has to be replaced.
                 */
                Buffer          buffer = XLogReadBuffer(insert->node, GIST_ROOT_BLKNO, true);

                gistnewroot(index, buffer, itup, lenitup, NULL);
                UnlockReleaseBuffer(buffer);
        }
        else
        {
                Buffer     *buffers;
                Page       *pages;
                int                     numbuffer;
                OffsetNumber *todelete;

                /* construct path */
                gistxlogFindPath(index, insert);

                Assert(insert->pathlen > 0);

                buffers = (Buffer *) palloc(sizeof(Buffer) * (insert->lenblk + 2 /* guarantee root split */ ));
                pages = (Page *) palloc(sizeof(Page) * (insert->lenblk + 2 /* guarantee root split */ ));
                todelete = (OffsetNumber *) palloc(sizeof(OffsetNumber) * (insert->lenblk + 2 /* guarantee root split */ ));

                /* process each page on the path in turn */
                for (i = 0; i < insert->pathlen; i++)
                {
                        int                     j,
                                                k,
                                                pituplen = 0;
                        uint8           xlinfo;
                        XLogRecData *rdata;
                        XLogRecPtr      recptr;
                        Buffer          tempbuffer = InvalidBuffer;     /* set only on root split */
                        int                     ntodelete = 0;

                        numbuffer = 1;
                        buffers[0] = ReadBuffer(index, insert->path[i]);
                        LockBuffer(buffers[0], GIST_EXCLUSIVE);

                        /*
                         * we check the buffer, because we restored the page earlier
                         */
                        gistcheckpage(index, buffers[0]);

                        pages[0] = BufferGetPage(buffers[0]);
                        Assert(!GistPageIsLeaf(pages[0]));

                        pituplen = PageGetMaxOffsetNumber(pages[0]);

                        /* find the old IndexTuples to remove */
                        for (j = 0; j < pituplen && ntodelete < lenitup; j++)
                        {
                                BlockNumber blkno;
                                ItemId          iid = PageGetItemId(pages[0], j + FirstOffsetNumber);
                                IndexTuple      idxtup = (IndexTuple) PageGetItem(pages[0], iid);

                                blkno = ItemPointerGetBlockNumber(&(idxtup->t_tid));

                                for (k = 0; k < lenitup; k++)
                                        if (ItemPointerGetBlockNumber(&(itup[k]->t_tid)) == blkno)
                                        {
                                                /*
                                                 * Store the offset the tuple will have once the
                                                 * ntodelete earlier matches have been removed.
                                                 */
                                                todelete[ntodelete] = j + FirstOffsetNumber - ntodelete;
                                                ntodelete++;
                                                break;
                                        }
                        }

                        if (ntodelete == 0)
                                elog(PANIC, "gistContinueInsert: cannot find pointer to page(s)");

                        /*
                         * We check for space subtracting only the first tuple to
                         * delete, hoping that will leave enough space.
                         */

                        if (gistnospace(pages[0], itup, lenitup, *todelete, 0))
                        {

                                /* no space left on page, so we must split */
                                buffers[numbuffer] = ReadBuffer(index, P_NEW);
                                LockBuffer(buffers[numbuffer], GIST_EXCLUSIVE);
                                GISTInitBuffer(buffers[numbuffer], 0);
                                pages[numbuffer] = BufferGetPage(buffers[numbuffer]);
                                gistfillbuffer(pages[numbuffer], itup, lenitup, FirstOffsetNumber);
                                numbuffer++;

                                if (BufferGetBlockNumber(buffers[0]) == GIST_ROOT_BLKNO)
                                {
                                        Buffer          tmp;

                                        /*
                                         * we split the root: just copy the content of the
                                         * root to a new page
                                         */

                                        /* sanity check */
                                        if (i + 1 != insert->pathlen)
                                                elog(PANIC, "unexpected pathlen in index \"%s\"",
                                                         RelationGetRelationName(index));

                                        /* fill new page, root will be changed later */
                                        tempbuffer = ReadBuffer(index, P_NEW);
                                        LockBuffer(tempbuffer, GIST_EXCLUSIVE);
                                        memcpy(BufferGetPage(tempbuffer), pages[0], BufferGetPageSize(tempbuffer));

                                        /* swap buffers[0] (was root) and temp buffer */
                                        tmp = buffers[0];
                                        buffers[0] = tempbuffer;
                                        tempbuffer = tmp;       /* now in tempbuffer GIST_ROOT_BLKNO,
                                                                                 * it is still unchanged */

                                        pages[0] = BufferGetPage(buffers[0]);
                                }

                                START_CRIT_SECTION();

                                for (j = 0; j < ntodelete; j++)
                                        PageIndexTupleDelete(pages[0], todelete[j]);

                                xlinfo = XLOG_GIST_PAGE_SPLIT;
                                rdata = formSplitRdata(index->rd_node, insert->path[i],
                                                                           false, &(insert->key),
                                                                         gistMakePageLayout(buffers, numbuffer));

                        }
                        else
                        {
                                /* enough room: replace the old tuples in place */
                                START_CRIT_SECTION();

                                for (j = 0; j < ntodelete; j++)
                                        PageIndexTupleDelete(pages[0], todelete[j]);
                                gistfillbuffer(pages[0], itup, lenitup, InvalidOffsetNumber);

                                xlinfo = XLOG_GIST_PAGE_UPDATE;
                                rdata = formUpdateRdata(index->rd_node, buffers[0],
                                                                                todelete, ntodelete,
                                                                                itup, lenitup, &(insert->key));
                        }

                        /*
                         * insert->key is used (in the form*Rdata() calls above) as
                         * the mark for completion of the insert, for possible
                         * subsequent replays
                         */

                        /* write pages; they must be marked dirty before XLogInsert() */
                        for (j = 0; j < numbuffer; j++)
                        {
                                GistPageGetOpaque(pages[j])->rightlink = InvalidBlockNumber;
                                MarkBufferDirty(buffers[j]);
                        }
                        recptr = XLogInsert(RM_GIST_ID, xlinfo, rdata);
                        for (j = 0; j < numbuffer; j++)
                        {
                                PageSetLSN(pages[j], recptr);
                                PageSetTLI(pages[j], ThisTimeLineID);
                        }

                        END_CRIT_SECTION();

                        /*
                         * The pages just written become the tuples handled on the
                         * next path step: one invalid tuple per written page.
                         */
                        lenitup = numbuffer;
                        for (j = 0; j < numbuffer; j++)
                        {
                                itup[j] = gist_form_invalid_tuple(BufferGetBlockNumber(buffers[j]));
                                UnlockReleaseBuffer(buffers[j]);
                        }

                        if (tempbuffer != InvalidBuffer)
                        {
                                /*
                                 * it was a root split, so fill the root with the new
                                 * values
                                 */
                                gistnewroot(index, tempbuffer, itup, lenitup, &(insert->key));
                                UnlockReleaseBuffer(tempbuffer);
                        }
                }
        }

        FreeFakeRelcacheEntry(index);

        ereport(LOG,
                        (errmsg("index %u/%u/%u needs VACUUM FULL or REINDEX to finish crash recovery",
                        insert->node.spcNode, insert->node.dbNode, insert->node.relNode),
                   errdetail("Incomplete insertion detected during crash replay.")));
}
807
808 void
809 gist_xlog_startup(void)
810 {
811         incomplete_inserts = NIL;
812         insertCtx = AllocSetContextCreate(CurrentMemoryContext,
813                                                                           "GiST recovery temporary context",
814                                                                           ALLOCSET_DEFAULT_MINSIZE,
815                                                                           ALLOCSET_DEFAULT_INITSIZE,
816                                                                           ALLOCSET_DEFAULT_MAXSIZE);
817         opCtx = createTempGistContext();
818 }
819
820 void
821 gist_xlog_cleanup(void)
822 {
823         ListCell   *l;
824         MemoryContext oldCxt;
825
826         oldCxt = MemoryContextSwitchTo(opCtx);
827
828         foreach(l, incomplete_inserts)
829         {
830                 gistIncompleteInsert *insert = (gistIncompleteInsert *) lfirst(l);
831
832                 gistContinueInsert(insert);
833                 MemoryContextReset(opCtx);
834         }
835         MemoryContextSwitchTo(oldCxt);
836
837         MemoryContextDelete(opCtx);
838         MemoryContextDelete(insertCtx);
839 }
840
841 bool
842 gist_safe_restartpoint(void)
843 {
844         if (incomplete_inserts)
845                 return false;
846         return true;
847 }
848
849
850 XLogRecData *
851 formSplitRdata(RelFileNode node, BlockNumber blkno, bool page_is_leaf,
852                            ItemPointer key, SplitedPageLayout *dist)
853 {
854         XLogRecData *rdata;
855         gistxlogPageSplit *xlrec = (gistxlogPageSplit *) palloc(sizeof(gistxlogPageSplit));
856         SplitedPageLayout *ptr;
857         int                     npage = 0,
858                                 cur = 1;
859
860         ptr = dist;
861         while (ptr)
862         {
863                 npage++;
864                 ptr = ptr->next;
865         }
866
867         rdata = (XLogRecData *) palloc(sizeof(XLogRecData) * (npage * 2 + 2));
868
869         xlrec->node = node;
870         xlrec->origblkno = blkno;
871         xlrec->origleaf = page_is_leaf;
872         xlrec->npage = (uint16) npage;
873         if (key)
874                 xlrec->key = *key;
875         else
876                 ItemPointerSetInvalid(&(xlrec->key));
877
878         rdata[0].buffer = InvalidBuffer;
879         rdata[0].data = (char *) xlrec;
880         rdata[0].len = sizeof(gistxlogPageSplit);
881         rdata[0].next = NULL;
882
883         ptr = dist;
884         while (ptr)
885         {
886                 rdata[cur].buffer = InvalidBuffer;
887                 rdata[cur].data = (char *) &(ptr->block);
888                 rdata[cur].len = sizeof(gistxlogPage);
889                 rdata[cur - 1].next = &(rdata[cur]);
890                 cur++;
891
892                 rdata[cur].buffer = InvalidBuffer;
893                 rdata[cur].data = (char *) (ptr->list);
894                 rdata[cur].len = ptr->lenlist;
895                 rdata[cur - 1].next = &(rdata[cur]);
896                 rdata[cur].next = NULL;
897                 cur++;
898                 ptr = ptr->next;
899         }
900
901         return rdata;
902 }
903
904 /*
905  * Construct the rdata array for an XLOG record describing a page update
906  * (deletion and/or insertion of tuples on a single index page).
907  *
908  * Note that both the todelete array and the tuples are marked as belonging
909  * to the target buffer; they need not be stored in XLOG if XLogInsert decides
910  * to log the whole buffer contents instead.  Also, we take care that there's
911  * at least one rdata item referencing the buffer, even when ntodelete and
912  * ituplen are both zero; this ensures that XLogInsert knows about the buffer.
913  */
914 XLogRecData *
915 formUpdateRdata(RelFileNode node, Buffer buffer,
916                                 OffsetNumber *todelete, int ntodelete,
917                                 IndexTuple *itup, int ituplen, ItemPointer key)
918 {
919         XLogRecData *rdata;
920         gistxlogPageUpdate *xlrec;
921         int                     cur,
922                                 i;
923
924         rdata = (XLogRecData *) palloc(sizeof(XLogRecData) * (3 + ituplen));
925         xlrec = (gistxlogPageUpdate *) palloc(sizeof(gistxlogPageUpdate));
926
927         xlrec->node = node;
928         xlrec->blkno = BufferGetBlockNumber(buffer);
929         xlrec->ntodelete = ntodelete;
930
931         if (key)
932                 xlrec->key = *key;
933         else
934                 ItemPointerSetInvalid(&(xlrec->key));
935
936         rdata[0].buffer = buffer;
937         rdata[0].buffer_std = true;
938         rdata[0].data = NULL;
939         rdata[0].len = 0;
940         rdata[0].next = &(rdata[1]);
941
942         rdata[1].data = (char *) xlrec;
943         rdata[1].len = sizeof(gistxlogPageUpdate);
944         rdata[1].buffer = InvalidBuffer;
945         rdata[1].next = &(rdata[2]);
946
947         rdata[2].data = (char *) todelete;
948         rdata[2].len = MAXALIGN(sizeof(OffsetNumber) * ntodelete);
949         rdata[2].buffer = buffer;
950         rdata[2].buffer_std = true;
951         rdata[2].next = NULL;
952
953         /* new tuples */
954         cur = 3;
955         for (i = 0; i < ituplen; i++)
956         {
957                 rdata[cur - 1].next = &(rdata[cur]);
958                 rdata[cur].data = (char *) (itup[i]);
959                 rdata[cur].len = IndexTupleSize(itup[i]);
960                 rdata[cur].buffer = buffer;
961                 rdata[cur].buffer_std = true;
962                 rdata[cur].next = NULL;
963                 cur++;
964         }
965
966         return rdata;
967 }
968
969 XLogRecPtr
970 gistxlogInsertCompletion(RelFileNode node, ItemPointerData *keys, int len)
971 {
972         gistxlogInsertComplete xlrec;
973         XLogRecData rdata[2];
974         XLogRecPtr      recptr;
975
976         Assert(len > 0);
977         xlrec.node = node;
978
979         rdata[0].buffer = InvalidBuffer;
980         rdata[0].data = (char *) &xlrec;
981         rdata[0].len = sizeof(gistxlogInsertComplete);
982         rdata[0].next = &(rdata[1]);
983
984         rdata[1].buffer = InvalidBuffer;
985         rdata[1].data = (char *) keys;
986         rdata[1].len = sizeof(ItemPointerData) * len;
987         rdata[1].next = NULL;
988
989         START_CRIT_SECTION();
990
991         recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_INSERT_COMPLETE, rdata);
992
993         END_CRIT_SECTION();
994
995         return recptr;
996 }