1 /*-------------------------------------------------------------------------
4 * WAL replay logic for SP-GiST
7 * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
8 * Portions Copyright (c) 1994, Regents of the University of California
11 * src/backend/access/spgist/spgxlog.c
13 *-------------------------------------------------------------------------
17 #include "access/spgist_private.h"
18 #include "access/transam.h"
19 #include "access/xlog.h"
20 #include "access/xlogutils.h"
21 #include "storage/standby.h"
22 #include "utils/memutils.h"
25 static MemoryContext opCtx; /* working memory for operations */
/*
 * opCtx lifecycle as seen in this file: created by spg_xlog_startup(), made
 * the current context by spg_redo() for the duration of one record and reset
 * once that record has been replayed, and deleted by spg_xlog_cleanup().
 */
29 * Prepare a dummy SpGistState, with just the minimum info needed for replay.
31 * At present, all we need is enough info to support spgFormDeadTuple(),
32 * plus the isBuild flag.
/*
 * fillFakeState
 *
 * state:    caller-supplied struct to fill in; overwritten completely
 * stateSrc: the spgxlogState carried in the WAL record
 *
 * Zeroes *state and then sets exactly three fields: myXid and isBuild are
 * copied from stateSrc, and deadTupleStorage is pointed at a freshly
 * palloc0-ed buffer of SGDTSIZE bytes.  All other fields stay zero.
 *
 * The buffer is not freed here.  NOTE(review): the callers visible in this
 * file are redo routines dispatched from spg_redo(), which resets opCtx after
 * each record; presumably that reset is what reclaims the buffer -- confirm
 * that no other caller exists.
 */
35 fillFakeState(SpGistState *state, spgxlogState stateSrc)
/* start from an all-zero state so every field not set below is zero */
37 memset(state, 0, sizeof(*state));
39 state->myXid = stateSrc.myXid;
40 state->isBuild = stateSrc.isBuild;
/* zero-filled scratch space of SGDTSIZE bytes */
41 state->deadTupleStorage = palloc0(SGDTSIZE);
45 * Add a leaf tuple, or replace an existing placeholder tuple. This is used
46 * to replay SpGistPageAddNewItem() operations. If the offset points at an
47 * existing tuple, it had better be a placeholder tuple.
/*
 * addOrReplaceTuple
 *
 * page:   page to modify
 * tuple:  item image to store
 * size:   item length, passed through to PageAddItem()
 * offset: slot at which the item must end up
 *
 * If offset names an existing slot (offset <= current max offset), the item
 * found there is checked to be in SPGIST_PLACEHOLDER state, the page opaque
 * nPlaceholder counter is decremented, and the old item is deleted.  The new
 * item is then added with PageAddItem().
 *
 * Raises elog(ERROR) when the item to be replaced is not a placeholder, or
 * when PageAddItem() returns anything other than the requested offset.
 *
 * NOTE(review): brace lines are not visible in this excerpt, so the grouping
 * of the placeholder handling under the "if" below is inferred from the
 * statements themselves -- confirm against the full file.
 */
