granicus.if.org Git - postgresql/blob - src/backend/access/gist/gistxlog.c
Refactor per-page logic common to all redo routines to a new function.
[postgresql] / src / backend / access / gist / gistxlog.c
1 /*-------------------------------------------------------------------------
2  *
3  * gistxlog.c
4  *        WAL replay logic for GiST.
5  *
6  *
7  * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
8  * Portions Copyright (c) 1994, Regents of the University of California
9  *
10  * IDENTIFICATION
11  *                       src/backend/access/gist/gistxlog.c
12  *-------------------------------------------------------------------------
13  */
14 #include "postgres.h"
15
16 #include "access/gist_private.h"
17 #include "access/xlogutils.h"
18 #include "utils/memutils.h"
19
20 typedef struct
21 {
22         gistxlogPage *header;
23         IndexTuple *itup;
24 } NewPage;
25
26 typedef struct
27 {
28         gistxlogPageSplit *data;
29         NewPage    *page;
30 } PageSplitRecord;
31
32 static MemoryContext opCtx;             /* working memory for operations */
33
34 /*
35  * Replay the clearing of F_FOLLOW_RIGHT flag on a child page.
36  *
37  * Even if the WAL record includes a full-page image, we have to update the
38  * follow-right flag, because that change is not included in the full-page
39  * image.  To be sure that the intermediate state with the wrong flag value is
40  * not visible to concurrent Hot Standby queries, this function handles
41  * restoring the full-page image as well as updating the flag.  (Note that
42  * we never need to do anything else to the child page in the current WAL
43  * action.)
44  */
45 static void
46 gistRedoClearFollowRight(XLogRecPtr lsn, XLogRecord *record, int block_index,
47                                                  RelFileNode node, BlockNumber childblkno)
48 {
49         Buffer          buffer;
50         Page            page;
51         XLogRedoAction action;
52
53         /*
54          * Note that we still update the page even if it was restored from a full
55          * page image, because the updated NSN is not included in the image.
56          */
57         action = XLogReadBufferForRedo(lsn, record, block_index, node, childblkno,
58                                                                    &buffer);
59         if (action == BLK_NEEDS_REDO || action == BLK_RESTORED)
60         {
61                 page = BufferGetPage(buffer);
62
63                 GistPageSetNSN(page, lsn);
64                 GistClearFollowRight(page);
65
66                 PageSetLSN(page, lsn);
67                 MarkBufferDirty(buffer);
68         }
69         if (BufferIsValid(buffer))
70                 UnlockReleaseBuffer(buffer);
71 }
72
73 /*
74  * redo any page update (except page split)
75  */
76 static void
77 gistRedoPageUpdateRecord(XLogRecPtr lsn, XLogRecord *record)
78 {
79         char       *begin = XLogRecGetData(record);
80         gistxlogPageUpdate *xldata = (gistxlogPageUpdate *) begin;
81         Buffer          buffer;
82         Page            page;
83         char       *data;
84
85         if (XLogReadBufferForRedo(lsn, record, 0, xldata->node, xldata->blkno,
86                                                           &buffer) == BLK_NEEDS_REDO)
87         {
88                 page = (Page) BufferGetPage(buffer);
89
90                 data = begin + sizeof(gistxlogPageUpdate);
91
92                 /* Delete old tuples */
93                 if (xldata->ntodelete > 0)
94                 {
95                         int                     i;
96                         OffsetNumber *todelete = (OffsetNumber *) data;
97
98                         data += sizeof(OffsetNumber) * xldata->ntodelete;
99
100                         for (i = 0; i < xldata->ntodelete; i++)
101                                 PageIndexTupleDelete(page, todelete[i]);
102                         if (GistPageIsLeaf(page))
103                                 GistMarkTuplesDeleted(page);
104                 }
105
106                 /* add tuples */
107                 if (data - begin < record->xl_len)
108                 {
109                         OffsetNumber off = (PageIsEmpty(page)) ? FirstOffsetNumber :
110                         OffsetNumberNext(PageGetMaxOffsetNumber(page));
111
112                         while (data - begin < record->xl_len)
113                         {
114                                 IndexTuple      itup = (IndexTuple) data;
115                                 Size            sz = IndexTupleSize(itup);
116                                 OffsetNumber l;
117
118                                 data += sz;
119
120                                 l = PageAddItem(page, (Item) itup, sz, off, false, false);
121                                 if (l == InvalidOffsetNumber)
122                                         elog(ERROR, "failed to add item to GiST index page, size %d bytes",
123                                                  (int) sz);
124                                 off++;
125                         }
126                 }
127                 else
128                 {
129                         /*
130                          * special case: leafpage, nothing to insert, nothing to delete,
131                          * then vacuum marks page
132                          */
133                         if (GistPageIsLeaf(page) && xldata->ntodelete == 0)
134                                 GistClearTuplesDeleted(page);
135                 }
136
137                 if (!GistPageIsLeaf(page) &&
138                         PageGetMaxOffsetNumber(page) == InvalidOffsetNumber &&
139                         xldata->blkno == GIST_ROOT_BLKNO)
140                 {
141                         /*
142                          * all links on non-leaf root page was deleted by vacuum full, so
143                          * root page becomes a leaf
144                          */
145                         GistPageSetLeaf(page);
146                 }
147
148                 PageSetLSN(page, lsn);
149                 MarkBufferDirty(buffer);
150         }
151
152         /*
153          * Fix follow-right data on left child page
154          *
155          * This must be done while still holding the lock on the target page. Note
156          * that even if the target page no longer exists, we still attempt to
157          * replay the change on the child page.
158          */
159         if (BlockNumberIsValid(xldata->leftchild))
160                 gistRedoClearFollowRight(lsn, record, 1,
161                                                                  xldata->node, xldata->leftchild);
162
163         if (BufferIsValid(buffer))
164                 UnlockReleaseBuffer(buffer);
165 }
166
167 static void
168 decodePageSplitRecord(PageSplitRecord *decoded, XLogRecord *record)
169 {
170         char       *begin = XLogRecGetData(record),
171                            *ptr;
172         int                     j,
173                                 i = 0;
174
175         decoded->data = (gistxlogPageSplit *) begin;
176         decoded->page = (NewPage *) palloc(sizeof(NewPage) * decoded->data->npage);
177
178         ptr = begin + sizeof(gistxlogPageSplit);
179         for (i = 0; i < decoded->data->npage; i++)
180         {
181                 Assert(ptr - begin < record->xl_len);
182                 decoded->page[i].header = (gistxlogPage *) ptr;
183                 ptr += sizeof(gistxlogPage);
184
185                 decoded->page[i].itup = (IndexTuple *)
186                         palloc(sizeof(IndexTuple) * decoded->page[i].header->num);
187                 j = 0;
188                 while (j < decoded->page[i].header->num)
189                 {
190                         Assert(ptr - begin < record->xl_len);
191                         decoded->page[i].itup[j] = (IndexTuple) ptr;
192                         ptr += IndexTupleSize((IndexTuple) ptr);
193                         j++;
194                 }
195         }
196 }
197
198 static void
199 gistRedoPageSplitRecord(XLogRecPtr lsn, XLogRecord *record)
200 {
201         gistxlogPageSplit *xldata = (gistxlogPageSplit *) XLogRecGetData(record);
202         PageSplitRecord xlrec;
203         Buffer          firstbuffer = InvalidBuffer;
204         Buffer          buffer;
205         Page            page;
206         int                     i;
207         bool            isrootsplit = false;
208
209         decodePageSplitRecord(&xlrec, record);
210
211         /*
212          * We must hold lock on the first-listed page throughout the action,
213          * including while updating the left child page (if any).  We can unlock
214          * remaining pages in the list as soon as they've been written, because
215          * there is no path for concurrent queries to reach those pages without
216          * first visiting the first-listed page.
217          */
218
219         /* loop around all pages */
220         for (i = 0; i < xlrec.data->npage; i++)
221         {
222                 NewPage    *newpage = xlrec.page + i;
223                 int                     flags;
224
225                 if (newpage->header->blkno == GIST_ROOT_BLKNO)
226                 {
227                         Assert(i == 0);
228                         isrootsplit = true;
229                 }
230
231                 buffer = XLogReadBuffer(xlrec.data->node, newpage->header->blkno, true);
232                 Assert(BufferIsValid(buffer));
233                 page = (Page) BufferGetPage(buffer);
234
235                 /* ok, clear buffer */
236                 if (xlrec.data->origleaf && newpage->header->blkno != GIST_ROOT_BLKNO)
237                         flags = F_LEAF;
238                 else
239                         flags = 0;
240                 GISTInitBuffer(buffer, flags);
241
242                 /* and fill it */
243                 gistfillbuffer(page, newpage->itup, newpage->header->num, FirstOffsetNumber);
244
245                 if (newpage->header->blkno == GIST_ROOT_BLKNO)
246                 {
247                         GistPageGetOpaque(page)->rightlink = InvalidBlockNumber;
248                         GistPageSetNSN(page, xldata->orignsn);
249                         GistClearFollowRight(page);
250                 }
251                 else
252                 {
253                         if (i < xlrec.data->npage - 1)
254                                 GistPageGetOpaque(page)->rightlink = xlrec.page[i + 1].header->blkno;
255                         else
256                                 GistPageGetOpaque(page)->rightlink = xldata->origrlink;
257                         GistPageSetNSN(page, xldata->orignsn);
258                         if (i < xlrec.data->npage - 1 && !isrootsplit &&
259                                 xldata->markfollowright)
260                                 GistMarkFollowRight(page);
261                         else
262                                 GistClearFollowRight(page);
263                 }
264
265                 PageSetLSN(page, lsn);
266                 MarkBufferDirty(buffer);
267
268                 if (i == 0)
269                         firstbuffer = buffer;
270                 else
271                         UnlockReleaseBuffer(buffer);
272         }
273
274         /* Fix follow-right data on left child page, if any */
275         if (BlockNumberIsValid(xldata->leftchild))
276                 gistRedoClearFollowRight(lsn, record, 0,
277                                                                  xldata->node, xldata->leftchild);
278
279         /* Finally, release lock on the first page */
280         UnlockReleaseBuffer(firstbuffer);
281 }
282
283 static void
284 gistRedoCreateIndex(XLogRecPtr lsn, XLogRecord *record)
285 {
286         RelFileNode *node = (RelFileNode *) XLogRecGetData(record);
287         Buffer          buffer;
288         Page            page;
289
290         /* Backup blocks are not used in create_index records */
291         Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
292
293         buffer = XLogReadBuffer(*node, GIST_ROOT_BLKNO, true);
294         Assert(BufferIsValid(buffer));
295         page = (Page) BufferGetPage(buffer);
296
297         GISTInitBuffer(buffer, F_LEAF);
298
299         PageSetLSN(page, lsn);
300
301         MarkBufferDirty(buffer);
302         UnlockReleaseBuffer(buffer);
303 }
304
305 void
306 gist_redo(XLogRecPtr lsn, XLogRecord *record)
307 {
308         uint8           info = record->xl_info & ~XLR_INFO_MASK;
309         MemoryContext oldCxt;
310
311         /*
312          * GiST indexes do not require any conflict processing. NB: If we ever
313          * implement a similar optimization we have in b-tree, and remove killed
314          * tuples outside VACUUM, we'll need to handle that here.
315          */
316
317         oldCxt = MemoryContextSwitchTo(opCtx);
318         switch (info)
319         {
320                 case XLOG_GIST_PAGE_UPDATE:
321                         gistRedoPageUpdateRecord(lsn, record);
322                         break;
323                 case XLOG_GIST_PAGE_SPLIT:
324                         gistRedoPageSplitRecord(lsn, record);
325                         break;
326                 case XLOG_GIST_CREATE_INDEX:
327                         gistRedoCreateIndex(lsn, record);
328                         break;
329                 default:
330                         elog(PANIC, "gist_redo: unknown op code %u", info);
331         }
332
333         MemoryContextSwitchTo(oldCxt);
334         MemoryContextReset(opCtx);
335 }
336
337 void
338 gist_xlog_startup(void)
339 {
340         opCtx = createTempGistContext();
341 }
342
343 void
344 gist_xlog_cleanup(void)
345 {
346         MemoryContextDelete(opCtx);
347 }
348
349 /*
350  * Write WAL record of a page split.
351  */
352 XLogRecPtr
353 gistXLogSplit(RelFileNode node, BlockNumber blkno, bool page_is_leaf,
354                           SplitedPageLayout *dist,
355                           BlockNumber origrlink, GistNSN orignsn,
356                           Buffer leftchildbuf, bool markfollowright)
357 {
358         XLogRecData rdata[GIST_MAX_SPLIT_PAGES * 2 + 2];
359         gistxlogPageSplit xlrec;
360         SplitedPageLayout *ptr;
361         int                     npage = 0,
362                                 cur;
363         XLogRecPtr      recptr;
364
365         for (ptr = dist; ptr; ptr = ptr->next)
366                 npage++;
367
368         /*
369          * the caller should've checked this already, but doesn't hurt to check
370          * again.
371          */
372         if (npage > GIST_MAX_SPLIT_PAGES)
373                 elog(ERROR, "GiST page split into too many halves");
374
375         xlrec.node = node;
376         xlrec.origblkno = blkno;
377         xlrec.origrlink = origrlink;
378         xlrec.orignsn = orignsn;
379         xlrec.origleaf = page_is_leaf;
380         xlrec.npage = (uint16) npage;
381         xlrec.leftchild =
382                 BufferIsValid(leftchildbuf) ? BufferGetBlockNumber(leftchildbuf) : InvalidBlockNumber;
383         xlrec.markfollowright = markfollowright;
384
385         rdata[0].data = (char *) &xlrec;
386         rdata[0].len = sizeof(gistxlogPageSplit);
387         rdata[0].buffer = InvalidBuffer;
388
389         cur = 1;
390
391         /*
392          * Include a full page image of the child buf. (only necessary if a
393          * checkpoint happened since the child page was split)
394          */
395         if (BufferIsValid(leftchildbuf))
396         {
397                 rdata[cur - 1].next = &(rdata[cur]);
398                 rdata[cur].data = NULL;
399                 rdata[cur].len = 0;
400                 rdata[cur].buffer = leftchildbuf;
401                 rdata[cur].buffer_std = true;
402                 cur++;
403         }
404
405         for (ptr = dist; ptr; ptr = ptr->next)
406         {
407                 rdata[cur - 1].next = &(rdata[cur]);
408                 rdata[cur].buffer = InvalidBuffer;
409                 rdata[cur].data = (char *) &(ptr->block);
410                 rdata[cur].len = sizeof(gistxlogPage);
411                 cur++;
412
413                 rdata[cur - 1].next = &(rdata[cur]);
414                 rdata[cur].buffer = InvalidBuffer;
415                 rdata[cur].data = (char *) (ptr->list);
416                 rdata[cur].len = ptr->lenlist;
417                 cur++;
418         }
419         rdata[cur - 1].next = NULL;
420
421         recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_SPLIT, rdata);
422
423         return recptr;
424 }
425
426 /*
427  * Write XLOG record describing a page update. The update can include any
428  * number of deletions and/or insertions of tuples on a single index page.
429  *
430  * If this update inserts a downlink for a split page, also record that
431  * the F_FOLLOW_RIGHT flag on the child page is cleared and NSN set.
432  *
433  * Note that both the todelete array and the tuples are marked as belonging
434  * to the target buffer; they need not be stored in XLOG if XLogInsert decides
435  * to log the whole buffer contents instead.  Also, we take care that there's
436  * at least one rdata item referencing the buffer, even when ntodelete and
437  * ituplen are both zero; this ensures that XLogInsert knows about the buffer.
438  */
439 XLogRecPtr
440 gistXLogUpdate(RelFileNode node, Buffer buffer,
441                            OffsetNumber *todelete, int ntodelete,
442                            IndexTuple *itup, int ituplen,
443                            Buffer leftchildbuf)
444 {
445         XLogRecData rdata[MaxIndexTuplesPerPage + 3];
446         gistxlogPageUpdate xlrec;
447         int                     cur,
448                                 i;
449         XLogRecPtr      recptr;
450
451         xlrec.node = node;
452         xlrec.blkno = BufferGetBlockNumber(buffer);
453         xlrec.ntodelete = ntodelete;
454         xlrec.leftchild =
455                 BufferIsValid(leftchildbuf) ? BufferGetBlockNumber(leftchildbuf) : InvalidBlockNumber;
456
457         rdata[0].data = (char *) &xlrec;
458         rdata[0].len = sizeof(gistxlogPageUpdate);
459         rdata[0].buffer = InvalidBuffer;
460         rdata[0].next = &(rdata[1]);
461
462         rdata[1].data = (char *) todelete;
463         rdata[1].len = sizeof(OffsetNumber) * ntodelete;
464         rdata[1].buffer = buffer;
465         rdata[1].buffer_std = true;
466
467         cur = 2;
468
469         /* new tuples */
470         for (i = 0; i < ituplen; i++)
471         {
472                 rdata[cur - 1].next = &(rdata[cur]);
473                 rdata[cur].data = (char *) (itup[i]);
474                 rdata[cur].len = IndexTupleSize(itup[i]);
475                 rdata[cur].buffer = buffer;
476                 rdata[cur].buffer_std = true;
477                 cur++;
478         }
479
480         /*
481          * Include a full page image of the child buf. (only necessary if a
482          * checkpoint happened since the child page was split)
483          */
484         if (BufferIsValid(leftchildbuf))
485         {
486                 rdata[cur - 1].next = &(rdata[cur]);
487                 rdata[cur].data = NULL;
488                 rdata[cur].len = 0;
489                 rdata[cur].buffer = leftchildbuf;
490                 rdata[cur].buffer_std = true;
491                 cur++;
492         }
493         rdata[cur - 1].next = NULL;
494
495         recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_UPDATE, rdata);
496
497         return recptr;
498 }