1 /*-------------------------------------------------------------------------
4 * WAL replay logic for GiST.
7 * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
8 * Portions Copyright (c) 1994, Regents of the University of California
11 * src/backend/access/gist/gistxlog.c
12 *-------------------------------------------------------------------------
16 #include "access/gist_private.h"
17 #include "access/xlogutils.h"
18 #include "utils/memutils.h"
28 gistxlogPageSplit *data;
32 static MemoryContext opCtx; /* working memory for operations */
35 * Replay the clearing of F_FOLLOW_RIGHT flag on a child page.
37 * Even if the WAL record includes a full-page image, we have to update the
38 * follow-right flag, because that change is not included in the full-page
39 * image. To be sure that the intermediate state with the wrong flag value is
40 * not visible to concurrent Hot Standby queries, this function handles
41 * restoring the full-page image as well as updating the flag. (Note that
42 * we never need to do anything else to the child page in the current WAL
46 gistRedoClearFollowRight(XLogRecPtr lsn, XLogRecord *record, int block_index,
47 RelFileNode node, BlockNumber childblkno)
52 if (record->xl_info & XLR_BKP_BLOCK(block_index))
53 buffer = RestoreBackupBlock(lsn, record, block_index, false, true);
56 buffer = XLogReadBuffer(node, childblkno, false);
57 if (!BufferIsValid(buffer))
58 return; /* page was deleted, nothing to do */
60 page = (Page) BufferGetPage(buffer);
63 * Note that we still update the page even if page LSN is equal to the LSN
64 * of this record, because the updated NSN is not included in the full
67 if (lsn >= PageGetLSN(page))
69 GistPageSetNSN(page, lsn);
70 GistClearFollowRight(page);
72 PageSetLSN(page, lsn);
73 MarkBufferDirty(buffer);
75 UnlockReleaseBuffer(buffer);
79 * redo any page update (except page split)
82 gistRedoPageUpdateRecord(XLogRecPtr lsn, XLogRecord *record)
84 char *begin = XLogRecGetData(record);
85 gistxlogPageUpdate *xldata = (gistxlogPageUpdate *) begin;
91 * We need to acquire and hold lock on target page while updating the left
92 * child page. If we have a full-page image of target page, getting the
93 * lock is a side-effect of restoring that image. Note that even if the
94 * target page no longer exists, we'll still attempt to replay the change
97 if (record->xl_info & XLR_BKP_BLOCK(0))
98 buffer = RestoreBackupBlock(lsn, record, 0, false, true);
100 buffer = XLogReadBuffer(xldata->node, xldata->blkno, false);
102 /* Fix follow-right data on left child page */
103 if (BlockNumberIsValid(xldata->leftchild))
104 gistRedoClearFollowRight(lsn, record, 1,
105 xldata->node, xldata->leftchild);
107 /* Done if target page no longer exists */
108 if (!BufferIsValid(buffer))
111 /* nothing more to do if page was backed up (and no info to do it with) */
112 if (record->xl_info & XLR_BKP_BLOCK(0))
114 UnlockReleaseBuffer(buffer);
118 page = (Page) BufferGetPage(buffer);
120 /* nothing more to do if change already applied */
121 if (lsn <= PageGetLSN(page))
123 UnlockReleaseBuffer(buffer);
127 data = begin + sizeof(gistxlogPageUpdate);
129 /* Delete old tuples */
130 if (xldata->ntodelete > 0)
133 OffsetNumber *todelete = (OffsetNumber *) data;
135 data += sizeof(OffsetNumber) * xldata->ntodelete;
137 for (i = 0; i < xldata->ntodelete; i++)
138 PageIndexTupleDelete(page, todelete[i]);
139 if (GistPageIsLeaf(page))
140 GistMarkTuplesDeleted(page);
144 if (data - begin < record->xl_len)
146 OffsetNumber off = (PageIsEmpty(page)) ? FirstOffsetNumber :
147 OffsetNumberNext(PageGetMaxOffsetNumber(page));
149 while (data - begin < record->xl_len)
151 IndexTuple itup = (IndexTuple) data;
152 Size sz = IndexTupleSize(itup);
157 l = PageAddItem(page, (Item) itup, sz, off, false, false);
158 if (l == InvalidOffsetNumber)
159 elog(ERROR, "failed to add item to GiST index page, size %d bytes",
167 * special case: leafpage, nothing to insert, nothing to delete, then
170 if (GistPageIsLeaf(page) && xldata->ntodelete == 0)
171 GistClearTuplesDeleted(page);
174 if (!GistPageIsLeaf(page) &&
175 PageGetMaxOffsetNumber(page) == InvalidOffsetNumber &&
176 xldata->blkno == GIST_ROOT_BLKNO)
179 * all links on non-leaf root page was deleted by vacuum full, so root
180 * page becomes a leaf
182 GistPageSetLeaf(page);
185 PageSetLSN(page, lsn);
186 MarkBufferDirty(buffer);
187 UnlockReleaseBuffer(buffer);
191 decodePageSplitRecord(PageSplitRecord *decoded, XLogRecord *record)
193 char *begin = XLogRecGetData(record),
198 decoded->data = (gistxlogPageSplit *) begin;
199 decoded->page = (NewPage *) palloc(sizeof(NewPage) * decoded->data->npage);
201 ptr = begin + sizeof(gistxlogPageSplit);
202 for (i = 0; i < decoded->data->npage; i++)
204 Assert(ptr - begin < record->xl_len);
205 decoded->page[i].header = (gistxlogPage *) ptr;
206 ptr += sizeof(gistxlogPage);
208 decoded->page[i].itup = (IndexTuple *)
209 palloc(sizeof(IndexTuple) * decoded->page[i].header->num);
211 while (j < decoded->page[i].header->num)
213 Assert(ptr - begin < record->xl_len);
214 decoded->page[i].itup[j] = (IndexTuple) ptr;
215 ptr += IndexTupleSize((IndexTuple) ptr);
222 gistRedoPageSplitRecord(XLogRecPtr lsn, XLogRecord *record)
224 gistxlogPageSplit *xldata = (gistxlogPageSplit *) XLogRecGetData(record);
225 PageSplitRecord xlrec;
226 Buffer firstbuffer = InvalidBuffer;
230 bool isrootsplit = false;
232 decodePageSplitRecord(&xlrec, record);
235 * We must hold lock on the first-listed page throughout the action,
236 * including while updating the left child page (if any). We can unlock
237 * remaining pages in the list as soon as they've been written, because
238 * there is no path for concurrent queries to reach those pages without
239 * first visiting the first-listed page.
242 /* loop around all pages */
243 for (i = 0; i < xlrec.data->npage; i++)
245 NewPage *newpage = xlrec.page + i;
248 if (newpage->header->blkno == GIST_ROOT_BLKNO)
254 buffer = XLogReadBuffer(xlrec.data->node, newpage->header->blkno, true);
255 Assert(BufferIsValid(buffer));
256 page = (Page) BufferGetPage(buffer);
258 /* ok, clear buffer */
259 if (xlrec.data->origleaf && newpage->header->blkno != GIST_ROOT_BLKNO)
263 GISTInitBuffer(buffer, flags);
266 gistfillbuffer(page, newpage->itup, newpage->header->num, FirstOffsetNumber);
268 if (newpage->header->blkno == GIST_ROOT_BLKNO)
270 GistPageGetOpaque(page)->rightlink = InvalidBlockNumber;
271 GistPageSetNSN(page, xldata->orignsn);
272 GistClearFollowRight(page);
276 if (i < xlrec.data->npage - 1)
277 GistPageGetOpaque(page)->rightlink = xlrec.page[i + 1].header->blkno;
279 GistPageGetOpaque(page)->rightlink = xldata->origrlink;
280 GistPageSetNSN(page, xldata->orignsn);
281 if (i < xlrec.data->npage - 1 && !isrootsplit &&
282 xldata->markfollowright)
283 GistMarkFollowRight(page);
285 GistClearFollowRight(page);
288 PageSetLSN(page, lsn);
289 MarkBufferDirty(buffer);
292 firstbuffer = buffer;
294 UnlockReleaseBuffer(buffer);
297 /* Fix follow-right data on left child page, if any */
298 if (BlockNumberIsValid(xldata->leftchild))
299 gistRedoClearFollowRight(lsn, record, 0,
300 xldata->node, xldata->leftchild);
302 /* Finally, release lock on the first page */
303 UnlockReleaseBuffer(firstbuffer);
307 gistRedoCreateIndex(XLogRecPtr lsn, XLogRecord *record)
309 RelFileNode *node = (RelFileNode *) XLogRecGetData(record);
313 /* Backup blocks are not used in create_index records */
314 Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
316 buffer = XLogReadBuffer(*node, GIST_ROOT_BLKNO, true);
317 Assert(BufferIsValid(buffer));
318 page = (Page) BufferGetPage(buffer);
320 GISTInitBuffer(buffer, F_LEAF);
322 PageSetLSN(page, lsn);
324 MarkBufferDirty(buffer);
325 UnlockReleaseBuffer(buffer);
329 gist_redo(XLogRecPtr lsn, XLogRecord *record)
331 uint8 info = record->xl_info & ~XLR_INFO_MASK;
332 MemoryContext oldCxt;
335 * GiST indexes do not require any conflict processing. NB: If we ever
336 * implement a similar optimization we have in b-tree, and remove killed
337 * tuples outside VACUUM, we'll need to handle that here.
340 oldCxt = MemoryContextSwitchTo(opCtx);
343 case XLOG_GIST_PAGE_UPDATE:
344 gistRedoPageUpdateRecord(lsn, record);
346 case XLOG_GIST_PAGE_SPLIT:
347 gistRedoPageSplitRecord(lsn, record);
349 case XLOG_GIST_CREATE_INDEX:
350 gistRedoCreateIndex(lsn, record);
353 elog(PANIC, "gist_redo: unknown op code %u", info);
356 MemoryContextSwitchTo(oldCxt);
357 MemoryContextReset(opCtx);
361 gist_xlog_startup(void)
363 opCtx = createTempGistContext();
367 gist_xlog_cleanup(void)
369 MemoryContextDelete(opCtx);
373 * Write WAL record of a page split.
376 gistXLogSplit(RelFileNode node, BlockNumber blkno, bool page_is_leaf,
377 SplitedPageLayout *dist,
378 BlockNumber origrlink, GistNSN orignsn,
379 Buffer leftchildbuf, bool markfollowright)
381 XLogRecData rdata[GIST_MAX_SPLIT_PAGES * 2 + 2];
382 gistxlogPageSplit xlrec;
383 SplitedPageLayout *ptr;
388 for (ptr = dist; ptr; ptr = ptr->next)
392 * the caller should've checked this already, but doesn't hurt to check
395 if (npage > GIST_MAX_SPLIT_PAGES)
396 elog(ERROR, "GiST page split into too many halves");
399 xlrec.origblkno = blkno;
400 xlrec.origrlink = origrlink;
401 xlrec.orignsn = orignsn;
402 xlrec.origleaf = page_is_leaf;
403 xlrec.npage = (uint16) npage;
405 BufferIsValid(leftchildbuf) ? BufferGetBlockNumber(leftchildbuf) : InvalidBlockNumber;
406 xlrec.markfollowright = markfollowright;
408 rdata[0].data = (char *) &xlrec;
409 rdata[0].len = sizeof(gistxlogPageSplit);
410 rdata[0].buffer = InvalidBuffer;
415 * Include a full page image of the child buf. (only necessary if a
416 * checkpoint happened since the child page was split)
418 if (BufferIsValid(leftchildbuf))
420 rdata[cur - 1].next = &(rdata[cur]);
421 rdata[cur].data = NULL;
423 rdata[cur].buffer = leftchildbuf;
424 rdata[cur].buffer_std = true;
428 for (ptr = dist; ptr; ptr = ptr->next)
430 rdata[cur - 1].next = &(rdata[cur]);
431 rdata[cur].buffer = InvalidBuffer;
432 rdata[cur].data = (char *) &(ptr->block);
433 rdata[cur].len = sizeof(gistxlogPage);
436 rdata[cur - 1].next = &(rdata[cur]);
437 rdata[cur].buffer = InvalidBuffer;
438 rdata[cur].data = (char *) (ptr->list);
439 rdata[cur].len = ptr->lenlist;
442 rdata[cur - 1].next = NULL;
444 recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_SPLIT, rdata);
450 * Write XLOG record describing a page update. The update can include any
451 * number of deletions and/or insertions of tuples on a single index page.
453 * If this update inserts a downlink for a split page, also record that
454 * the F_FOLLOW_RIGHT flag on the child page is cleared and NSN set.
456 * Note that both the todelete array and the tuples are marked as belonging
457 * to the target buffer; they need not be stored in XLOG if XLogInsert decides
458 * to log the whole buffer contents instead. Also, we take care that there's
459 * at least one rdata item referencing the buffer, even when ntodelete and
460 * ituplen are both zero; this ensures that XLogInsert knows about the buffer.
463 gistXLogUpdate(RelFileNode node, Buffer buffer,
464 OffsetNumber *todelete, int ntodelete,
465 IndexTuple *itup, int ituplen,
468 XLogRecData rdata[MaxIndexTuplesPerPage + 3];
469 gistxlogPageUpdate xlrec;
475 xlrec.blkno = BufferGetBlockNumber(buffer);
476 xlrec.ntodelete = ntodelete;
478 BufferIsValid(leftchildbuf) ? BufferGetBlockNumber(leftchildbuf) : InvalidBlockNumber;
480 rdata[0].data = (char *) &xlrec;
481 rdata[0].len = sizeof(gistxlogPageUpdate);
482 rdata[0].buffer = InvalidBuffer;
483 rdata[0].next = &(rdata[1]);
485 rdata[1].data = (char *) todelete;
486 rdata[1].len = sizeof(OffsetNumber) * ntodelete;
487 rdata[1].buffer = buffer;
488 rdata[1].buffer_std = true;
493 for (i = 0; i < ituplen; i++)
495 rdata[cur - 1].next = &(rdata[cur]);
496 rdata[cur].data = (char *) (itup[i]);
497 rdata[cur].len = IndexTupleSize(itup[i]);
498 rdata[cur].buffer = buffer;
499 rdata[cur].buffer_std = true;
504 * Include a full page image of the child buf. (only necessary if a
505 * checkpoint happened since the child page was split)
507 if (BufferIsValid(leftchildbuf))
509 rdata[cur - 1].next = &(rdata[cur]);
510 rdata[cur].data = NULL;
512 rdata[cur].buffer = leftchildbuf;
513 rdata[cur].buffer_std = true;
516 rdata[cur - 1].next = NULL;
518 recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_UPDATE, rdata);