]> granicus.if.org Git - postgresql/blob - src/backend/access/gist/gistxlog.c
7d36b2ab6a301aad34d3fec4417b7c8edd80b24b
[postgresql] / src / backend / access / gist / gistxlog.c
1 /*-------------------------------------------------------------------------
2  *
3  * gistxlog.c
4  *        WAL replay logic for GiST.
5  *
6  *
7  * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
8  * Portions Copyright (c) 1994, Regents of the University of California
9  *
10  * IDENTIFICATION
11  *                       src/backend/access/gist/gistxlog.c
12  *-------------------------------------------------------------------------
13  */
14 #include "postgres.h"
15
16 #include "access/gist_private.h"
17 #include "access/xlogutils.h"
18 #include "utils/memutils.h"
19
20 typedef struct
21 {
22         gistxlogPage *header;
23         IndexTuple *itup;
24 } NewPage;
25
26 typedef struct
27 {
28         gistxlogPageSplit *data;
29         NewPage    *page;
30 } PageSplitRecord;
31
32 static MemoryContext opCtx;             /* working memory for operations */
33
34 /*
35  * Replay the clearing of F_FOLLOW_RIGHT flag on a child page.
36  *
37  * Even if the WAL record includes a full-page image, we have to update the
38  * follow-right flag, because that change is not included in the full-page
39  * image.  To be sure that the intermediate state with the wrong flag value is
40  * not visible to concurrent Hot Standby queries, this function handles
41  * restoring the full-page image as well as updating the flag.  (Note that
42  * we never need to do anything else to the child page in the current WAL
43  * action.)
44  */
45 static void
46 gistRedoClearFollowRight(XLogRecPtr lsn, XLogRecord *record, int block_index,
47                                                  RelFileNode node, BlockNumber childblkno)
48 {
49         Buffer          buffer;
50         Page            page;
51
52         if (record->xl_info & XLR_BKP_BLOCK(block_index))
53                 buffer = RestoreBackupBlock(lsn, record, block_index, false, true);
54         else
55         {
56                 buffer = XLogReadBuffer(node, childblkno, false);
57                 if (!BufferIsValid(buffer))
58                         return;                         /* page was deleted, nothing to do */
59         }
60         page = (Page) BufferGetPage(buffer);
61
62         /*
63          * Note that we still update the page even if page LSN is equal to the LSN
64          * of this record, because the updated NSN is not included in the full
65          * page image.
66          */
67         if (lsn >= PageGetLSN(page))
68         {
69                 GistPageSetNSN(page, lsn);
70                 GistClearFollowRight(page);
71
72                 PageSetLSN(page, lsn);
73                 MarkBufferDirty(buffer);
74         }
75         UnlockReleaseBuffer(buffer);
76 }
77
78 /*
79  * redo any page update (except page split)
80  */
81 static void
82 gistRedoPageUpdateRecord(XLogRecPtr lsn, XLogRecord *record)
83 {
84         char       *begin = XLogRecGetData(record);
85         gistxlogPageUpdate *xldata = (gistxlogPageUpdate *) begin;
86         Buffer          buffer;
87         Page            page;
88         char       *data;
89
90         /*
91          * We need to acquire and hold lock on target page while updating the left
92          * child page.  If we have a full-page image of target page, getting the
93          * lock is a side-effect of restoring that image.  Note that even if the
94          * target page no longer exists, we'll still attempt to replay the change
95          * on the child page.
96          */
97         if (record->xl_info & XLR_BKP_BLOCK(0))
98                 buffer = RestoreBackupBlock(lsn, record, 0, false, true);
99         else
100                 buffer = XLogReadBuffer(xldata->node, xldata->blkno, false);
101
102         /* Fix follow-right data on left child page */
103         if (BlockNumberIsValid(xldata->leftchild))
104                 gistRedoClearFollowRight(lsn, record, 1,
105                                                                  xldata->node, xldata->leftchild);
106
107         /* Done if target page no longer exists */
108         if (!BufferIsValid(buffer))
109                 return;
110
111         /* nothing more to do if page was backed up (and no info to do it with) */
112         if (record->xl_info & XLR_BKP_BLOCK(0))
113         {
114                 UnlockReleaseBuffer(buffer);
115                 return;
116         }
117
118         page = (Page) BufferGetPage(buffer);
119
120         /* nothing more to do if change already applied */
121         if (lsn <= PageGetLSN(page))
122         {
123                 UnlockReleaseBuffer(buffer);
124                 return;
125         }
126
127         data = begin + sizeof(gistxlogPageUpdate);
128
129         /* Delete old tuples */
130         if (xldata->ntodelete > 0)
131         {
132                 int                     i;
133                 OffsetNumber *todelete = (OffsetNumber *) data;
134
135                 data += sizeof(OffsetNumber) * xldata->ntodelete;
136
137                 for (i = 0; i < xldata->ntodelete; i++)
138                         PageIndexTupleDelete(page, todelete[i]);
139                 if (GistPageIsLeaf(page))
140                         GistMarkTuplesDeleted(page);
141         }
142
143         /* add tuples */
144         if (data - begin < record->xl_len)
145         {
146                 OffsetNumber off = (PageIsEmpty(page)) ? FirstOffsetNumber :
147                 OffsetNumberNext(PageGetMaxOffsetNumber(page));
148
149                 while (data - begin < record->xl_len)
150                 {
151                         IndexTuple      itup = (IndexTuple) data;
152                         Size            sz = IndexTupleSize(itup);
153                         OffsetNumber l;
154
155                         data += sz;
156
157                         l = PageAddItem(page, (Item) itup, sz, off, false, false);
158                         if (l == InvalidOffsetNumber)
159                                 elog(ERROR, "failed to add item to GiST index page, size %d bytes",
160                                          (int) sz);
161                         off++;
162                 }
163         }
164         else
165         {
166                 /*
167                  * special case: leafpage, nothing to insert, nothing to delete, then
168                  * vacuum marks page
169                  */
170                 if (GistPageIsLeaf(page) && xldata->ntodelete == 0)
171                         GistClearTuplesDeleted(page);
172         }
173
174         if (!GistPageIsLeaf(page) &&
175                 PageGetMaxOffsetNumber(page) == InvalidOffsetNumber &&
176                 xldata->blkno == GIST_ROOT_BLKNO)
177         {
178                 /*
179                  * all links on non-leaf root page was deleted by vacuum full, so root
180                  * page becomes a leaf
181                  */
182                 GistPageSetLeaf(page);
183         }
184
185         PageSetLSN(page, lsn);
186         MarkBufferDirty(buffer);
187         UnlockReleaseBuffer(buffer);
188 }
189
190 static void
191 decodePageSplitRecord(PageSplitRecord *decoded, XLogRecord *record)
192 {
193         char       *begin = XLogRecGetData(record),
194                            *ptr;
195         int                     j,
196                                 i = 0;
197
198         decoded->data = (gistxlogPageSplit *) begin;
199         decoded->page = (NewPage *) palloc(sizeof(NewPage) * decoded->data->npage);
200
201         ptr = begin + sizeof(gistxlogPageSplit);
202         for (i = 0; i < decoded->data->npage; i++)
203         {
204                 Assert(ptr - begin < record->xl_len);
205                 decoded->page[i].header = (gistxlogPage *) ptr;
206                 ptr += sizeof(gistxlogPage);
207
208                 decoded->page[i].itup = (IndexTuple *)
209                         palloc(sizeof(IndexTuple) * decoded->page[i].header->num);
210                 j = 0;
211                 while (j < decoded->page[i].header->num)
212                 {
213                         Assert(ptr - begin < record->xl_len);
214                         decoded->page[i].itup[j] = (IndexTuple) ptr;
215                         ptr += IndexTupleSize((IndexTuple) ptr);
216                         j++;
217                 }
218         }
219 }
220
221 static void
222 gistRedoPageSplitRecord(XLogRecPtr lsn, XLogRecord *record)
223 {
224         gistxlogPageSplit *xldata = (gistxlogPageSplit *) XLogRecGetData(record);
225         PageSplitRecord xlrec;
226         Buffer          firstbuffer = InvalidBuffer;
227         Buffer          buffer;
228         Page            page;
229         int                     i;
230         bool            isrootsplit = false;
231
232         decodePageSplitRecord(&xlrec, record);
233
234         /*
235          * We must hold lock on the first-listed page throughout the action,
236          * including while updating the left child page (if any).  We can unlock
237          * remaining pages in the list as soon as they've been written, because
238          * there is no path for concurrent queries to reach those pages without
239          * first visiting the first-listed page.
240          */
241
242         /* loop around all pages */
243         for (i = 0; i < xlrec.data->npage; i++)
244         {
245                 NewPage    *newpage = xlrec.page + i;
246                 int                     flags;
247
248                 if (newpage->header->blkno == GIST_ROOT_BLKNO)
249                 {
250                         Assert(i == 0);
251                         isrootsplit = true;
252                 }
253
254                 buffer = XLogReadBuffer(xlrec.data->node, newpage->header->blkno, true);
255                 Assert(BufferIsValid(buffer));
256                 page = (Page) BufferGetPage(buffer);
257
258                 /* ok, clear buffer */
259                 if (xlrec.data->origleaf && newpage->header->blkno != GIST_ROOT_BLKNO)
260                         flags = F_LEAF;
261                 else
262                         flags = 0;
263                 GISTInitBuffer(buffer, flags);
264
265                 /* and fill it */
266                 gistfillbuffer(page, newpage->itup, newpage->header->num, FirstOffsetNumber);
267
268                 if (newpage->header->blkno == GIST_ROOT_BLKNO)
269                 {
270                         GistPageGetOpaque(page)->rightlink = InvalidBlockNumber;
271                         GistPageSetNSN(page, xldata->orignsn);
272                         GistClearFollowRight(page);
273                 }
274                 else
275                 {
276                         if (i < xlrec.data->npage - 1)
277                                 GistPageGetOpaque(page)->rightlink = xlrec.page[i + 1].header->blkno;
278                         else
279                                 GistPageGetOpaque(page)->rightlink = xldata->origrlink;
280                         GistPageSetNSN(page, xldata->orignsn);
281                         if (i < xlrec.data->npage - 1 && !isrootsplit &&
282                                 xldata->markfollowright)
283                                 GistMarkFollowRight(page);
284                         else
285                                 GistClearFollowRight(page);
286                 }
287
288                 PageSetLSN(page, lsn);
289                 MarkBufferDirty(buffer);
290
291                 if (i == 0)
292                         firstbuffer = buffer;
293                 else
294                         UnlockReleaseBuffer(buffer);
295         }
296
297         /* Fix follow-right data on left child page, if any */
298         if (BlockNumberIsValid(xldata->leftchild))
299                 gistRedoClearFollowRight(lsn, record, 0,
300                                                                  xldata->node, xldata->leftchild);
301
302         /* Finally, release lock on the first page */
303         UnlockReleaseBuffer(firstbuffer);
304 }
305
306 static void
307 gistRedoCreateIndex(XLogRecPtr lsn, XLogRecord *record)
308 {
309         RelFileNode *node = (RelFileNode *) XLogRecGetData(record);
310         Buffer          buffer;
311         Page            page;
312
313         /* Backup blocks are not used in create_index records */
314         Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
315
316         buffer = XLogReadBuffer(*node, GIST_ROOT_BLKNO, true);
317         Assert(BufferIsValid(buffer));
318         page = (Page) BufferGetPage(buffer);
319
320         GISTInitBuffer(buffer, F_LEAF);
321
322         PageSetLSN(page, lsn);
323
324         MarkBufferDirty(buffer);
325         UnlockReleaseBuffer(buffer);
326 }
327
328 void
329 gist_redo(XLogRecPtr lsn, XLogRecord *record)
330 {
331         uint8           info = record->xl_info & ~XLR_INFO_MASK;
332         MemoryContext oldCxt;
333
334         /*
335          * GiST indexes do not require any conflict processing. NB: If we ever
336          * implement a similar optimization we have in b-tree, and remove killed
337          * tuples outside VACUUM, we'll need to handle that here.
338          */
339
340         oldCxt = MemoryContextSwitchTo(opCtx);
341         switch (info)
342         {
343                 case XLOG_GIST_PAGE_UPDATE:
344                         gistRedoPageUpdateRecord(lsn, record);
345                         break;
346                 case XLOG_GIST_PAGE_SPLIT:
347                         gistRedoPageSplitRecord(lsn, record);
348                         break;
349                 case XLOG_GIST_CREATE_INDEX:
350                         gistRedoCreateIndex(lsn, record);
351                         break;
352                 default:
353                         elog(PANIC, "gist_redo: unknown op code %u", info);
354         }
355
356         MemoryContextSwitchTo(oldCxt);
357         MemoryContextReset(opCtx);
358 }
359
360 void
361 gist_xlog_startup(void)
362 {
363         opCtx = createTempGistContext();
364 }
365
366 void
367 gist_xlog_cleanup(void)
368 {
369         MemoryContextDelete(opCtx);
370 }
371
372 /*
373  * Write WAL record of a page split.
374  */
375 XLogRecPtr
376 gistXLogSplit(RelFileNode node, BlockNumber blkno, bool page_is_leaf,
377                           SplitedPageLayout *dist,
378                           BlockNumber origrlink, GistNSN orignsn,
379                           Buffer leftchildbuf, bool markfollowright)
380 {
381         XLogRecData rdata[GIST_MAX_SPLIT_PAGES * 2 + 2];
382         gistxlogPageSplit xlrec;
383         SplitedPageLayout *ptr;
384         int                     npage = 0,
385                                 cur;
386         XLogRecPtr      recptr;
387
388         for (ptr = dist; ptr; ptr = ptr->next)
389                 npage++;
390
391         /*
392          * the caller should've checked this already, but doesn't hurt to check
393          * again.
394          */
395         if (npage > GIST_MAX_SPLIT_PAGES)
396                 elog(ERROR, "GiST page split into too many halves");
397
398         xlrec.node = node;
399         xlrec.origblkno = blkno;
400         xlrec.origrlink = origrlink;
401         xlrec.orignsn = orignsn;
402         xlrec.origleaf = page_is_leaf;
403         xlrec.npage = (uint16) npage;
404         xlrec.leftchild =
405                 BufferIsValid(leftchildbuf) ? BufferGetBlockNumber(leftchildbuf) : InvalidBlockNumber;
406         xlrec.markfollowright = markfollowright;
407
408         rdata[0].data = (char *) &xlrec;
409         rdata[0].len = sizeof(gistxlogPageSplit);
410         rdata[0].buffer = InvalidBuffer;
411
412         cur = 1;
413
414         /*
415          * Include a full page image of the child buf. (only necessary if a
416          * checkpoint happened since the child page was split)
417          */
418         if (BufferIsValid(leftchildbuf))
419         {
420                 rdata[cur - 1].next = &(rdata[cur]);
421                 rdata[cur].data = NULL;
422                 rdata[cur].len = 0;
423                 rdata[cur].buffer = leftchildbuf;
424                 rdata[cur].buffer_std = true;
425                 cur++;
426         }
427
428         for (ptr = dist; ptr; ptr = ptr->next)
429         {
430                 rdata[cur - 1].next = &(rdata[cur]);
431                 rdata[cur].buffer = InvalidBuffer;
432                 rdata[cur].data = (char *) &(ptr->block);
433                 rdata[cur].len = sizeof(gistxlogPage);
434                 cur++;
435
436                 rdata[cur - 1].next = &(rdata[cur]);
437                 rdata[cur].buffer = InvalidBuffer;
438                 rdata[cur].data = (char *) (ptr->list);
439                 rdata[cur].len = ptr->lenlist;
440                 cur++;
441         }
442         rdata[cur - 1].next = NULL;
443
444         recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_SPLIT, rdata);
445
446         return recptr;
447 }
448
449 /*
450  * Write XLOG record describing a page update. The update can include any
451  * number of deletions and/or insertions of tuples on a single index page.
452  *
453  * If this update inserts a downlink for a split page, also record that
454  * the F_FOLLOW_RIGHT flag on the child page is cleared and NSN set.
455  *
456  * Note that both the todelete array and the tuples are marked as belonging
457  * to the target buffer; they need not be stored in XLOG if XLogInsert decides
458  * to log the whole buffer contents instead.  Also, we take care that there's
459  * at least one rdata item referencing the buffer, even when ntodelete and
460  * ituplen are both zero; this ensures that XLogInsert knows about the buffer.
461  */
462 XLogRecPtr
463 gistXLogUpdate(RelFileNode node, Buffer buffer,
464                            OffsetNumber *todelete, int ntodelete,
465                            IndexTuple *itup, int ituplen,
466                            Buffer leftchildbuf)
467 {
468         XLogRecData rdata[MaxIndexTuplesPerPage + 3];
469         gistxlogPageUpdate xlrec;
470         int                     cur,
471                                 i;
472         XLogRecPtr      recptr;
473
474         xlrec.node = node;
475         xlrec.blkno = BufferGetBlockNumber(buffer);
476         xlrec.ntodelete = ntodelete;
477         xlrec.leftchild =
478                 BufferIsValid(leftchildbuf) ? BufferGetBlockNumber(leftchildbuf) : InvalidBlockNumber;
479
480         rdata[0].data = (char *) &xlrec;
481         rdata[0].len = sizeof(gistxlogPageUpdate);
482         rdata[0].buffer = InvalidBuffer;
483         rdata[0].next = &(rdata[1]);
484
485         rdata[1].data = (char *) todelete;
486         rdata[1].len = sizeof(OffsetNumber) * ntodelete;
487         rdata[1].buffer = buffer;
488         rdata[1].buffer_std = true;
489
490         cur = 2;
491
492         /* new tuples */
493         for (i = 0; i < ituplen; i++)
494         {
495                 rdata[cur - 1].next = &(rdata[cur]);
496                 rdata[cur].data = (char *) (itup[i]);
497                 rdata[cur].len = IndexTupleSize(itup[i]);
498                 rdata[cur].buffer = buffer;
499                 rdata[cur].buffer_std = true;
500                 cur++;
501         }
502
503         /*
504          * Include a full page image of the child buf. (only necessary if a
505          * checkpoint happened since the child page was split)
506          */
507         if (BufferIsValid(leftchildbuf))
508         {
509                 rdata[cur - 1].next = &(rdata[cur]);
510                 rdata[cur].data = NULL;
511                 rdata[cur].len = 0;
512                 rdata[cur].buffer = leftchildbuf;
513                 rdata[cur].buffer_std = true;
514                 cur++;
515         }
516         rdata[cur - 1].next = NULL;
517
518         recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_UPDATE, rdata);
519
520         return recptr;
521 }