]> granicus.if.org Git - postgresql/blob - src/backend/access/gist/gistxlog.c
b732f53267991159f9f19663c731d02af026d8ab
[postgresql] / src / backend / access / gist / gistxlog.c
1 /*-------------------------------------------------------------------------
2  *
3  * gistxlog.c
4  *        WAL replay logic for GiST.
5  *
6  *
7  * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
8  * Portions Copyright (c) 1994, Regents of the University of California
9  *
10  * IDENTIFICATION
11  *                       src/backend/access/gist/gistxlog.c
12  *-------------------------------------------------------------------------
13  */
14 #include "postgres.h"
15
16 #include "access/gist_private.h"
17 #include "access/xloginsert.h"
18 #include "access/xlogutils.h"
19 #include "utils/memutils.h"
20
21 typedef struct
22 {
23         gistxlogPage *header;
24         IndexTuple *itup;
25 } NewPage;
26
27 typedef struct
28 {
29         gistxlogPageSplit *data;
30         NewPage    *page;
31 } PageSplitRecord;
32
33 static MemoryContext opCtx;             /* working memory for operations */
34
35 /*
36  * Replay the clearing of F_FOLLOW_RIGHT flag on a child page.
37  *
38  * Even if the WAL record includes a full-page image, we have to update the
39  * follow-right flag, because that change is not included in the full-page
40  * image.  To be sure that the intermediate state with the wrong flag value is
41  * not visible to concurrent Hot Standby queries, this function handles
42  * restoring the full-page image as well as updating the flag.  (Note that
43  * we never need to do anything else to the child page in the current WAL
44  * action.)
45  */
46 static void
47 gistRedoClearFollowRight(XLogRecPtr lsn, XLogRecord *record, int block_index,
48                                                  RelFileNode node, BlockNumber childblkno)
49 {
50         Buffer          buffer;
51         Page            page;
52         XLogRedoAction action;
53
54         /*
55          * Note that we still update the page even if it was restored from a full
56          * page image, because the updated NSN is not included in the image.
57          */
58         action = XLogReadBufferForRedo(lsn, record, block_index, node, childblkno,
59                                                                    &buffer);
60         if (action == BLK_NEEDS_REDO || action == BLK_RESTORED)
61         {
62                 page = BufferGetPage(buffer);
63
64                 GistPageSetNSN(page, lsn);
65                 GistClearFollowRight(page);
66
67                 PageSetLSN(page, lsn);
68                 MarkBufferDirty(buffer);
69         }
70         if (BufferIsValid(buffer))
71                 UnlockReleaseBuffer(buffer);
72 }
73
74 /*
75  * redo any page update (except page split)
76  */
77 static void
78 gistRedoPageUpdateRecord(XLogRecPtr lsn, XLogRecord *record)
79 {
80         char       *begin = XLogRecGetData(record);
81         gistxlogPageUpdate *xldata = (gistxlogPageUpdate *) begin;
82         Buffer          buffer;
83         Page            page;
84         char       *data;
85
86         if (XLogReadBufferForRedo(lsn, record, 0, xldata->node, xldata->blkno,
87                                                           &buffer) == BLK_NEEDS_REDO)
88         {
89                 page = (Page) BufferGetPage(buffer);
90
91                 data = begin + sizeof(gistxlogPageUpdate);
92
93                 /* Delete old tuples */
94                 if (xldata->ntodelete > 0)
95                 {
96                         int                     i;
97                         OffsetNumber *todelete = (OffsetNumber *) data;
98
99                         data += sizeof(OffsetNumber) * xldata->ntodelete;
100
101                         for (i = 0; i < xldata->ntodelete; i++)
102                                 PageIndexTupleDelete(page, todelete[i]);
103                         if (GistPageIsLeaf(page))
104                                 GistMarkTuplesDeleted(page);
105                 }
106
107                 /* add tuples */
108                 if (data - begin < record->xl_len)
109                 {
110                         OffsetNumber off = (PageIsEmpty(page)) ? FirstOffsetNumber :
111                         OffsetNumberNext(PageGetMaxOffsetNumber(page));
112
113                         while (data - begin < record->xl_len)
114                         {
115                                 IndexTuple      itup = (IndexTuple) data;
116                                 Size            sz = IndexTupleSize(itup);
117                                 OffsetNumber l;
118
119                                 data += sz;
120
121                                 l = PageAddItem(page, (Item) itup, sz, off, false, false);
122                                 if (l == InvalidOffsetNumber)
123                                         elog(ERROR, "failed to add item to GiST index page, size %d bytes",
124                                                  (int) sz);
125                                 off++;
126                         }
127                 }
128                 else
129                 {
130                         /*
131                          * special case: leafpage, nothing to insert, nothing to delete,
132                          * then vacuum marks page
133                          */
134                         if (GistPageIsLeaf(page) && xldata->ntodelete == 0)
135                                 GistClearTuplesDeleted(page);
136                 }
137
138                 if (!GistPageIsLeaf(page) &&
139                         PageGetMaxOffsetNumber(page) == InvalidOffsetNumber &&
140                         xldata->blkno == GIST_ROOT_BLKNO)
141                 {
142                         /*
143                          * all links on non-leaf root page was deleted by vacuum full, so
144                          * root page becomes a leaf
145                          */
146                         GistPageSetLeaf(page);
147                 }
148
149                 PageSetLSN(page, lsn);
150                 MarkBufferDirty(buffer);
151         }
152
153         /*
154          * Fix follow-right data on left child page
155          *
156          * This must be done while still holding the lock on the target page. Note
157          * that even if the target page no longer exists, we still attempt to
158          * replay the change on the child page.
159          */
160         if (BlockNumberIsValid(xldata->leftchild))
161                 gistRedoClearFollowRight(lsn, record, 1,
162                                                                  xldata->node, xldata->leftchild);
163
164         if (BufferIsValid(buffer))
165                 UnlockReleaseBuffer(buffer);
166 }
167
168 static void
169 decodePageSplitRecord(PageSplitRecord *decoded, XLogRecord *record)
170 {
171         char       *begin = XLogRecGetData(record),
172                            *ptr;
173         int                     j,
174                                 i = 0;
175
176         decoded->data = (gistxlogPageSplit *) begin;
177         decoded->page = (NewPage *) palloc(sizeof(NewPage) * decoded->data->npage);
178
179         ptr = begin + sizeof(gistxlogPageSplit);
180         for (i = 0; i < decoded->data->npage; i++)
181         {
182                 Assert(ptr - begin < record->xl_len);
183                 decoded->page[i].header = (gistxlogPage *) ptr;
184                 ptr += sizeof(gistxlogPage);
185
186                 decoded->page[i].itup = (IndexTuple *)
187                         palloc(sizeof(IndexTuple) * decoded->page[i].header->num);
188                 j = 0;
189                 while (j < decoded->page[i].header->num)
190                 {
191                         Assert(ptr - begin < record->xl_len);
192                         decoded->page[i].itup[j] = (IndexTuple) ptr;
193                         ptr += IndexTupleSize((IndexTuple) ptr);
194                         j++;
195                 }
196         }
197 }
198
199 static void
200 gistRedoPageSplitRecord(XLogRecPtr lsn, XLogRecord *record)
201 {
202         gistxlogPageSplit *xldata = (gistxlogPageSplit *) XLogRecGetData(record);
203         PageSplitRecord xlrec;
204         Buffer          firstbuffer = InvalidBuffer;
205         Buffer          buffer;
206         Page            page;
207         int                     i;
208         bool            isrootsplit = false;
209
210         decodePageSplitRecord(&xlrec, record);
211
212         /*
213          * We must hold lock on the first-listed page throughout the action,
214          * including while updating the left child page (if any).  We can unlock
215          * remaining pages in the list as soon as they've been written, because
216          * there is no path for concurrent queries to reach those pages without
217          * first visiting the first-listed page.
218          */
219
220         /* loop around all pages */
221         for (i = 0; i < xlrec.data->npage; i++)
222         {
223                 NewPage    *newpage = xlrec.page + i;
224                 int                     flags;
225
226                 if (newpage->header->blkno == GIST_ROOT_BLKNO)
227                 {
228                         Assert(i == 0);
229                         isrootsplit = true;
230                 }
231
232                 buffer = XLogReadBuffer(xlrec.data->node, newpage->header->blkno, true);
233                 Assert(BufferIsValid(buffer));
234                 page = (Page) BufferGetPage(buffer);
235
236                 /* ok, clear buffer */
237                 if (xlrec.data->origleaf && newpage->header->blkno != GIST_ROOT_BLKNO)
238                         flags = F_LEAF;
239                 else
240                         flags = 0;
241                 GISTInitBuffer(buffer, flags);
242
243                 /* and fill it */
244                 gistfillbuffer(page, newpage->itup, newpage->header->num, FirstOffsetNumber);
245
246                 if (newpage->header->blkno == GIST_ROOT_BLKNO)
247                 {
248                         GistPageGetOpaque(page)->rightlink = InvalidBlockNumber;
249                         GistPageSetNSN(page, xldata->orignsn);
250                         GistClearFollowRight(page);
251                 }
252                 else
253                 {
254                         if (i < xlrec.data->npage - 1)
255                                 GistPageGetOpaque(page)->rightlink = xlrec.page[i + 1].header->blkno;
256                         else
257                                 GistPageGetOpaque(page)->rightlink = xldata->origrlink;
258                         GistPageSetNSN(page, xldata->orignsn);
259                         if (i < xlrec.data->npage - 1 && !isrootsplit &&
260                                 xldata->markfollowright)
261                                 GistMarkFollowRight(page);
262                         else
263                                 GistClearFollowRight(page);
264                 }
265
266                 PageSetLSN(page, lsn);
267                 MarkBufferDirty(buffer);
268
269                 if (i == 0)
270                         firstbuffer = buffer;
271                 else
272                         UnlockReleaseBuffer(buffer);
273         }
274
275         /* Fix follow-right data on left child page, if any */
276         if (BlockNumberIsValid(xldata->leftchild))
277                 gistRedoClearFollowRight(lsn, record, 0,
278                                                                  xldata->node, xldata->leftchild);
279
280         /* Finally, release lock on the first page */
281         UnlockReleaseBuffer(firstbuffer);
282 }
283
284 static void
285 gistRedoCreateIndex(XLogRecPtr lsn, XLogRecord *record)
286 {
287         RelFileNode *node = (RelFileNode *) XLogRecGetData(record);
288         Buffer          buffer;
289         Page            page;
290
291         /* Backup blocks are not used in create_index records */
292         Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
293
294         buffer = XLogReadBuffer(*node, GIST_ROOT_BLKNO, true);
295         Assert(BufferIsValid(buffer));
296         page = (Page) BufferGetPage(buffer);
297
298         GISTInitBuffer(buffer, F_LEAF);
299
300         PageSetLSN(page, lsn);
301
302         MarkBufferDirty(buffer);
303         UnlockReleaseBuffer(buffer);
304 }
305
306 void
307 gist_redo(XLogRecPtr lsn, XLogRecord *record)
308 {
309         uint8           info = record->xl_info & ~XLR_INFO_MASK;
310         MemoryContext oldCxt;
311
312         /*
313          * GiST indexes do not require any conflict processing. NB: If we ever
314          * implement a similar optimization we have in b-tree, and remove killed
315          * tuples outside VACUUM, we'll need to handle that here.
316          */
317
318         oldCxt = MemoryContextSwitchTo(opCtx);
319         switch (info)
320         {
321                 case XLOG_GIST_PAGE_UPDATE:
322                         gistRedoPageUpdateRecord(lsn, record);
323                         break;
324                 case XLOG_GIST_PAGE_SPLIT:
325                         gistRedoPageSplitRecord(lsn, record);
326                         break;
327                 case XLOG_GIST_CREATE_INDEX:
328                         gistRedoCreateIndex(lsn, record);
329                         break;
330                 default:
331                         elog(PANIC, "gist_redo: unknown op code %u", info);
332         }
333
334         MemoryContextSwitchTo(oldCxt);
335         MemoryContextReset(opCtx);
336 }
337
338 void
339 gist_xlog_startup(void)
340 {
341         opCtx = createTempGistContext();
342 }
343
344 void
345 gist_xlog_cleanup(void)
346 {
347         MemoryContextDelete(opCtx);
348 }
349
350 /*
351  * Write WAL record of a page split.
352  */
353 XLogRecPtr
354 gistXLogSplit(RelFileNode node, BlockNumber blkno, bool page_is_leaf,
355                           SplitedPageLayout *dist,
356                           BlockNumber origrlink, GistNSN orignsn,
357                           Buffer leftchildbuf, bool markfollowright)
358 {
359         XLogRecData rdata[GIST_MAX_SPLIT_PAGES * 2 + 2];
360         gistxlogPageSplit xlrec;
361         SplitedPageLayout *ptr;
362         int                     npage = 0,
363                                 cur;
364         XLogRecPtr      recptr;
365
366         for (ptr = dist; ptr; ptr = ptr->next)
367                 npage++;
368
369         /*
370          * the caller should've checked this already, but doesn't hurt to check
371          * again.
372          */
373         if (npage > GIST_MAX_SPLIT_PAGES)
374                 elog(ERROR, "GiST page split into too many halves");
375
376         xlrec.node = node;
377         xlrec.origblkno = blkno;
378         xlrec.origrlink = origrlink;
379         xlrec.orignsn = orignsn;
380         xlrec.origleaf = page_is_leaf;
381         xlrec.npage = (uint16) npage;
382         xlrec.leftchild =
383                 BufferIsValid(leftchildbuf) ? BufferGetBlockNumber(leftchildbuf) : InvalidBlockNumber;
384         xlrec.markfollowright = markfollowright;
385
386         rdata[0].data = (char *) &xlrec;
387         rdata[0].len = sizeof(gistxlogPageSplit);
388         rdata[0].buffer = InvalidBuffer;
389
390         cur = 1;
391
392         /*
393          * Include a full page image of the child buf. (only necessary if a
394          * checkpoint happened since the child page was split)
395          */
396         if (BufferIsValid(leftchildbuf))
397         {
398                 rdata[cur - 1].next = &(rdata[cur]);
399                 rdata[cur].data = NULL;
400                 rdata[cur].len = 0;
401                 rdata[cur].buffer = leftchildbuf;
402                 rdata[cur].buffer_std = true;
403                 cur++;
404         }
405
406         for (ptr = dist; ptr; ptr = ptr->next)
407         {
408                 rdata[cur - 1].next = &(rdata[cur]);
409                 rdata[cur].buffer = InvalidBuffer;
410                 rdata[cur].data = (char *) &(ptr->block);
411                 rdata[cur].len = sizeof(gistxlogPage);
412                 cur++;
413
414                 rdata[cur - 1].next = &(rdata[cur]);
415                 rdata[cur].buffer = InvalidBuffer;
416                 rdata[cur].data = (char *) (ptr->list);
417                 rdata[cur].len = ptr->lenlist;
418                 cur++;
419         }
420         rdata[cur - 1].next = NULL;
421
422         recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_SPLIT, rdata);
423
424         return recptr;
425 }
426
427 /*
428  * Write XLOG record describing a page update. The update can include any
429  * number of deletions and/or insertions of tuples on a single index page.
430  *
431  * If this update inserts a downlink for a split page, also record that
432  * the F_FOLLOW_RIGHT flag on the child page is cleared and NSN set.
433  *
434  * Note that both the todelete array and the tuples are marked as belonging
435  * to the target buffer; they need not be stored in XLOG if XLogInsert decides
436  * to log the whole buffer contents instead.  Also, we take care that there's
437  * at least one rdata item referencing the buffer, even when ntodelete and
438  * ituplen are both zero; this ensures that XLogInsert knows about the buffer.
439  */
440 XLogRecPtr
441 gistXLogUpdate(RelFileNode node, Buffer buffer,
442                            OffsetNumber *todelete, int ntodelete,
443                            IndexTuple *itup, int ituplen,
444                            Buffer leftchildbuf)
445 {
446         XLogRecData rdata[MaxIndexTuplesPerPage + 3];
447         gistxlogPageUpdate xlrec;
448         int                     cur,
449                                 i;
450         XLogRecPtr      recptr;
451
452         xlrec.node = node;
453         xlrec.blkno = BufferGetBlockNumber(buffer);
454         xlrec.ntodelete = ntodelete;
455         xlrec.leftchild =
456                 BufferIsValid(leftchildbuf) ? BufferGetBlockNumber(leftchildbuf) : InvalidBlockNumber;
457
458         rdata[0].data = (char *) &xlrec;
459         rdata[0].len = sizeof(gistxlogPageUpdate);
460         rdata[0].buffer = InvalidBuffer;
461         rdata[0].next = &(rdata[1]);
462
463         rdata[1].data = (char *) todelete;
464         rdata[1].len = sizeof(OffsetNumber) * ntodelete;
465         rdata[1].buffer = buffer;
466         rdata[1].buffer_std = true;
467
468         cur = 2;
469
470         /* new tuples */
471         for (i = 0; i < ituplen; i++)
472         {
473                 rdata[cur - 1].next = &(rdata[cur]);
474                 rdata[cur].data = (char *) (itup[i]);
475                 rdata[cur].len = IndexTupleSize(itup[i]);
476                 rdata[cur].buffer = buffer;
477                 rdata[cur].buffer_std = true;
478                 cur++;
479         }
480
481         /*
482          * Include a full page image of the child buf. (only necessary if a
483          * checkpoint happened since the child page was split)
484          */
485         if (BufferIsValid(leftchildbuf))
486         {
487                 rdata[cur - 1].next = &(rdata[cur]);
488                 rdata[cur].data = NULL;
489                 rdata[cur].len = 0;
490                 rdata[cur].buffer = leftchildbuf;
491                 rdata[cur].buffer_std = true;
492                 cur++;
493         }
494         rdata[cur - 1].next = NULL;
495
496         recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_UPDATE, rdata);
497
498         return recptr;
499 }