50 addOrReplaceTuple(Page page, Item tuple, int size, OffsetNumber offset)
/* does the requested slot already hold an item? */
52 if (offset <= PageGetMaxOffsetNumber(page))
54 SpGistDeadTuple dt = (SpGistDeadTuple) PageGetItem(page,
55 PageGetItemId(page, offset));
57 if (dt->tupstate != SPGIST_PLACEHOLDER)
58 elog(ERROR, "SPGiST tuple to be replaced is not a placeholder");
/* one placeholder fewer on this page once the old item is gone */
60 Assert(SpGistPageGetOpaque(page)->nPlaceholder > 0);
61 SpGistPageGetOpaque(page)->nPlaceholder--;
63 PageIndexTupleDelete(page, offset);
/* the target slot may be at most one past the current last item */
66 Assert(offset <= PageGetMaxOffsetNumber(page) + 1);
/* PageAddItem() must place the item exactly at the requested offset */
68 if (PageAddItem(page, tuple, size, offset, false, false) != offset)
69 elog(ERROR, "failed to add item of size %u to SPGiST index page",
/*
 * spgRedoCreateIndex
 *
 * Replay of XLOG_SPGIST_CREATE_INDEX (see the dispatch in spg_redo()).
 * Re-initializes three pages from scratch, one per block reference:
 *   ref 0: asserted to be SPGIST_METAPAGE_BLKNO, set up by SpGistInitMetapage()
 *   ref 1: asserted to be SPGIST_ROOT_BLKNO, initialized with SPGIST_LEAF
 *   ref 2: asserted to be SPGIST_NULL_BLKNO, initialized with
 *          SPGIST_LEAF | SPGIST_NULLS
 * Each page is stamped with the record end LSN, marked dirty, and its buffer
 * is unlocked and released before the next one is taken.
 */
74 spgRedoCreateIndex(XLogReaderState *record)
76 XLogRecPtr lsn = record->EndRecPtr;
/* block reference 0: the metapage */
80 buffer = XLogInitBufferForRedo(record, 0);
81 Assert(BufferGetBlockNumber(buffer) == SPGIST_METAPAGE_BLKNO);
82 page = (Page) BufferGetPage(buffer);
83 SpGistInitMetapage(page);
84 PageSetLSN(page, lsn);
85 MarkBufferDirty(buffer);
86 UnlockReleaseBuffer(buffer);
/* block reference 1: the root page, created as a leaf page */
88 buffer = XLogInitBufferForRedo(record, 1);
89 Assert(BufferGetBlockNumber(buffer) == SPGIST_ROOT_BLKNO);
90 SpGistInitBuffer(buffer, SPGIST_LEAF);
91 page = (Page) BufferGetPage(buffer);
92 PageSetLSN(page, lsn);
93 MarkBufferDirty(buffer);
94 UnlockReleaseBuffer(buffer);
/* block reference 2: the page flagged as both leaf and nulls */
96 buffer = XLogInitBufferForRedo(record, 2);
97 Assert(BufferGetBlockNumber(buffer) == SPGIST_NULL_BLKNO);
98 SpGistInitBuffer(buffer, SPGIST_LEAF | SPGIST_NULLS);
99 page = (Page) BufferGetPage(buffer);
100 PageSetLSN(page, lsn);
101 MarkBufferDirty(buffer);
102 UnlockReleaseBuffer(buffer);
/*
 * spgRedoAddLeaf
 *
 * Replay of XLOG_SPGIST_ADD_LEAF.  The main record data starts with an
 * spgxlogAddLeaf struct; the leaf tuple header is copied out of leafTuple
 * into a local because the tuple image is unaligned.
 * NOTE(review): the assignment of leafTuple is not visible in this excerpt;
 * presumably it points just past the struct -- confirm.
 *
 * Block reference 0 is the leaf page.  When offnumLeaf differs from
 * offnumHeadLeaf the tuple goes in through addOrReplaceTuple() and, if
 * offnumHeadLeaf is valid, the head tuple nextOffset is redirected to the new
 * tuple.  Otherwise the item at offnumLeaf is deleted and the new tuple is
 * added at that same offset (the "replacing a DEAD tuple" case).
 *
 * Block reference 1 is the parent page; it is read only when offnumParent is
 * valid, and node nodeI of the inner tuple at offnumParent is pointed at
 * (leaf block number, offnumLeaf).
 *
 * Every page that is modified gets the record end LSN and is marked dirty;
 * each buffer is unlocked and released if valid.  elog(ERROR) is raised when
 * PageAddItem() does not return offnumLeaf in the replace case.
 */
106 spgRedoAddLeaf(XLogReaderState *record)
108 XLogRecPtr lsn = record->EndRecPtr;
109 char *ptr = XLogRecGetData(record);
110 spgxlogAddLeaf *xldata = (spgxlogAddLeaf *) ptr;
112 SpGistLeafTupleData leafTupleHdr;
115 XLogRedoAction action;
/* step past the fixed-size record header */
117 ptr += sizeof(spgxlogAddLeaf);
119 /* the leaf tuple is unaligned, so make a copy to access its header */
120 memcpy(&leafTupleHdr, leafTuple, sizeof(SpGistLeafTupleData));
123 * In normal operation we would have both current and parent pages locked
124 * simultaneously; but in WAL replay it should be safe to update the leaf
125 * page before updating the parent.
/*
 * Two ways to obtain the leaf page: initialize it from scratch (action is
 * then forced to BLK_NEEDS_REDO) or read it and let XLogReadBufferForRedo()
 * decide.  NOTE(review): the test selecting between them is not visible in
 * this excerpt.
 */
129 buffer = XLogInitBufferForRedo(record, 0);
130 SpGistInitBuffer(buffer,
131 SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0));
132 action = BLK_NEEDS_REDO;
135 action = XLogReadBufferForRedo(record, 0, &buffer);
137 if (action == BLK_NEEDS_REDO)
139 page = BufferGetPage(buffer);
141 /* insert new tuple */
142 if (xldata->offnumLeaf != xldata->offnumHeadLeaf)
144 /* normal cases, tuple was added by SpGistPageAddNewItem */
145 addOrReplaceTuple(page, (Item) leafTuple, leafTupleHdr.size,
148 /* update head tuple's chain link if needed */
149 if (xldata->offnumHeadLeaf != InvalidOffsetNumber)
151 SpGistLeafTuple head;
153 head = (SpGistLeafTuple) PageGetItem(page,
154 PageGetItemId(page, xldata->offnumHeadLeaf));
/*
 * The new tuple already carries the old successor of the head, so making
 * the head point at the new tuple splices it in directly after the head.
 */
155 Assert(head->nextOffset == leafTupleHdr.nextOffset);
156 head->nextOffset = xldata->offnumLeaf;
161 /* replacing a DEAD tuple */
162 PageIndexTupleDelete(page, xldata->offnumLeaf);
163 if (PageAddItem(page,
164 (Item) leafTuple, leafTupleHdr.size,
165 xldata->offnumLeaf, false, false) != xldata->offnumLeaf)
166 elog(ERROR, "failed to add item of size %u to SPGiST index page",
170 PageSetLSN(page, lsn);
171 MarkBufferDirty(buffer);
173 if (BufferIsValid(buffer))
174 UnlockReleaseBuffer(buffer);
176 /* update parent downlink if necessary */
177 if (xldata->offnumParent != InvalidOffsetNumber)
179 if (XLogReadBufferForRedo(record, 1, &buffer) == BLK_NEEDS_REDO)
181 SpGistInnerTuple tuple;
182 BlockNumber blknoLeaf;
/* the downlink target block is the block number of reference 0 */
184 XLogRecGetBlockTag(record, 0, NULL, NULL, &blknoLeaf);
186 page = BufferGetPage(buffer);
188 tuple = (SpGistInnerTuple) PageGetItem(page,
189 PageGetItemId(page, xldata->offnumParent));
191 spgUpdateNodeLink(tuple, xldata->nodeI,
192 blknoLeaf, xldata->offnumLeaf);
194 PageSetLSN(page, lsn);
195 MarkBufferDirty(buffer);
197 if (BufferIsValid(buffer))
198 UnlockReleaseBuffer(buffer);
/*
 * spgRedoMoveLeafs
 *
 * Replay of XLOG_SPGIST_MOVE_LEAFS.  Main record data layout, as walked by
 * ptr below:
 *   spgxlogMoveLeafs header (SizeOfSpgxlogMoveLeafs bytes)
 *   OffsetNumber toDelete[nMoves]
 *   OffsetNumber toInsert[nInsert]
 *   nInsert leaf tuple images, each advanced over by its own size field
 * where nInsert is 1 when replaceDead is set and nMoves + 1 otherwise.
 *
 * Pages are handled in this order:
 *   block ref 1 (destination): each tuple is placed at toInsert[i] through
 *                              addOrReplaceTuple()
 *   block ref 0 (source):      toDelete[] is removed with
 *                              spgPageIndexMultiDelete()
 *   block ref 2 (parent):      node nodeI of the inner tuple at offnumParent
 *                              is pointed at (destination block,
 *                              toInsert[nInsert - 1])
 * Each modified page gets the record end LSN and is marked dirty; each buffer
 * is unlocked and released if valid.
 */
203 spgRedoMoveLeafs(XLogReaderState *record)
205 XLogRecPtr lsn = record->EndRecPtr;
206 char *ptr = XLogRecGetData(record);
207 spgxlogMoveLeafs *xldata = (spgxlogMoveLeafs *) ptr;
209 OffsetNumber *toDelete;
210 OffsetNumber *toInsert;
214 XLogRedoAction action;
215 BlockNumber blknoDst;
/* destination block number, needed later for the parent downlink */
217 XLogRecGetBlockTag(record, 1, NULL, NULL, &blknoDst);
219 fillFakeState(&state, xldata->stateSrc);
/* a single tuple when replacing a dead one, else the moved ones plus one */
221 nInsert = xldata->replaceDead ? 1 : xldata->nMoves + 1;
/* carve the two offset arrays out of the record data */
223 ptr += SizeOfSpgxlogMoveLeafs;
224 toDelete = (OffsetNumber *) ptr;
225 ptr += sizeof(OffsetNumber) * xldata->nMoves;
226 toInsert = (OffsetNumber *) ptr;
227 ptr += sizeof(OffsetNumber) * nInsert;
229 /* now ptr points to the list of leaf tuples */
232 * In normal operation we would have all three pages (source, dest, and
233 * parent) locked simultaneously; but in WAL replay it should be safe to
234 * update them one at a time, as long as we do it in the right order.
237 /* Insert tuples on the dest page (do first, so redirect is valid) */
/*
 * NOTE(review): the test choosing between initializing the destination page
 * and reading it is not visible in this excerpt.
 */
240 buffer = XLogInitBufferForRedo(record, 1);
241 SpGistInitBuffer(buffer,
242 SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0));
243 action = BLK_NEEDS_REDO;
246 action = XLogReadBufferForRedo(record, 1, &buffer);
248 if (action == BLK_NEEDS_REDO)
252 page = BufferGetPage(buffer);
254 for (i = 0; i < nInsert; i++)
257 SpGistLeafTupleData leafTupleHdr;
260 * the tuples are not aligned, so must copy to access the size
264 memcpy(&leafTupleHdr, leafTuple,
265 sizeof(SpGistLeafTupleData));
267 addOrReplaceTuple(page, (Item) leafTuple,
268 leafTupleHdr.size, toInsert[i]);
/* advance to the next tuple image */
269 ptr += leafTupleHdr.size;
272 PageSetLSN(page, lsn);
273 MarkBufferDirty(buffer);
275 if (BufferIsValid(buffer))
276 UnlockReleaseBuffer(buffer);
278 /* Delete tuples from the source page, inserting a redirection pointer */
279 if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
281 page = BufferGetPage(buffer);
/*
 * The first state argument is SPGIST_PLACEHOLDER when the record was written
 * during an index build and SPGIST_REDIRECT otherwise.
 * NOTE(review): two arguments of this call are not visible in this excerpt.
 */
283 spgPageIndexMultiDelete(&state, page, toDelete, xldata->nMoves,
284 state.isBuild ? SPGIST_PLACEHOLDER : SPGIST_REDIRECT,
287 toInsert[nInsert - 1]);
289 PageSetLSN(page, lsn);
290 MarkBufferDirty(buffer);
292 if (BufferIsValid(buffer))
293 UnlockReleaseBuffer(buffer);
295 /* And update the parent downlink */
296 if (XLogReadBufferForRedo(record, 2, &buffer) == BLK_NEEDS_REDO)
298 SpGistInnerTuple tuple;
300 page = BufferGetPage(buffer);
302 tuple = (SpGistInnerTuple) PageGetItem(page,
303 PageGetItemId(page, xldata->offnumParent));
305 spgUpdateNodeLink(tuple, xldata->nodeI,
306 blknoDst, toInsert[nInsert - 1]);
308 PageSetLSN(page, lsn);
309 MarkBufferDirty(buffer);
311 if (BufferIsValid(buffer))
312 UnlockReleaseBuffer(buffer);
/*
 * spgRedoAddNode
 *
 * Replay of XLOG_SPGIST_ADD_NODE.  The main record data starts with an
 * spgxlogAddNode struct; the inner tuple header is copied into a local
 * because the tuple image is unaligned.
 *
 * Case 1, record has no block reference 1 (update in place): on block ref 0
 * the item at offnum is deleted and the new inner tuple is added at the same
 * offset.  parentBlk is asserted to be -1.
 *
 * Case 2, record has block reference 1 (tuple moved to another page):
 *   block ref 1: the new tuple is placed at offnumNew via addOrReplaceTuple()
 *   block ref 0: the item at offnum is replaced by a dead tuple built with
 *                spgFormDeadTuple(), in SPGIST_PLACEHOLDER or SPGIST_REDIRECT
 *                state, and the matching page counter (nPlaceholder or
 *                nRedirection) is incremented
 *   parent downlink: node nodeI of the inner tuple at offnumParent is pointed
 *                at (block of ref 1, offnumNew).  parentBlk says which block
 *                reference holds the parent: 0 or 1 means it is updated
 *                while that page is being processed, 2 means block ref 2 is
 *                read separately for it.
 * NOTE(review): the test selecting placeholder versus redirect, and the test
 * selecting page init versus read for block ref 1, are not visible in this
 * excerpt.
 *
 * Each modified page gets the record end LSN and is marked dirty; each buffer
 * is unlocked and released if valid.  elog(ERROR) is raised when PageAddItem()
 * does not return the requested offset.
 */
316 spgRedoAddNode(XLogReaderState *record)
318 XLogRecPtr lsn = record->EndRecPtr;
319 char *ptr = XLogRecGetData(record);
320 spgxlogAddNode *xldata = (spgxlogAddNode *) ptr;
322 SpGistInnerTupleData innerTupleHdr;
326 XLogRedoAction action;
/* step past the fixed-size record header */
328 ptr += sizeof(spgxlogAddNode);
330 /* the tuple is unaligned, so make a copy to access its header */
331 memcpy(&innerTupleHdr, innerTuple, sizeof(SpGistInnerTupleData));
333 fillFakeState(&state, xldata->stateSrc);
/* absence of block reference 1 means the tuple stays on its original page */
335 if (!XLogRecHasBlockRef(record, 1))
337 /* update in place */
338 Assert(xldata->parentBlk == -1);
339 if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
341 page = BufferGetPage(buffer);
/* swap the old tuple for the new one at the same offset */
343 PageIndexTupleDelete(page, xldata->offnum);
344 if (PageAddItem(page, (Item) innerTuple, innerTupleHdr.size,
346 false, false) != xldata->offnum)
347 elog(ERROR, "failed to add item of size %u to SPGiST index page",
350 PageSetLSN(page, lsn);
351 MarkBufferDirty(buffer);
353 if (BufferIsValid(buffer))
354 UnlockReleaseBuffer(buffer);
359 BlockNumber blknoNew;
/* blknoNew (block ref 1) is the downlink target used below */
361 XLogRecGetBlockTag(record, 0, NULL, NULL, &blkno);
362 XLogRecGetBlockTag(record, 1, NULL, NULL, &blknoNew);
365 * In normal operation we would have all three pages (source, dest,
366 * and parent) locked simultaneously; but in WAL replay it should be
367 * safe to update them one at a time, as long as we do it in the right
368 * order. We must insert the new tuple before replacing the old tuple
369 * with the redirect tuple.
372 /* Install new tuple first so redirect is valid */
375 /* AddNode is not used for nulls pages */
376 buffer = XLogInitBufferForRedo(record, 1);
377 SpGistInitBuffer(buffer, 0);
378 action = BLK_NEEDS_REDO;
381 action = XLogReadBufferForRedo(record, 1, &buffer);
382 if (action == BLK_NEEDS_REDO)
384 page = BufferGetPage(buffer);
386 addOrReplaceTuple(page, (Item) innerTuple,
387 innerTupleHdr.size, xldata->offnumNew);
390 * If parent is in this same page, update it now.
392 if (xldata->parentBlk == 1)
394 SpGistInnerTuple parentTuple;
396 parentTuple = (SpGistInnerTuple) PageGetItem(page,
397 PageGetItemId(page, xldata->offnumParent));
399 spgUpdateNodeLink(parentTuple, xldata->nodeI,
400 blknoNew, xldata->offnumNew);
402 PageSetLSN(page, lsn);
403 MarkBufferDirty(buffer);
405 if (BufferIsValid(buffer))
406 UnlockReleaseBuffer(buffer);
408 /* Delete old tuple, replacing it with redirect or placeholder tuple */
409 if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
413 page = BufferGetPage(buffer);
416 dt = spgFormDeadTuple(&state, SPGIST_PLACEHOLDER,
418 InvalidOffsetNumber);
420 dt = spgFormDeadTuple(&state, SPGIST_REDIRECT,
424 PageIndexTupleDelete(page, xldata->offnum);
425 if (PageAddItem(page, (Item) dt, dt->size,
427 false, false) != xldata->offnum)
428 elog(ERROR, "failed to add item of size %u to SPGiST index page",
432 SpGistPageGetOpaque(page)->nPlaceholder++;
434 SpGistPageGetOpaque(page)->nRedirection++;
437 * If parent is in this same page, update it now.
439 if (xldata->parentBlk == 0)
441 SpGistInnerTuple parentTuple;
443 parentTuple = (SpGistInnerTuple) PageGetItem(page,
444 PageGetItemId(page, xldata->offnumParent));
446 spgUpdateNodeLink(parentTuple, xldata->nodeI,
447 blknoNew, xldata->offnumNew);
449 PageSetLSN(page, lsn);
450 MarkBufferDirty(buffer);
452 if (BufferIsValid(buffer))
453 UnlockReleaseBuffer(buffer);
456 * Update parent downlink (if we didn't do it as part of the source or
457 * destination page update already).
459 if (xldata->parentBlk == 2)
461 if (XLogReadBufferForRedo(record, 2, &buffer) == BLK_NEEDS_REDO)
463 SpGistInnerTuple parentTuple;
465 page = BufferGetPage(buffer);
467 parentTuple = (SpGistInnerTuple) PageGetItem(page,
468 PageGetItemId(page, xldata->offnumParent));
470 spgUpdateNodeLink(parentTuple, xldata->nodeI,
471 blknoNew, xldata->offnumNew);
473 PageSetLSN(page, lsn);
474 MarkBufferDirty(buffer);
476 if (BufferIsValid(buffer))
477 UnlockReleaseBuffer(buffer);
/*
 * spgRedoSplitTuple
 *
 * Replay of XLOG_SPGIST_SPLIT_TUPLE.  Main record data layout, as walked by
 * ptr below: an spgxlogSplitTuple struct, then the prefix tuple image
 * (skipped over using its own size field), then the postfix tuple image.
 * Both tuple headers are copied into locals because the images are unaligned.
 *
 * When postfixBlkSame is false, block ref 1 is processed first and receives
 * the postfix tuple at offnumPostfix.  Then, on block ref 0, the item at
 * offnumPrefix is deleted and the prefix tuple is added at that same offset;
 * when postfixBlkSame is true the postfix tuple is also placed on this page.
 *
 * Each modified page gets the record end LSN and is marked dirty; each buffer
 * is unlocked and released if valid.  elog(ERROR) is raised when PageAddItem()
 * does not return offnumPrefix.
 */
483 spgRedoSplitTuple(XLogReaderState *record)
485 XLogRecPtr lsn = record->EndRecPtr;
486 char *ptr = XLogRecGetData(record);
487 spgxlogSplitTuple *xldata = (spgxlogSplitTuple *) ptr;
489 SpGistInnerTupleData prefixTupleHdr;
491 SpGistInnerTupleData postfixTupleHdr;
494 XLogRedoAction action;
/* step past the fixed-size record header */
496 ptr += sizeof(spgxlogSplitTuple);
498 /* the prefix tuple is unaligned, so make a copy to access its header */
499 memcpy(&prefixTupleHdr, prefixTuple, sizeof(SpGistInnerTupleData));
/* the postfix tuple image follows the prefix tuple image */
500 ptr += prefixTupleHdr.size;
502 /* postfix tuple is also unaligned */
503 memcpy(&postfixTupleHdr, postfixTuple, sizeof(SpGistInnerTupleData));
506 * In normal operation we would have both pages locked simultaneously; but
507 * in WAL replay it should be safe to update them one at a time, as long
508 * as we do it in the right order.
511 /* insert postfix tuple first to avoid dangling link */
512 if (!xldata->postfixBlkSame)
/*
 * NOTE(review): the test choosing between initializing block ref 1 and
 * reading it is not visible in this excerpt.
 */
516 buffer = XLogInitBufferForRedo(record, 1);
517 /* SplitTuple is not used for nulls pages */
518 SpGistInitBuffer(buffer, 0);
519 action = BLK_NEEDS_REDO;
522 action = XLogReadBufferForRedo(record, 1, &buffer);
523 if (action == BLK_NEEDS_REDO)
525 page = BufferGetPage(buffer);
527 addOrReplaceTuple(page, (Item) postfixTuple,
528 postfixTupleHdr.size, xldata->offnumPostfix);
530 PageSetLSN(page, lsn);
531 MarkBufferDirty(buffer);
533 if (BufferIsValid(buffer))
534 UnlockReleaseBuffer(buffer);
537 /* now handle the original page */
538 if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
540 page = BufferGetPage(buffer);
/* overwrite the original tuple with the prefix tuple at the same offset */
542 PageIndexTupleDelete(page, xldata->offnumPrefix);
543 if (PageAddItem(page, (Item) prefixTuple, prefixTupleHdr.size,
544 xldata->offnumPrefix, false, false) != xldata->offnumPrefix)
545 elog(ERROR, "failed to add item of size %u to SPGiST index page",
546 prefixTupleHdr.size);
/* postfix tuple lives on this page too when it was not moved elsewhere */
548 if (xldata->postfixBlkSame)
549 addOrReplaceTuple(page, (Item) postfixTuple,
550 postfixTupleHdr.size,
551 xldata->offnumPostfix);
553 PageSetLSN(page, lsn);
554 MarkBufferDirty(buffer);
556 if (BufferIsValid(buffer))
557 UnlockReleaseBuffer(buffer);
/*
 * spgRedoPickSplit
 *
 * Replay of XLOG_SPGIST_PICKSPLIT.  Main record data layout, as walked by ptr
 * below:
 *   spgxlogPickSplit header (SizeOfSpgxlogPickSplit bytes)
 *   OffsetNumber toDelete[nDelete]
 *   OffsetNumber toInsert[nInsert]
 *   uint8        leafPageSelect[nInsert]
 *   the new inner tuple image (skipped over using its own size field)
 *   nInsert leaf tuple images
 *
 * Block references: 0 = source leaf page, 1 = destination leaf page (may be
 * absent), 2 = page receiving the new inner tuple, 3 = parent page (may be
 * absent).
 *
 * Steps, in order:
 *   1. Source page: left untouched for a root split (srcBuffer is set to
 *      InvalidBuffer); re-initialized when initSrc is set; otherwise read and
 *      toDelete[] removed with spgPageIndexMultiDelete().
 *   2. Destination page: none if the record has no block ref 1;
 *      re-initialized when initDest is set; otherwise read, with destPage set
 *      to NULL when no redo is needed.
 *   3. Each leaf tuple goes to the destination page when leafPageSelect[i] is
 *      nonzero and to the source page otherwise; a tuple is skipped via
 *      "continue" when its page is not to be updated.
 *   4. LSN is set and buffers are marked dirty for the source page and, when
 *      destPage is not NULL, the destination page.
 *   5. Inner page (block ref 2): the inner tuple is placed at offnumInner;
 *      when innerIsParent is set the parent downlink is updated on this page.
 *   6. Source and destination buffers are released.
 *   7. When block ref 3 exists, node nodeI of the inner tuple at offnumParent
 *      on that page is pointed at (inner block, offnumInner).  The final
 *      Assert expects innerIsParent or isRootSplit.
 *
 * NOTE(review): several branch tests (the skip test in step 3, the guard on
 * the source page in step 4, the branch holding the final Assert, and the
 * test selecting between the two spgPageIndexMultiDelete() calls) are not
 * visible in this excerpt -- confirm against the full file.
 */
561 spgRedoPickSplit(XLogReaderState *record)
563 XLogRecPtr lsn = record->EndRecPtr;
564 char *ptr = XLogRecGetData(record);
565 spgxlogPickSplit *xldata = (spgxlogPickSplit *) ptr;
567 SpGistInnerTupleData innerTupleHdr;
569 OffsetNumber *toDelete;
570 OffsetNumber *toInsert;
571 uint8 *leafPageSelect;
579 BlockNumber blknoInner;
580 XLogRedoAction action;
/* block number of the inner page, used as the downlink target */
582 XLogRecGetBlockTag(record, 2, NULL, NULL, &blknoInner);
584 fillFakeState(&state, xldata->stateSrc);
/* carve the three arrays out of the record data */
586 ptr += SizeOfSpgxlogPickSplit;
587 toDelete = (OffsetNumber *) ptr;
588 ptr += sizeof(OffsetNumber) * xldata->nDelete;
589 toInsert = (OffsetNumber *) ptr;
590 ptr += sizeof(OffsetNumber) * xldata->nInsert;
591 leafPageSelect = (uint8 *) ptr;
592 ptr += sizeof(uint8) * xldata->nInsert;
595 /* the inner tuple is unaligned, so make a copy to access its header */
596 memcpy(&innerTupleHdr, innerTuple, sizeof(SpGistInnerTupleData));
597 ptr += innerTupleHdr.size;
599 /* now ptr points to the list of leaf tuples */
601 if (xldata->isRootSplit)
603 /* when splitting root, we touch it only in the guise of new inner */
604 srcBuffer = InvalidBuffer;
607 else if (xldata->initSrc)
609 /* just re-init the source page */
610 srcBuffer = XLogInitBufferForRedo(record, 0);
611 srcPage = (Page) BufferGetPage(srcBuffer);
613 SpGistInitBuffer(srcBuffer,
614 SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0));
615 /* don't update LSN etc till we're done with it */
620 * Delete the specified tuples from source page. (In case we're in
621 * Hot Standby, we need to hold lock on the page till we're done
622 * inserting leaf tuples and the new inner tuple, else the added
623 * redirect tuple will be a dangling link.)
626 if (XLogReadBufferForRedo(record, 0, &srcBuffer) == BLK_NEEDS_REDO)
628 srcPage = BufferGetPage(srcBuffer);
631 * We have it a bit easier here than in doPickSplit(), because we
632 * know the inner tuple's location already, so we can inject the
633 * correct redirection tuple now.
636 spgPageIndexMultiDelete(&state, srcPage,
637 toDelete, xldata->nDelete,
641 xldata->offnumInner);
643 spgPageIndexMultiDelete(&state, srcPage,
644 toDelete, xldata->nDelete,
648 InvalidOffsetNumber);
650 /* don't update LSN etc till we're done with it */
654 /* try to access dest page if any */
655 if (!XLogRecHasBlockRef(record, 1))
657 destBuffer = InvalidBuffer;
660 else if (xldata->initDest)
662 /* just re-init the dest page */
663 destBuffer = XLogInitBufferForRedo(record, 1);
664 destPage = (Page) BufferGetPage(destBuffer);
666 SpGistInitBuffer(destBuffer,
667 SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0));
668 /* don't update LSN etc till we're done with it */
673 * We could probably release the page lock immediately in the
674 * full-page-image case, but for safety let's hold it till later.
676 if (XLogReadBufferForRedo(record, 1, &destBuffer) == BLK_NEEDS_REDO)
677 destPage = (Page) BufferGetPage(destBuffer);
679 destPage = NULL; /* don't do any page updates */
682 /* restore leaf tuples to src and/or dest page */
683 for (i = 0; i < xldata->nInsert; i++)
686 SpGistLeafTupleData leafTupleHdr;
688 /* the tuples are not aligned, so must copy to access the size field. */
690 memcpy(&leafTupleHdr, leafTuple, sizeof(SpGistLeafTupleData));
691 ptr += leafTupleHdr.size;
693 page = leafPageSelect[i] ? destPage : srcPage;
695 continue; /* no need to touch this page */
697 addOrReplaceTuple(page, (Item) leafTuple, leafTupleHdr.size,
701 /* Now update src and dest page LSNs if needed */
704 PageSetLSN(srcPage, lsn);
705 MarkBufferDirty(srcBuffer);
707 if (destPage != NULL)
709 PageSetLSN(destPage, lsn);
710 MarkBufferDirty(destBuffer);
713 /* restore new inner tuple */
714 if (xldata->initInner)
716 innerBuffer = XLogInitBufferForRedo(record, 2);
717 SpGistInitBuffer(innerBuffer, (xldata->storesNulls ? SPGIST_NULLS : 0));
718 action = BLK_NEEDS_REDO;
721 action = XLogReadBufferForRedo(record, 2, &innerBuffer);
723 if (action == BLK_NEEDS_REDO)
725 page = BufferGetPage(innerBuffer);
727 addOrReplaceTuple(page, (Item) innerTuple, innerTupleHdr.size,
728 xldata->offnumInner);
730 /* if inner is also parent, update link while we're here */
731 if (xldata->innerIsParent)
733 SpGistInnerTuple parent;
735 parent = (SpGistInnerTuple) PageGetItem(page,
736 PageGetItemId(page, xldata->offnumParent));
737 spgUpdateNodeLink(parent, xldata->nodeI,
738 blknoInner, xldata->offnumInner);
741 PageSetLSN(page, lsn);
742 MarkBufferDirty(innerBuffer);
744 if (BufferIsValid(innerBuffer))
745 UnlockReleaseBuffer(innerBuffer);
748 * Now we can release the leaf-page locks. It's okay to do this before
749 * updating the parent downlink.
751 if (BufferIsValid(srcBuffer))
752 UnlockReleaseBuffer(srcBuffer);
753 if (BufferIsValid(destBuffer))
754 UnlockReleaseBuffer(destBuffer);
756 /* update parent downlink, unless we did it above */
757 if (XLogRecHasBlockRef(record, 3))
761 if (XLogReadBufferForRedo(record, 3, &parentBuffer) == BLK_NEEDS_REDO)
763 SpGistInnerTuple parent;
765 page = BufferGetPage(parentBuffer);
767 parent = (SpGistInnerTuple) PageGetItem(page,
768 PageGetItemId(page, xldata->offnumParent));
769 spgUpdateNodeLink(parent, xldata->nodeI,
770 blknoInner, xldata->offnumInner);
772 PageSetLSN(page, lsn);
773 MarkBufferDirty(parentBuffer);
775 if (BufferIsValid(parentBuffer))
776 UnlockReleaseBuffer(parentBuffer);
779 Assert(xldata->innerIsParent || xldata->isRootSplit);
/*
 * spgRedoVacuumLeaf
 *
 * Replay of XLOG_SPGIST_VACUUM_LEAF.  Main record data layout, as walked by
 * ptr below:
 *   spgxlogVacuumLeaf header (SizeOfSpgxlogVacuumLeaf bytes)
 *   OffsetNumber toDead[nDead]
 *   OffsetNumber toPlaceholder[nPlaceholder]
 *   OffsetNumber moveSrc[nMove]
 *   OffsetNumber moveDest[nMove]
 *   OffsetNumber chainSrc[nChain]
 *   OffsetNumber chainDest[nChain]
 *
 * When block ref 0 needs redo, the page is processed in this order:
 *   1. toDead[] entries are replaced through spgPageIndexMultiDelete() with
 *      SPGIST_DEAD as both state arguments
 *   2. toPlaceholder[] entries likewise, with SPGIST_PLACEHOLDER
 *   3. for every move, the line pointers of moveSrc[i] and moveDest[i] are
 *      looked up.  NOTE(review): what is done with them is not visible in
 *      this excerpt.
 *   4. moveSrc[] entries are replaced, with SPGIST_PLACEHOLDER
 *   5. for every chain entry, nextOffset of the tuple at chainSrc[i] is set
 *      to chainDest[i]; the tuple is asserted to be SPGIST_LIVE
 * The page then gets the record end LSN and is marked dirty.  The buffer is
 * unlocked and released if valid.
 */
783 spgRedoVacuumLeaf(XLogReaderState *record)
785 XLogRecPtr lsn = record->EndRecPtr;
786 char *ptr = XLogRecGetData(record);
787 spgxlogVacuumLeaf *xldata = (spgxlogVacuumLeaf *) ptr;
788 OffsetNumber *toDead;
789 OffsetNumber *toPlaceholder;
790 OffsetNumber *moveSrc;
791 OffsetNumber *moveDest;
792 OffsetNumber *chainSrc;
793 OffsetNumber *chainDest;
799 fillFakeState(&state, xldata->stateSrc);
/* carve the six offset arrays out of the record data, in storage order */
801 ptr += SizeOfSpgxlogVacuumLeaf;
802 toDead = (OffsetNumber *) ptr;
803 ptr += sizeof(OffsetNumber) * xldata->nDead;
804 toPlaceholder = (OffsetNumber *) ptr;
805 ptr += sizeof(OffsetNumber) * xldata->nPlaceholder;
806 moveSrc = (OffsetNumber *) ptr;
807 ptr += sizeof(OffsetNumber) * xldata->nMove;
808 moveDest = (OffsetNumber *) ptr;
809 ptr += sizeof(OffsetNumber) * xldata->nMove;
810 chainSrc = (OffsetNumber *) ptr;
811 ptr += sizeof(OffsetNumber) * xldata->nChain;
812 chainDest = (OffsetNumber *) ptr;
814 if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
816 page = BufferGetPage(buffer);
/* step 1: entries listed in toDead[] */
818 spgPageIndexMultiDelete(&state, page,
819 toDead, xldata->nDead,
820 SPGIST_DEAD, SPGIST_DEAD,
822 InvalidOffsetNumber);
/* step 2: entries listed in toPlaceholder[] */
824 spgPageIndexMultiDelete(&state, page,
825 toPlaceholder, xldata->nPlaceholder,
826 SPGIST_PLACEHOLDER, SPGIST_PLACEHOLDER,
828 InvalidOffsetNumber);
830 /* see comments in vacuumLeafPage() */
831 for (i = 0; i < xldata->nMove; i++)
833 ItemId idSrc = PageGetItemId(page, moveSrc[i]);
834 ItemId idDest = PageGetItemId(page, moveDest[i]);
/* step 4: the moved-from slots are turned into placeholders */
842 spgPageIndexMultiDelete(&state, page,
843 moveSrc, xldata->nMove,
844 SPGIST_PLACEHOLDER, SPGIST_PLACEHOLDER,
846 InvalidOffsetNumber);
/* step 5: rewrite chain links */
848 for (i = 0; i < xldata->nChain; i++)
852 lt = (SpGistLeafTuple) PageGetItem(page,
853 PageGetItemId(page, chainSrc[i]));
854 Assert(lt->tupstate == SPGIST_LIVE);
855 lt->nextOffset = chainDest[i];
858 PageSetLSN(page, lsn);
859 MarkBufferDirty(buffer);
861 if (BufferIsValid(buffer))
862 UnlockReleaseBuffer(buffer);
/*
 * spgRedoVacuumRoot
 *
 * Replay of XLOG_SPGIST_VACUUM_ROOT.  The record carries nDelete offsets in
 * xldata->offsets.  When block ref 0 needs redo, those items are removed in
 * one PageIndexMultiDelete() call, and the page gets the record end LSN and
 * is marked dirty.  The buffer is unlocked and released if valid.
 */
866 spgRedoVacuumRoot(XLogReaderState *record)
868 XLogRecPtr lsn = record->EndRecPtr;
869 char *ptr = XLogRecGetData(record);
870 spgxlogVacuumRoot *xldata = (spgxlogVacuumRoot *) ptr;
871 OffsetNumber *toDelete;
/* the offset array is a member of the record struct itself */
875 toDelete = xldata->offsets;
877 if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
879 page = BufferGetPage(buffer);
881 /* The tuple numbers are in order */
882 PageIndexMultiDelete(page, toDelete, xldata->nDelete);
884 PageSetLSN(page, lsn);
885 MarkBufferDirty(buffer);
887 if (BufferIsValid(buffer))
888 UnlockReleaseBuffer(buffer);
/*
 * spgRedoVacuumRedirect
 *
 * Replay of XLOG_SPGIST_VACUUM_REDIRECT.  The record carries nToPlaceholder
 * offsets in xldata->offsets, plus firstPlaceholder and newestRedirectXid.
 *
 * Before touching the page: when newestRedirectXid is a valid transaction
 * id, the relation of block ref 0 is looked up and
 * ResolveRecoveryConflictWithSnapshot() is called.
 * NOTE(review): any additional guard around that call is not visible in this
 * excerpt.
 *
 * When block ref 0 needs redo:
 *   1. every tuple listed in the offsets array is asserted to be
 *      SPGIST_REDIRECT, switched to SPGIST_PLACEHOLDER, and has its pointer
 *      invalidated; nRedirection is decreased and nPlaceholder increased by
 *      nToPlaceholder
 *   2. when firstPlaceholder is valid, every item from firstPlaceholder up to
 *      the current max offset is deleted and nPlaceholder is reduced by the
 *      number of items removed
 * The page then gets the record end LSN and is marked dirty.  The buffer is
 * unlocked and released if valid.
 */
892 spgRedoVacuumRedirect(XLogReaderState *record)
894 XLogRecPtr lsn = record->EndRecPtr;
895 char *ptr = XLogRecGetData(record);
896 spgxlogVacuumRedirect *xldata = (spgxlogVacuumRedirect *) ptr;
897 OffsetNumber *itemToPlaceholder;
/* the offset array is a member of the record struct itself */
900 itemToPlaceholder = xldata->offsets;
903 * If any redirection tuples are being removed, make sure there are no
904 * live Hot Standby transactions that might need to see them.
908 if (TransactionIdIsValid(xldata->newestRedirectXid))
912 XLogRecGetBlockTag(record, 0, &node, NULL, NULL);
913 ResolveRecoveryConflictWithSnapshot(xldata->newestRedirectXid,
918 if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
920 Page page = BufferGetPage(buffer);
921 SpGistPageOpaque opaque = SpGistPageGetOpaque(page);
924 /* Convert redirect pointers to plain placeholders */
925 for (i = 0; i < xldata->nToPlaceholder; i++)
929 dt = (SpGistDeadTuple) PageGetItem(page,
930 PageGetItemId(page, itemToPlaceholder[i]));
931 Assert(dt->tupstate == SPGIST_REDIRECT);
932 dt->tupstate = SPGIST_PLACEHOLDER;
933 ItemPointerSetInvalid(&dt->pointer);
/* move the converted tuples from one page counter to the other */
936 Assert(opaque->nRedirection >= xldata->nToPlaceholder);
937 opaque->nRedirection -= xldata->nToPlaceholder;
938 opaque->nPlaceholder += xldata->nToPlaceholder;
940 /* Remove placeholder tuples at end of page */
941 if (xldata->firstPlaceholder != InvalidOffsetNumber)
943 int max = PageGetMaxOffsetNumber(page);
944 OffsetNumber *toDelete;
/* sized for max entries; max - firstPlaceholder + 1 are filled in below */
946 toDelete = palloc(sizeof(OffsetNumber) * max);
/* list every offset from firstPlaceholder through max, ascending */
948 for (i = xldata->firstPlaceholder; i <= max; i++)
949 toDelete[i - xldata->firstPlaceholder] = i;
/* i now holds the number of offsets collected above */
951 i = max - xldata->firstPlaceholder + 1;
952 Assert(opaque->nPlaceholder >= i);
953 opaque->nPlaceholder -= i;
955 /* The array is sorted, so can use PageIndexMultiDelete */
956 PageIndexMultiDelete(page, toDelete, i);
961 PageSetLSN(page, lsn);
962 MarkBufferDirty(buffer);
964 if (BufferIsValid(buffer))
965 UnlockReleaseBuffer(buffer);
/*
 * spg_redo
 *
 * Entry point for replaying one SP-GiST WAL record.  The record type is taken
 * from the info bits with XLR_INFO_MASK cleared, and the matching spgRedo*
 * routine is called.  An unrecognized type raises elog(PANIC).
 *
 * All work runs with opCtx as the current memory context; afterwards the
 * previous context is restored and opCtx is reset, so anything the redo
 * routines allocated with palloc in the current context is released once per
 * record.
 *
 * NOTE(review): the switch header, break and default lines are not visible
 * in this excerpt.
 */
969 spg_redo(XLogReaderState *record)
971 uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
972 MemoryContext oldCxt;
/* run the replay routine inside the per-record scratch context */
974 oldCxt = MemoryContextSwitchTo(opCtx);
977 case XLOG_SPGIST_CREATE_INDEX:
978 spgRedoCreateIndex(record);
980 case XLOG_SPGIST_ADD_LEAF:
981 spgRedoAddLeaf(record);
983 case XLOG_SPGIST_MOVE_LEAFS:
984 spgRedoMoveLeafs(record);
986 case XLOG_SPGIST_ADD_NODE:
987 spgRedoAddNode(record);
989 case XLOG_SPGIST_SPLIT_TUPLE:
990 spgRedoSplitTuple(record);
992 case XLOG_SPGIST_PICKSPLIT:
993 spgRedoPickSplit(record);
995 case XLOG_SPGIST_VACUUM_LEAF:
996 spgRedoVacuumLeaf(record);
998 case XLOG_SPGIST_VACUUM_ROOT:
999 spgRedoVacuumRoot(record);
1001 case XLOG_SPGIST_VACUUM_REDIRECT:
1002 spgRedoVacuumRedirect(record);
1005 elog(PANIC, "spg_redo: unknown op code %u", info);
/* restore the caller context, then drop everything allocated for this record */
1008 MemoryContextSwitchTo(oldCxt);
1009 MemoryContextReset(opCtx);
/*
 * spg_xlog_startup
 *
 * Creates opCtx, the scratch memory context used by spg_redo(), as a child
 * of whatever context is current at the time of the call, using the default
 * allocation-set sizes.
 */
1013 spg_xlog_startup(void)
1015 opCtx = AllocSetContextCreate(CurrentMemoryContext,
1016 "SP-GiST temporary context",
1017 ALLOCSET_DEFAULT_SIZES);
/*
 * spg_xlog_cleanup
 *
 * Deletes opCtx, the scratch context created by spg_xlog_startup().
 * NOTE(review): opCtx is not visibly reset to NULL afterwards in this
 * excerpt, so it must not be used again until spg_xlog_startup() has been
 * called again -- confirm.
 */
1021 spg_xlog_cleanup(void)
1023 MemoryContextDelete(opCtx);