]> granicus.if.org Git - postgresql/blob - src/backend/access/spgist/spgxlog.c
Add macros to make AllocSetContextCreate() calls simpler and safer.
[postgresql] / src / backend / access / spgist / spgxlog.c
1 /*-------------------------------------------------------------------------
2  *
3  * spgxlog.c
4  *        WAL replay logic for SP-GiST
5  *
6  *
7  * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
8  * Portions Copyright (c) 1994, Regents of the University of California
9  *
10  * IDENTIFICATION
11  *                       src/backend/access/spgist/spgxlog.c
12  *
13  *-------------------------------------------------------------------------
14  */
15 #include "postgres.h"
16
17 #include "access/spgist_private.h"
18 #include "access/transam.h"
19 #include "access/xlog.h"
20 #include "access/xlogutils.h"
21 #include "storage/standby.h"
22 #include "utils/memutils.h"
23
24
25 static MemoryContext opCtx;             /* working memory for operations */
26
27
28 /*
29  * Prepare a dummy SpGistState, with just the minimum info needed for replay.
30  *
31  * At present, all we need is enough info to support spgFormDeadTuple(),
32  * plus the isBuild flag.
33  */
34 static void
35 fillFakeState(SpGistState *state, spgxlogState stateSrc)
36 {
37         memset(state, 0, sizeof(*state));
38
39         state->myXid = stateSrc.myXid;
40         state->isBuild = stateSrc.isBuild;
41         state->deadTupleStorage = palloc0(SGDTSIZE);
42 }
43
44 /*
45  * Add a leaf tuple, or replace an existing placeholder tuple.  This is used
46  * to replay SpGistPageAddNewItem() operations.  If the offset points at an
47  * existing tuple, it had better be a placeholder tuple.
48  */
49 static void
50 addOrReplaceTuple(Page page, Item tuple, int size, OffsetNumber offset)
51 {
52         if (offset <= PageGetMaxOffsetNumber(page))
53         {
54                 SpGistDeadTuple dt = (SpGistDeadTuple) PageGetItem(page,
55                                                                                                 PageGetItemId(page, offset));
56
57                 if (dt->tupstate != SPGIST_PLACEHOLDER)
58                         elog(ERROR, "SPGiST tuple to be replaced is not a placeholder");
59
60                 Assert(SpGistPageGetOpaque(page)->nPlaceholder > 0);
61                 SpGistPageGetOpaque(page)->nPlaceholder--;
62
63                 PageIndexTupleDelete(page, offset);
64         }
65
66         Assert(offset <= PageGetMaxOffsetNumber(page) + 1);
67
68         if (PageAddItem(page, tuple, size, offset, false, false) != offset)
69                 elog(ERROR, "failed to add item of size %u to SPGiST index page",
70                          size);
71 }
72
73 static void
74 spgRedoCreateIndex(XLogReaderState *record)
75 {
76         XLogRecPtr      lsn = record->EndRecPtr;
77         Buffer          buffer;
78         Page            page;
79
80         buffer = XLogInitBufferForRedo(record, 0);
81         Assert(BufferGetBlockNumber(buffer) == SPGIST_METAPAGE_BLKNO);
82         page = (Page) BufferGetPage(buffer);
83         SpGistInitMetapage(page);
84         PageSetLSN(page, lsn);
85         MarkBufferDirty(buffer);
86         UnlockReleaseBuffer(buffer);
87
88         buffer = XLogInitBufferForRedo(record, 1);
89         Assert(BufferGetBlockNumber(buffer) == SPGIST_ROOT_BLKNO);
90         SpGistInitBuffer(buffer, SPGIST_LEAF);
91         page = (Page) BufferGetPage(buffer);
92         PageSetLSN(page, lsn);
93         MarkBufferDirty(buffer);
94         UnlockReleaseBuffer(buffer);
95
96         buffer = XLogInitBufferForRedo(record, 2);
97         Assert(BufferGetBlockNumber(buffer) == SPGIST_NULL_BLKNO);
98         SpGistInitBuffer(buffer, SPGIST_LEAF | SPGIST_NULLS);
99         page = (Page) BufferGetPage(buffer);
100         PageSetLSN(page, lsn);
101         MarkBufferDirty(buffer);
102         UnlockReleaseBuffer(buffer);
103 }
104
105 static void
106 spgRedoAddLeaf(XLogReaderState *record)
107 {
108         XLogRecPtr      lsn = record->EndRecPtr;
109         char       *ptr = XLogRecGetData(record);
110         spgxlogAddLeaf *xldata = (spgxlogAddLeaf *) ptr;
111         char       *leafTuple;
112         SpGistLeafTupleData leafTupleHdr;
113         Buffer          buffer;
114         Page            page;
115         XLogRedoAction action;
116
117         ptr += sizeof(spgxlogAddLeaf);
118         leafTuple = ptr;
119         /* the leaf tuple is unaligned, so make a copy to access its header */
120         memcpy(&leafTupleHdr, leafTuple, sizeof(SpGistLeafTupleData));
121
122         /*
123          * In normal operation we would have both current and parent pages locked
124          * simultaneously; but in WAL replay it should be safe to update the leaf
125          * page before updating the parent.
126          */
127         if (xldata->newPage)
128         {
129                 buffer = XLogInitBufferForRedo(record, 0);
130                 SpGistInitBuffer(buffer,
131                                          SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0));
132                 action = BLK_NEEDS_REDO;
133         }
134         else
135                 action = XLogReadBufferForRedo(record, 0, &buffer);
136
137         if (action == BLK_NEEDS_REDO)
138         {
139                 page = BufferGetPage(buffer);
140
141                 /* insert new tuple */
142                 if (xldata->offnumLeaf != xldata->offnumHeadLeaf)
143                 {
144                         /* normal cases, tuple was added by SpGistPageAddNewItem */
145                         addOrReplaceTuple(page, (Item) leafTuple, leafTupleHdr.size,
146                                                           xldata->offnumLeaf);
147
148                         /* update head tuple's chain link if needed */
149                         if (xldata->offnumHeadLeaf != InvalidOffsetNumber)
150                         {
151                                 SpGistLeafTuple head;
152
153                                 head = (SpGistLeafTuple) PageGetItem(page,
154                                                                 PageGetItemId(page, xldata->offnumHeadLeaf));
155                                 Assert(head->nextOffset == leafTupleHdr.nextOffset);
156                                 head->nextOffset = xldata->offnumLeaf;
157                         }
158                 }
159                 else
160                 {
161                         /* replacing a DEAD tuple */
162                         PageIndexTupleDelete(page, xldata->offnumLeaf);
163                         if (PageAddItem(page,
164                                                         (Item) leafTuple, leafTupleHdr.size,
165                                          xldata->offnumLeaf, false, false) != xldata->offnumLeaf)
166                                 elog(ERROR, "failed to add item of size %u to SPGiST index page",
167                                          leafTupleHdr.size);
168                 }
169
170                 PageSetLSN(page, lsn);
171                 MarkBufferDirty(buffer);
172         }
173         if (BufferIsValid(buffer))
174                 UnlockReleaseBuffer(buffer);
175
176         /* update parent downlink if necessary */
177         if (xldata->offnumParent != InvalidOffsetNumber)
178         {
179                 if (XLogReadBufferForRedo(record, 1, &buffer) == BLK_NEEDS_REDO)
180                 {
181                         SpGistInnerTuple tuple;
182                         BlockNumber blknoLeaf;
183
184                         XLogRecGetBlockTag(record, 0, NULL, NULL, &blknoLeaf);
185
186                         page = BufferGetPage(buffer);
187
188                         tuple = (SpGistInnerTuple) PageGetItem(page,
189                                                                   PageGetItemId(page, xldata->offnumParent));
190
191                         spgUpdateNodeLink(tuple, xldata->nodeI,
192                                                           blknoLeaf, xldata->offnumLeaf);
193
194                         PageSetLSN(page, lsn);
195                         MarkBufferDirty(buffer);
196                 }
197                 if (BufferIsValid(buffer))
198                         UnlockReleaseBuffer(buffer);
199         }
200 }
201
202 static void
203 spgRedoMoveLeafs(XLogReaderState *record)
204 {
205         XLogRecPtr      lsn = record->EndRecPtr;
206         char       *ptr = XLogRecGetData(record);
207         spgxlogMoveLeafs *xldata = (spgxlogMoveLeafs *) ptr;
208         SpGistState state;
209         OffsetNumber *toDelete;
210         OffsetNumber *toInsert;
211         int                     nInsert;
212         Buffer          buffer;
213         Page            page;
214         XLogRedoAction action;
215         BlockNumber blknoDst;
216
217         XLogRecGetBlockTag(record, 1, NULL, NULL, &blknoDst);
218
219         fillFakeState(&state, xldata->stateSrc);
220
221         nInsert = xldata->replaceDead ? 1 : xldata->nMoves + 1;
222
223         ptr += SizeOfSpgxlogMoveLeafs;
224         toDelete = (OffsetNumber *) ptr;
225         ptr += sizeof(OffsetNumber) * xldata->nMoves;
226         toInsert = (OffsetNumber *) ptr;
227         ptr += sizeof(OffsetNumber) * nInsert;
228
229         /* now ptr points to the list of leaf tuples */
230
231         /*
232          * In normal operation we would have all three pages (source, dest, and
233          * parent) locked simultaneously; but in WAL replay it should be safe to
234          * update them one at a time, as long as we do it in the right order.
235          */
236
237         /* Insert tuples on the dest page (do first, so redirect is valid) */
238         if (xldata->newPage)
239         {
240                 buffer = XLogInitBufferForRedo(record, 1);
241                 SpGistInitBuffer(buffer,
242                                          SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0));
243                 action = BLK_NEEDS_REDO;
244         }
245         else
246                 action = XLogReadBufferForRedo(record, 1, &buffer);
247
248         if (action == BLK_NEEDS_REDO)
249         {
250                 int                     i;
251
252                 page = BufferGetPage(buffer);
253
254                 for (i = 0; i < nInsert; i++)
255                 {
256                         char       *leafTuple;
257                         SpGistLeafTupleData leafTupleHdr;
258
259                         /*
260                          * the tuples are not aligned, so must copy to access the size
261                          * field.
262                          */
263                         leafTuple = ptr;
264                         memcpy(&leafTupleHdr, leafTuple,
265                                    sizeof(SpGistLeafTupleData));
266
267                         addOrReplaceTuple(page, (Item) leafTuple,
268                                                           leafTupleHdr.size, toInsert[i]);
269                         ptr += leafTupleHdr.size;
270                 }
271
272                 PageSetLSN(page, lsn);
273                 MarkBufferDirty(buffer);
274         }
275         if (BufferIsValid(buffer))
276                 UnlockReleaseBuffer(buffer);
277
278         /* Delete tuples from the source page, inserting a redirection pointer */
279         if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
280         {
281                 page = BufferGetPage(buffer);
282
283                 spgPageIndexMultiDelete(&state, page, toDelete, xldata->nMoves,
284                                                 state.isBuild ? SPGIST_PLACEHOLDER : SPGIST_REDIRECT,
285                                                                 SPGIST_PLACEHOLDER,
286                                                                 blknoDst,
287                                                                 toInsert[nInsert - 1]);
288
289                 PageSetLSN(page, lsn);
290                 MarkBufferDirty(buffer);
291         }
292         if (BufferIsValid(buffer))
293                 UnlockReleaseBuffer(buffer);
294
295         /* And update the parent downlink */
296         if (XLogReadBufferForRedo(record, 2, &buffer) == BLK_NEEDS_REDO)
297         {
298                 SpGistInnerTuple tuple;
299
300                 page = BufferGetPage(buffer);
301
302                 tuple = (SpGistInnerTuple) PageGetItem(page,
303                                                                   PageGetItemId(page, xldata->offnumParent));
304
305                 spgUpdateNodeLink(tuple, xldata->nodeI,
306                                                   blknoDst, toInsert[nInsert - 1]);
307
308                 PageSetLSN(page, lsn);
309                 MarkBufferDirty(buffer);
310         }
311         if (BufferIsValid(buffer))
312                 UnlockReleaseBuffer(buffer);
313 }
314
315 static void
316 spgRedoAddNode(XLogReaderState *record)
317 {
318         XLogRecPtr      lsn = record->EndRecPtr;
319         char       *ptr = XLogRecGetData(record);
320         spgxlogAddNode *xldata = (spgxlogAddNode *) ptr;
321         char       *innerTuple;
322         SpGistInnerTupleData innerTupleHdr;
323         SpGistState state;
324         Buffer          buffer;
325         Page            page;
326         XLogRedoAction action;
327
328         ptr += sizeof(spgxlogAddNode);
329         innerTuple = ptr;
330         /* the tuple is unaligned, so make a copy to access its header */
331         memcpy(&innerTupleHdr, innerTuple, sizeof(SpGistInnerTupleData));
332
333         fillFakeState(&state, xldata->stateSrc);
334
335         if (!XLogRecHasBlockRef(record, 1))
336         {
337                 /* update in place */
338                 Assert(xldata->parentBlk == -1);
339                 if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
340                 {
341                         page = BufferGetPage(buffer);
342
343                         PageIndexTupleDelete(page, xldata->offnum);
344                         if (PageAddItem(page, (Item) innerTuple, innerTupleHdr.size,
345                                                         xldata->offnum,
346                                                         false, false) != xldata->offnum)
347                                 elog(ERROR, "failed to add item of size %u to SPGiST index page",
348                                          innerTupleHdr.size);
349
350                         PageSetLSN(page, lsn);
351                         MarkBufferDirty(buffer);
352                 }
353                 if (BufferIsValid(buffer))
354                         UnlockReleaseBuffer(buffer);
355         }
356         else
357         {
358                 BlockNumber blkno;
359                 BlockNumber blknoNew;
360
361                 XLogRecGetBlockTag(record, 0, NULL, NULL, &blkno);
362                 XLogRecGetBlockTag(record, 1, NULL, NULL, &blknoNew);
363
364                 /*
365                  * In normal operation we would have all three pages (source, dest,
366                  * and parent) locked simultaneously; but in WAL replay it should be
367                  * safe to update them one at a time, as long as we do it in the right
368                  * order. We must insert the new tuple before replacing the old tuple
369                  * with the redirect tuple.
370                  */
371
372                 /* Install new tuple first so redirect is valid */
373                 if (xldata->newPage)
374                 {
375                         /* AddNode is not used for nulls pages */
376                         buffer = XLogInitBufferForRedo(record, 1);
377                         SpGistInitBuffer(buffer, 0);
378                         action = BLK_NEEDS_REDO;
379                 }
380                 else
381                         action = XLogReadBufferForRedo(record, 1, &buffer);
382                 if (action == BLK_NEEDS_REDO)
383                 {
384                         page = BufferGetPage(buffer);
385
386                         addOrReplaceTuple(page, (Item) innerTuple,
387                                                           innerTupleHdr.size, xldata->offnumNew);
388
389                         /*
390                          * If parent is in this same page, update it now.
391                          */
392                         if (xldata->parentBlk == 1)
393                         {
394                                 SpGistInnerTuple parentTuple;
395
396                                 parentTuple = (SpGistInnerTuple) PageGetItem(page,
397                                                                   PageGetItemId(page, xldata->offnumParent));
398
399                                 spgUpdateNodeLink(parentTuple, xldata->nodeI,
400                                                                   blknoNew, xldata->offnumNew);
401                         }
402                         PageSetLSN(page, lsn);
403                         MarkBufferDirty(buffer);
404                 }
405                 if (BufferIsValid(buffer))
406                         UnlockReleaseBuffer(buffer);
407
408                 /* Delete old tuple, replacing it with redirect or placeholder tuple */
409                 if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
410                 {
411                         SpGistDeadTuple dt;
412
413                         page = BufferGetPage(buffer);
414
415                         if (state.isBuild)
416                                 dt = spgFormDeadTuple(&state, SPGIST_PLACEHOLDER,
417                                                                           InvalidBlockNumber,
418                                                                           InvalidOffsetNumber);
419                         else
420                                 dt = spgFormDeadTuple(&state, SPGIST_REDIRECT,
421                                                                           blknoNew,
422                                                                           xldata->offnumNew);
423
424                         PageIndexTupleDelete(page, xldata->offnum);
425                         if (PageAddItem(page, (Item) dt, dt->size,
426                                                         xldata->offnum,
427                                                         false, false) != xldata->offnum)
428                                 elog(ERROR, "failed to add item of size %u to SPGiST index page",
429                                          dt->size);
430
431                         if (state.isBuild)
432                                 SpGistPageGetOpaque(page)->nPlaceholder++;
433                         else
434                                 SpGistPageGetOpaque(page)->nRedirection++;
435
436                         /*
437                          * If parent is in this same page, update it now.
438                          */
439                         if (xldata->parentBlk == 0)
440                         {
441                                 SpGistInnerTuple parentTuple;
442
443                                 parentTuple = (SpGistInnerTuple) PageGetItem(page,
444                                                                   PageGetItemId(page, xldata->offnumParent));
445
446                                 spgUpdateNodeLink(parentTuple, xldata->nodeI,
447                                                                   blknoNew, xldata->offnumNew);
448                         }
449                         PageSetLSN(page, lsn);
450                         MarkBufferDirty(buffer);
451                 }
452                 if (BufferIsValid(buffer))
453                         UnlockReleaseBuffer(buffer);
454
455                 /*
456                  * Update parent downlink (if we didn't do it as part of the source or
457                  * destination page update already).
458                  */
459                 if (xldata->parentBlk == 2)
460                 {
461                         if (XLogReadBufferForRedo(record, 2, &buffer) == BLK_NEEDS_REDO)
462                         {
463                                 SpGistInnerTuple parentTuple;
464
465                                 page = BufferGetPage(buffer);
466
467                                 parentTuple = (SpGistInnerTuple) PageGetItem(page,
468                                                                   PageGetItemId(page, xldata->offnumParent));
469
470                                 spgUpdateNodeLink(parentTuple, xldata->nodeI,
471                                                                   blknoNew, xldata->offnumNew);
472
473                                 PageSetLSN(page, lsn);
474                                 MarkBufferDirty(buffer);
475                         }
476                         if (BufferIsValid(buffer))
477                                 UnlockReleaseBuffer(buffer);
478                 }
479         }
480 }
481
482 static void
483 spgRedoSplitTuple(XLogReaderState *record)
484 {
485         XLogRecPtr      lsn = record->EndRecPtr;
486         char       *ptr = XLogRecGetData(record);
487         spgxlogSplitTuple *xldata = (spgxlogSplitTuple *) ptr;
488         char       *prefixTuple;
489         SpGistInnerTupleData prefixTupleHdr;
490         char       *postfixTuple;
491         SpGistInnerTupleData postfixTupleHdr;
492         Buffer          buffer;
493         Page            page;
494         XLogRedoAction action;
495
496         ptr += sizeof(spgxlogSplitTuple);
497         prefixTuple = ptr;
498         /* the prefix tuple is unaligned, so make a copy to access its header */
499         memcpy(&prefixTupleHdr, prefixTuple, sizeof(SpGistInnerTupleData));
500         ptr += prefixTupleHdr.size;
501         postfixTuple = ptr;
502         /* postfix tuple is also unaligned */
503         memcpy(&postfixTupleHdr, postfixTuple, sizeof(SpGistInnerTupleData));
504
505         /*
506          * In normal operation we would have both pages locked simultaneously; but
507          * in WAL replay it should be safe to update them one at a time, as long
508          * as we do it in the right order.
509          */
510
511         /* insert postfix tuple first to avoid dangling link */
512         if (!xldata->postfixBlkSame)
513         {
514                 if (xldata->newPage)
515                 {
516                         buffer = XLogInitBufferForRedo(record, 1);
517                         /* SplitTuple is not used for nulls pages */
518                         SpGistInitBuffer(buffer, 0);
519                         action = BLK_NEEDS_REDO;
520                 }
521                 else
522                         action = XLogReadBufferForRedo(record, 1, &buffer);
523                 if (action == BLK_NEEDS_REDO)
524                 {
525                         page = BufferGetPage(buffer);
526
527                         addOrReplaceTuple(page, (Item) postfixTuple,
528                                                           postfixTupleHdr.size, xldata->offnumPostfix);
529
530                         PageSetLSN(page, lsn);
531                         MarkBufferDirty(buffer);
532                 }
533                 if (BufferIsValid(buffer))
534                         UnlockReleaseBuffer(buffer);
535         }
536
537         /* now handle the original page */
538         if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
539         {
540                 page = BufferGetPage(buffer);
541
542                 PageIndexTupleDelete(page, xldata->offnumPrefix);
543                 if (PageAddItem(page, (Item) prefixTuple, prefixTupleHdr.size,
544                                  xldata->offnumPrefix, false, false) != xldata->offnumPrefix)
545                         elog(ERROR, "failed to add item of size %u to SPGiST index page",
546                                  prefixTupleHdr.size);
547
548                 if (xldata->postfixBlkSame)
549                         addOrReplaceTuple(page, (Item) postfixTuple,
550                                                           postfixTupleHdr.size,
551                                                           xldata->offnumPostfix);
552
553                 PageSetLSN(page, lsn);
554                 MarkBufferDirty(buffer);
555         }
556         if (BufferIsValid(buffer))
557                 UnlockReleaseBuffer(buffer);
558 }
559
560 static void
561 spgRedoPickSplit(XLogReaderState *record)
562 {
563         XLogRecPtr      lsn = record->EndRecPtr;
564         char       *ptr = XLogRecGetData(record);
565         spgxlogPickSplit *xldata = (spgxlogPickSplit *) ptr;
566         char       *innerTuple;
567         SpGistInnerTupleData innerTupleHdr;
568         SpGistState state;
569         OffsetNumber *toDelete;
570         OffsetNumber *toInsert;
571         uint8      *leafPageSelect;
572         Buffer          srcBuffer;
573         Buffer          destBuffer;
574         Buffer          innerBuffer;
575         Page            srcPage;
576         Page            destPage;
577         Page            page;
578         int                     i;
579         BlockNumber blknoInner;
580         XLogRedoAction action;
581
582         XLogRecGetBlockTag(record, 2, NULL, NULL, &blknoInner);
583
584         fillFakeState(&state, xldata->stateSrc);
585
586         ptr += SizeOfSpgxlogPickSplit;
587         toDelete = (OffsetNumber *) ptr;
588         ptr += sizeof(OffsetNumber) * xldata->nDelete;
589         toInsert = (OffsetNumber *) ptr;
590         ptr += sizeof(OffsetNumber) * xldata->nInsert;
591         leafPageSelect = (uint8 *) ptr;
592         ptr += sizeof(uint8) * xldata->nInsert;
593
594         innerTuple = ptr;
595         /* the inner tuple is unaligned, so make a copy to access its header */
596         memcpy(&innerTupleHdr, innerTuple, sizeof(SpGistInnerTupleData));
597         ptr += innerTupleHdr.size;
598
599         /* now ptr points to the list of leaf tuples */
600
601         if (xldata->isRootSplit)
602         {
603                 /* when splitting root, we touch it only in the guise of new inner */
604                 srcBuffer = InvalidBuffer;
605                 srcPage = NULL;
606         }
607         else if (xldata->initSrc)
608         {
609                 /* just re-init the source page */
610                 srcBuffer = XLogInitBufferForRedo(record, 0);
611                 srcPage = (Page) BufferGetPage(srcBuffer);
612
613                 SpGistInitBuffer(srcBuffer,
614                                          SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0));
615                 /* don't update LSN etc till we're done with it */
616         }
617         else
618         {
619                 /*
620                  * Delete the specified tuples from source page.  (In case we're in
621                  * Hot Standby, we need to hold lock on the page till we're done
622                  * inserting leaf tuples and the new inner tuple, else the added
623                  * redirect tuple will be a dangling link.)
624                  */
625                 srcPage = NULL;
626                 if (XLogReadBufferForRedo(record, 0, &srcBuffer) == BLK_NEEDS_REDO)
627                 {
628                         srcPage = BufferGetPage(srcBuffer);
629
630                         /*
631                          * We have it a bit easier here than in doPickSplit(), because we
632                          * know the inner tuple's location already, so we can inject the
633                          * correct redirection tuple now.
634                          */
635                         if (!state.isBuild)
636                                 spgPageIndexMultiDelete(&state, srcPage,
637                                                                                 toDelete, xldata->nDelete,
638                                                                                 SPGIST_REDIRECT,
639                                                                                 SPGIST_PLACEHOLDER,
640                                                                                 blknoInner,
641                                                                                 xldata->offnumInner);
642                         else
643                                 spgPageIndexMultiDelete(&state, srcPage,
644                                                                                 toDelete, xldata->nDelete,
645                                                                                 SPGIST_PLACEHOLDER,
646                                                                                 SPGIST_PLACEHOLDER,
647                                                                                 InvalidBlockNumber,
648                                                                                 InvalidOffsetNumber);
649
650                         /* don't update LSN etc till we're done with it */
651                 }
652         }
653
654         /* try to access dest page if any */
655         if (!XLogRecHasBlockRef(record, 1))
656         {
657                 destBuffer = InvalidBuffer;
658                 destPage = NULL;
659         }
660         else if (xldata->initDest)
661         {
662                 /* just re-init the dest page */
663                 destBuffer = XLogInitBufferForRedo(record, 1);
664                 destPage = (Page) BufferGetPage(destBuffer);
665
666                 SpGistInitBuffer(destBuffer,
667                                          SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0));
668                 /* don't update LSN etc till we're done with it */
669         }
670         else
671         {
672                 /*
673                  * We could probably release the page lock immediately in the
674                  * full-page-image case, but for safety let's hold it till later.
675                  */
676                 if (XLogReadBufferForRedo(record, 1, &destBuffer) == BLK_NEEDS_REDO)
677                         destPage = (Page) BufferGetPage(destBuffer);
678                 else
679                         destPage = NULL;        /* don't do any page updates */
680         }
681
682         /* restore leaf tuples to src and/or dest page */
683         for (i = 0; i < xldata->nInsert; i++)
684         {
685                 char       *leafTuple;
686                 SpGistLeafTupleData leafTupleHdr;
687
688                 /* the tuples are not aligned, so must copy to access the size field. */
689                 leafTuple = ptr;
690                 memcpy(&leafTupleHdr, leafTuple, sizeof(SpGistLeafTupleData));
691                 ptr += leafTupleHdr.size;
692
693                 page = leafPageSelect[i] ? destPage : srcPage;
694                 if (page == NULL)
695                         continue;                       /* no need to touch this page */
696
697                 addOrReplaceTuple(page, (Item) leafTuple, leafTupleHdr.size,
698                                                   toInsert[i]);
699         }
700
701         /* Now update src and dest page LSNs if needed */
702         if (srcPage != NULL)
703         {
704                 PageSetLSN(srcPage, lsn);
705                 MarkBufferDirty(srcBuffer);
706         }
707         if (destPage != NULL)
708         {
709                 PageSetLSN(destPage, lsn);
710                 MarkBufferDirty(destBuffer);
711         }
712
713         /* restore new inner tuple */
714         if (xldata->initInner)
715         {
716                 innerBuffer = XLogInitBufferForRedo(record, 2);
717                 SpGistInitBuffer(innerBuffer, (xldata->storesNulls ? SPGIST_NULLS : 0));
718                 action = BLK_NEEDS_REDO;
719         }
720         else
721                 action = XLogReadBufferForRedo(record, 2, &innerBuffer);
722
723         if (action == BLK_NEEDS_REDO)
724         {
725                 page = BufferGetPage(innerBuffer);
726
727                 addOrReplaceTuple(page, (Item) innerTuple, innerTupleHdr.size,
728                                                   xldata->offnumInner);
729
730                 /* if inner is also parent, update link while we're here */
731                 if (xldata->innerIsParent)
732                 {
733                         SpGistInnerTuple parent;
734
735                         parent = (SpGistInnerTuple) PageGetItem(page,
736                                                                   PageGetItemId(page, xldata->offnumParent));
737                         spgUpdateNodeLink(parent, xldata->nodeI,
738                                                           blknoInner, xldata->offnumInner);
739                 }
740
741                 PageSetLSN(page, lsn);
742                 MarkBufferDirty(innerBuffer);
743         }
744         if (BufferIsValid(innerBuffer))
745                 UnlockReleaseBuffer(innerBuffer);
746
747         /*
748          * Now we can release the leaf-page locks.  It's okay to do this before
749          * updating the parent downlink.
750          */
751         if (BufferIsValid(srcBuffer))
752                 UnlockReleaseBuffer(srcBuffer);
753         if (BufferIsValid(destBuffer))
754                 UnlockReleaseBuffer(destBuffer);
755
756         /* update parent downlink, unless we did it above */
757         if (XLogRecHasBlockRef(record, 3))
758         {
759                 Buffer          parentBuffer;
760
761                 if (XLogReadBufferForRedo(record, 3, &parentBuffer) == BLK_NEEDS_REDO)
762                 {
763                         SpGistInnerTuple parent;
764
765                         page = BufferGetPage(parentBuffer);
766
767                         parent = (SpGistInnerTuple) PageGetItem(page,
768                                                                   PageGetItemId(page, xldata->offnumParent));
769                         spgUpdateNodeLink(parent, xldata->nodeI,
770                                                           blknoInner, xldata->offnumInner);
771
772                         PageSetLSN(page, lsn);
773                         MarkBufferDirty(parentBuffer);
774                 }
775                 if (BufferIsValid(parentBuffer))
776                         UnlockReleaseBuffer(parentBuffer);
777         }
778         else
779                 Assert(xldata->innerIsParent || xldata->isRootSplit);
780 }
781
782 static void
783 spgRedoVacuumLeaf(XLogReaderState *record)
784 {
785         XLogRecPtr      lsn = record->EndRecPtr;
786         char       *ptr = XLogRecGetData(record);
787         spgxlogVacuumLeaf *xldata = (spgxlogVacuumLeaf *) ptr;
788         OffsetNumber *toDead;
789         OffsetNumber *toPlaceholder;
790         OffsetNumber *moveSrc;
791         OffsetNumber *moveDest;
792         OffsetNumber *chainSrc;
793         OffsetNumber *chainDest;
794         SpGistState state;
795         Buffer          buffer;
796         Page            page;
797         int                     i;
798
799         fillFakeState(&state, xldata->stateSrc);
800
801         ptr += SizeOfSpgxlogVacuumLeaf;
802         toDead = (OffsetNumber *) ptr;
803         ptr += sizeof(OffsetNumber) * xldata->nDead;
804         toPlaceholder = (OffsetNumber *) ptr;
805         ptr += sizeof(OffsetNumber) * xldata->nPlaceholder;
806         moveSrc = (OffsetNumber *) ptr;
807         ptr += sizeof(OffsetNumber) * xldata->nMove;
808         moveDest = (OffsetNumber *) ptr;
809         ptr += sizeof(OffsetNumber) * xldata->nMove;
810         chainSrc = (OffsetNumber *) ptr;
811         ptr += sizeof(OffsetNumber) * xldata->nChain;
812         chainDest = (OffsetNumber *) ptr;
813
814         if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
815         {
816                 page = BufferGetPage(buffer);
817
818                 spgPageIndexMultiDelete(&state, page,
819                                                                 toDead, xldata->nDead,
820                                                                 SPGIST_DEAD, SPGIST_DEAD,
821                                                                 InvalidBlockNumber,
822                                                                 InvalidOffsetNumber);
823
824                 spgPageIndexMultiDelete(&state, page,
825                                                                 toPlaceholder, xldata->nPlaceholder,
826                                                                 SPGIST_PLACEHOLDER, SPGIST_PLACEHOLDER,
827                                                                 InvalidBlockNumber,
828                                                                 InvalidOffsetNumber);
829
830                 /* see comments in vacuumLeafPage() */
831                 for (i = 0; i < xldata->nMove; i++)
832                 {
833                         ItemId          idSrc = PageGetItemId(page, moveSrc[i]);
834                         ItemId          idDest = PageGetItemId(page, moveDest[i]);
835                         ItemIdData      tmp;
836
837                         tmp = *idSrc;
838                         *idSrc = *idDest;
839                         *idDest = tmp;
840                 }
841
842                 spgPageIndexMultiDelete(&state, page,
843                                                                 moveSrc, xldata->nMove,
844                                                                 SPGIST_PLACEHOLDER, SPGIST_PLACEHOLDER,
845                                                                 InvalidBlockNumber,
846                                                                 InvalidOffsetNumber);
847
848                 for (i = 0; i < xldata->nChain; i++)
849                 {
850                         SpGistLeafTuple lt;
851
852                         lt = (SpGistLeafTuple) PageGetItem(page,
853                                                                                    PageGetItemId(page, chainSrc[i]));
854                         Assert(lt->tupstate == SPGIST_LIVE);
855                         lt->nextOffset = chainDest[i];
856                 }
857
858                 PageSetLSN(page, lsn);
859                 MarkBufferDirty(buffer);
860         }
861         if (BufferIsValid(buffer))
862                 UnlockReleaseBuffer(buffer);
863 }
864
865 static void
866 spgRedoVacuumRoot(XLogReaderState *record)
867 {
868         XLogRecPtr      lsn = record->EndRecPtr;
869         char       *ptr = XLogRecGetData(record);
870         spgxlogVacuumRoot *xldata = (spgxlogVacuumRoot *) ptr;
871         OffsetNumber *toDelete;
872         Buffer          buffer;
873         Page            page;
874
875         toDelete = xldata->offsets;
876
877         if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
878         {
879                 page = BufferGetPage(buffer);
880
881                 /* The tuple numbers are in order */
882                 PageIndexMultiDelete(page, toDelete, xldata->nDelete);
883
884                 PageSetLSN(page, lsn);
885                 MarkBufferDirty(buffer);
886         }
887         if (BufferIsValid(buffer))
888                 UnlockReleaseBuffer(buffer);
889 }
890
891 static void
892 spgRedoVacuumRedirect(XLogReaderState *record)
893 {
894         XLogRecPtr      lsn = record->EndRecPtr;
895         char       *ptr = XLogRecGetData(record);
896         spgxlogVacuumRedirect *xldata = (spgxlogVacuumRedirect *) ptr;
897         OffsetNumber *itemToPlaceholder;
898         Buffer          buffer;
899
900         itemToPlaceholder = xldata->offsets;
901
902         /*
903          * If any redirection tuples are being removed, make sure there are no
904          * live Hot Standby transactions that might need to see them.
905          */
906         if (InHotStandby)
907         {
908                 if (TransactionIdIsValid(xldata->newestRedirectXid))
909                 {
910                         RelFileNode node;
911
912                         XLogRecGetBlockTag(record, 0, &node, NULL, NULL);
913                         ResolveRecoveryConflictWithSnapshot(xldata->newestRedirectXid,
914                                                                                                 node);
915                 }
916         }
917
918         if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
919         {
920                 Page            page = BufferGetPage(buffer);
921                 SpGistPageOpaque opaque = SpGistPageGetOpaque(page);
922                 int                     i;
923
924                 /* Convert redirect pointers to plain placeholders */
925                 for (i = 0; i < xldata->nToPlaceholder; i++)
926                 {
927                         SpGistDeadTuple dt;
928
929                         dt = (SpGistDeadTuple) PageGetItem(page,
930                                                                   PageGetItemId(page, itemToPlaceholder[i]));
931                         Assert(dt->tupstate == SPGIST_REDIRECT);
932                         dt->tupstate = SPGIST_PLACEHOLDER;
933                         ItemPointerSetInvalid(&dt->pointer);
934                 }
935
936                 Assert(opaque->nRedirection >= xldata->nToPlaceholder);
937                 opaque->nRedirection -= xldata->nToPlaceholder;
938                 opaque->nPlaceholder += xldata->nToPlaceholder;
939
940                 /* Remove placeholder tuples at end of page */
941                 if (xldata->firstPlaceholder != InvalidOffsetNumber)
942                 {
943                         int                     max = PageGetMaxOffsetNumber(page);
944                         OffsetNumber *toDelete;
945
946                         toDelete = palloc(sizeof(OffsetNumber) * max);
947
948                         for (i = xldata->firstPlaceholder; i <= max; i++)
949                                 toDelete[i - xldata->firstPlaceholder] = i;
950
951                         i = max - xldata->firstPlaceholder + 1;
952                         Assert(opaque->nPlaceholder >= i);
953                         opaque->nPlaceholder -= i;
954
955                         /* The array is sorted, so can use PageIndexMultiDelete */
956                         PageIndexMultiDelete(page, toDelete, i);
957
958                         pfree(toDelete);
959                 }
960
961                 PageSetLSN(page, lsn);
962                 MarkBufferDirty(buffer);
963         }
964         if (BufferIsValid(buffer))
965                 UnlockReleaseBuffer(buffer);
966 }
967
968 void
969 spg_redo(XLogReaderState *record)
970 {
971         uint8           info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
972         MemoryContext oldCxt;
973
974         oldCxt = MemoryContextSwitchTo(opCtx);
975         switch (info)
976         {
977                 case XLOG_SPGIST_CREATE_INDEX:
978                         spgRedoCreateIndex(record);
979                         break;
980                 case XLOG_SPGIST_ADD_LEAF:
981                         spgRedoAddLeaf(record);
982                         break;
983                 case XLOG_SPGIST_MOVE_LEAFS:
984                         spgRedoMoveLeafs(record);
985                         break;
986                 case XLOG_SPGIST_ADD_NODE:
987                         spgRedoAddNode(record);
988                         break;
989                 case XLOG_SPGIST_SPLIT_TUPLE:
990                         spgRedoSplitTuple(record);
991                         break;
992                 case XLOG_SPGIST_PICKSPLIT:
993                         spgRedoPickSplit(record);
994                         break;
995                 case XLOG_SPGIST_VACUUM_LEAF:
996                         spgRedoVacuumLeaf(record);
997                         break;
998                 case XLOG_SPGIST_VACUUM_ROOT:
999                         spgRedoVacuumRoot(record);
1000                         break;
1001                 case XLOG_SPGIST_VACUUM_REDIRECT:
1002                         spgRedoVacuumRedirect(record);
1003                         break;
1004                 default:
1005                         elog(PANIC, "spg_redo: unknown op code %u", info);
1006         }
1007
1008         MemoryContextSwitchTo(oldCxt);
1009         MemoryContextReset(opCtx);
1010 }
1011
1012 void
1013 spg_xlog_startup(void)
1014 {
1015         opCtx = AllocSetContextCreate(CurrentMemoryContext,
1016                                                                   "SP-GiST temporary context",
1017                                                                   ALLOCSET_DEFAULT_SIZES);
1018 }
1019
1020 void
1021 spg_xlog_cleanup(void)
1022 {
1023         MemoryContextDelete(opCtx);
1024         opCtx = NULL;
1025 }