]> granicus.if.org Git - postgresql/blob - src/backend/access/gist/gistxlog.c
Update copyright for 2014
[postgresql] / src / backend / access / gist / gistxlog.c
1 /*-------------------------------------------------------------------------
2  *
3  * gistxlog.c
4  *        WAL replay logic for GiST.
5  *
6  *
7  * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
8  * Portions Copyright (c) 1994, Regents of the University of California
9  *
10  * IDENTIFICATION
11  *                       src/backend/access/gist/gistxlog.c
12  *-------------------------------------------------------------------------
13  */
14 #include "postgres.h"
15
16 #include "access/gist_private.h"
17 #include "access/xlogutils.h"
18 #include "utils/memutils.h"
19
20 typedef struct
21 {
22         gistxlogPage *header;
23         IndexTuple *itup;
24 } NewPage;
25
26 typedef struct
27 {
28         gistxlogPageSplit *data;
29         NewPage    *page;
30 } PageSplitRecord;
31
32 static MemoryContext opCtx;             /* working memory for operations */
33
34 /*
35  * Replay the clearing of F_FOLLOW_RIGHT flag on a child page.
36  *
37  * Even if the WAL record includes a full-page image, we have to update the
38  * follow-right flag, because that change is not included in the full-page
39  * image.  To be sure that the intermediate state with the wrong flag value is
40  * not visible to concurrent Hot Standby queries, this function handles
41  * restoring the full-page image as well as updating the flag.  (Note that
42  * we never need to do anything else to the child page in the current WAL
43  * action.)
44  */
45 static void
46 gistRedoClearFollowRight(XLogRecPtr lsn, XLogRecord *record, int block_index,
47                                                  RelFileNode node, BlockNumber childblkno)
48 {
49         Buffer          buffer;
50         Page            page;
51
52         if (record->xl_info & XLR_BKP_BLOCK(block_index))
53                 buffer = RestoreBackupBlock(lsn, record, block_index, false, true);
54         else
55         {
56                 buffer = XLogReadBuffer(node, childblkno, false);
57                 if (!BufferIsValid(buffer))
58                         return;                         /* page was deleted, nothing to do */
59         }
60         page = (Page) BufferGetPage(buffer);
61
62         /*
63          * Note that we still update the page even if page LSN is equal to the LSN
64          * of this record, because the updated NSN is not included in the full
65          * page image.
66          */
67         if (lsn >= PageGetLSN(page))
68         {
69                 GistPageSetNSN(page, lsn);
70                 GistClearFollowRight(page);
71
72                 PageSetLSN(page, lsn);
73                 MarkBufferDirty(buffer);
74         }
75         UnlockReleaseBuffer(buffer);
76 }
77
78 /*
79  * redo any page update (except page split)
80  */
81 static void
82 gistRedoPageUpdateRecord(XLogRecPtr lsn, XLogRecord *record)
83 {
84         char       *begin = XLogRecGetData(record);
85         gistxlogPageUpdate *xldata = (gistxlogPageUpdate *) begin;
86         Buffer          buffer;
87         Page            page;
88         char       *data;
89
90         /*
91          * We need to acquire and hold lock on target page while updating the left
92          * child page.  If we have a full-page image of target page, getting the
93          * lock is a side-effect of restoring that image.  Note that even if the
94          * target page no longer exists, we'll still attempt to replay the change
95          * on the child page.
96          */
97         if (record->xl_info & XLR_BKP_BLOCK(0))
98                 buffer = RestoreBackupBlock(lsn, record, 0, false, true);
99         else
100                 buffer = XLogReadBuffer(xldata->node, xldata->blkno, false);
101
102         /* Fix follow-right data on left child page */
103         if (BlockNumberIsValid(xldata->leftchild))
104                 gistRedoClearFollowRight(lsn, record, 1,
105                                                                  xldata->node, xldata->leftchild);
106
107         /* Done if target page no longer exists */
108         if (!BufferIsValid(buffer))
109                 return;
110
111         /* nothing more to do if page was backed up (and no info to do it with) */
112         if (record->xl_info & XLR_BKP_BLOCK(0))
113         {
114                 UnlockReleaseBuffer(buffer);
115                 return;
116         }
117
118         page = (Page) BufferGetPage(buffer);
119
120         /* nothing more to do if change already applied */
121         if (lsn <= PageGetLSN(page))
122         {
123                 UnlockReleaseBuffer(buffer);
124                 return;
125         }
126
127         data = begin + sizeof(gistxlogPageUpdate);
128
129         /* Delete old tuples */
130         if (xldata->ntodelete > 0)
131         {
132                 int                     i;
133                 OffsetNumber *todelete = (OffsetNumber *) data;
134
135                 data += sizeof(OffsetNumber) * xldata->ntodelete;
136
137                 for (i = 0; i < xldata->ntodelete; i++)
138                         PageIndexTupleDelete(page, todelete[i]);
139                 if (GistPageIsLeaf(page))
140                         GistMarkTuplesDeleted(page);
141         }
142
143         /* add tuples */
144         if (data - begin < record->xl_len)
145         {
146                 OffsetNumber off = (PageIsEmpty(page)) ? FirstOffsetNumber :
147                 OffsetNumberNext(PageGetMaxOffsetNumber(page));
148
149                 while (data - begin < record->xl_len)
150                 {
151                         IndexTuple      itup = (IndexTuple) data;
152                         Size            sz = IndexTupleSize(itup);
153                         OffsetNumber l;
154
155                         data += sz;
156
157                         l = PageAddItem(page, (Item) itup, sz, off, false, false);
158                         if (l == InvalidOffsetNumber)
159                                 elog(ERROR, "failed to add item to GiST index page, size %d bytes",
160                                          (int) sz);
161                         off++;
162                 }
163         }
164         else
165         {
166                 /*
167                  * special case: leafpage, nothing to insert, nothing to delete, then
168                  * vacuum marks page
169                  */
170                 if (GistPageIsLeaf(page) && xldata->ntodelete == 0)
171                         GistClearTuplesDeleted(page);
172         }
173
174         if (!GistPageIsLeaf(page) &&
175                 PageGetMaxOffsetNumber(page) == InvalidOffsetNumber &&
176                 xldata->blkno == GIST_ROOT_BLKNO)
177         {
178                 /*
179                  * all links on non-leaf root page was deleted by vacuum full, so root
180                  * page becomes a leaf
181                  */
182                 GistPageSetLeaf(page);
183         }
184
185         GistPageGetOpaque(page)->rightlink = InvalidBlockNumber;
186         PageSetLSN(page, lsn);
187         MarkBufferDirty(buffer);
188         UnlockReleaseBuffer(buffer);
189 }
190
191 static void
192 decodePageSplitRecord(PageSplitRecord *decoded, XLogRecord *record)
193 {
194         char       *begin = XLogRecGetData(record),
195                            *ptr;
196         int                     j,
197                                 i = 0;
198
199         decoded->data = (gistxlogPageSplit *) begin;
200         decoded->page = (NewPage *) palloc(sizeof(NewPage) * decoded->data->npage);
201
202         ptr = begin + sizeof(gistxlogPageSplit);
203         for (i = 0; i < decoded->data->npage; i++)
204         {
205                 Assert(ptr - begin < record->xl_len);
206                 decoded->page[i].header = (gistxlogPage *) ptr;
207                 ptr += sizeof(gistxlogPage);
208
209                 decoded->page[i].itup = (IndexTuple *)
210                         palloc(sizeof(IndexTuple) * decoded->page[i].header->num);
211                 j = 0;
212                 while (j < decoded->page[i].header->num)
213                 {
214                         Assert(ptr - begin < record->xl_len);
215                         decoded->page[i].itup[j] = (IndexTuple) ptr;
216                         ptr += IndexTupleSize((IndexTuple) ptr);
217                         j++;
218                 }
219         }
220 }
221
222 static void
223 gistRedoPageSplitRecord(XLogRecPtr lsn, XLogRecord *record)
224 {
225         gistxlogPageSplit *xldata = (gistxlogPageSplit *) XLogRecGetData(record);
226         PageSplitRecord xlrec;
227         Buffer          firstbuffer = InvalidBuffer;
228         Buffer          buffer;
229         Page            page;
230         int                     i;
231         bool            isrootsplit = false;
232
233         decodePageSplitRecord(&xlrec, record);
234
235         /*
236          * We must hold lock on the first-listed page throughout the action,
237          * including while updating the left child page (if any).  We can unlock
238          * remaining pages in the list as soon as they've been written, because
239          * there is no path for concurrent queries to reach those pages without
240          * first visiting the first-listed page.
241          */
242
243         /* loop around all pages */
244         for (i = 0; i < xlrec.data->npage; i++)
245         {
246                 NewPage    *newpage = xlrec.page + i;
247                 int                     flags;
248
249                 if (newpage->header->blkno == GIST_ROOT_BLKNO)
250                 {
251                         Assert(i == 0);
252                         isrootsplit = true;
253                 }
254
255                 buffer = XLogReadBuffer(xlrec.data->node, newpage->header->blkno, true);
256                 Assert(BufferIsValid(buffer));
257                 page = (Page) BufferGetPage(buffer);
258
259                 /* ok, clear buffer */
260                 if (xlrec.data->origleaf && newpage->header->blkno != GIST_ROOT_BLKNO)
261                         flags = F_LEAF;
262                 else
263                         flags = 0;
264                 GISTInitBuffer(buffer, flags);
265
266                 /* and fill it */
267                 gistfillbuffer(page, newpage->itup, newpage->header->num, FirstOffsetNumber);
268
269                 if (newpage->header->blkno == GIST_ROOT_BLKNO)
270                 {
271                         GistPageGetOpaque(page)->rightlink = InvalidBlockNumber;
272                         GistPageSetNSN(page, xldata->orignsn);
273                         GistClearFollowRight(page);
274                 }
275                 else
276                 {
277                         if (i < xlrec.data->npage - 1)
278                                 GistPageGetOpaque(page)->rightlink = xlrec.page[i + 1].header->blkno;
279                         else
280                                 GistPageGetOpaque(page)->rightlink = xldata->origrlink;
281                         GistPageSetNSN(page, xldata->orignsn);
282                         if (i < xlrec.data->npage - 1 && !isrootsplit &&
283                                 xldata->markfollowright)
284                                 GistMarkFollowRight(page);
285                         else
286                                 GistClearFollowRight(page);
287                 }
288
289                 PageSetLSN(page, lsn);
290                 MarkBufferDirty(buffer);
291
292                 if (i == 0)
293                         firstbuffer = buffer;
294                 else
295                         UnlockReleaseBuffer(buffer);
296         }
297
298         /* Fix follow-right data on left child page, if any */
299         if (BlockNumberIsValid(xldata->leftchild))
300                 gistRedoClearFollowRight(lsn, record, 0,
301                                                                  xldata->node, xldata->leftchild);
302
303         /* Finally, release lock on the first page */
304         UnlockReleaseBuffer(firstbuffer);
305 }
306
307 static void
308 gistRedoCreateIndex(XLogRecPtr lsn, XLogRecord *record)
309 {
310         RelFileNode *node = (RelFileNode *) XLogRecGetData(record);
311         Buffer          buffer;
312         Page            page;
313
314         /* Backup blocks are not used in create_index records */
315         Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
316
317         buffer = XLogReadBuffer(*node, GIST_ROOT_BLKNO, true);
318         Assert(BufferIsValid(buffer));
319         page = (Page) BufferGetPage(buffer);
320
321         GISTInitBuffer(buffer, F_LEAF);
322
323         PageSetLSN(page, lsn);
324
325         MarkBufferDirty(buffer);
326         UnlockReleaseBuffer(buffer);
327 }
328
329 void
330 gist_redo(XLogRecPtr lsn, XLogRecord *record)
331 {
332         uint8           info = record->xl_info & ~XLR_INFO_MASK;
333         MemoryContext oldCxt;
334
335         /*
336          * GiST indexes do not require any conflict processing. NB: If we ever
337          * implement a similar optimization we have in b-tree, and remove killed
338          * tuples outside VACUUM, we'll need to handle that here.
339          */
340
341         oldCxt = MemoryContextSwitchTo(opCtx);
342         switch (info)
343         {
344                 case XLOG_GIST_PAGE_UPDATE:
345                         gistRedoPageUpdateRecord(lsn, record);
346                         break;
347                 case XLOG_GIST_PAGE_SPLIT:
348                         gistRedoPageSplitRecord(lsn, record);
349                         break;
350                 case XLOG_GIST_CREATE_INDEX:
351                         gistRedoCreateIndex(lsn, record);
352                         break;
353                 default:
354                         elog(PANIC, "gist_redo: unknown op code %u", info);
355         }
356
357         MemoryContextSwitchTo(oldCxt);
358         MemoryContextReset(opCtx);
359 }
360
361 void
362 gist_xlog_startup(void)
363 {
364         opCtx = createTempGistContext();
365 }
366
367 void
368 gist_xlog_cleanup(void)
369 {
370         MemoryContextDelete(opCtx);
371 }
372
373 /*
374  * Write WAL record of a page split.
375  */
376 XLogRecPtr
377 gistXLogSplit(RelFileNode node, BlockNumber blkno, bool page_is_leaf,
378                           SplitedPageLayout *dist,
379                           BlockNumber origrlink, GistNSN orignsn,
380                           Buffer leftchildbuf, bool markfollowright)
381 {
382         XLogRecData *rdata;
383         gistxlogPageSplit xlrec;
384         SplitedPageLayout *ptr;
385         int                     npage = 0,
386                                 cur;
387         XLogRecPtr      recptr;
388
389         for (ptr = dist; ptr; ptr = ptr->next)
390                 npage++;
391
392         rdata = (XLogRecData *) palloc(sizeof(XLogRecData) * (npage * 2 + 2));
393
394         xlrec.node = node;
395         xlrec.origblkno = blkno;
396         xlrec.origrlink = origrlink;
397         xlrec.orignsn = orignsn;
398         xlrec.origleaf = page_is_leaf;
399         xlrec.npage = (uint16) npage;
400         xlrec.leftchild =
401                 BufferIsValid(leftchildbuf) ? BufferGetBlockNumber(leftchildbuf) : InvalidBlockNumber;
402         xlrec.markfollowright = markfollowright;
403
404         rdata[0].data = (char *) &xlrec;
405         rdata[0].len = sizeof(gistxlogPageSplit);
406         rdata[0].buffer = InvalidBuffer;
407
408         cur = 1;
409
410         /*
411          * Include a full page image of the child buf. (only necessary if a
412          * checkpoint happened since the child page was split)
413          */
414         if (BufferIsValid(leftchildbuf))
415         {
416                 rdata[cur - 1].next = &(rdata[cur]);
417                 rdata[cur].data = NULL;
418                 rdata[cur].len = 0;
419                 rdata[cur].buffer = leftchildbuf;
420                 rdata[cur].buffer_std = true;
421                 cur++;
422         }
423
424         for (ptr = dist; ptr; ptr = ptr->next)
425         {
426                 rdata[cur - 1].next = &(rdata[cur]);
427                 rdata[cur].buffer = InvalidBuffer;
428                 rdata[cur].data = (char *) &(ptr->block);
429                 rdata[cur].len = sizeof(gistxlogPage);
430                 cur++;
431
432                 rdata[cur - 1].next = &(rdata[cur]);
433                 rdata[cur].buffer = InvalidBuffer;
434                 rdata[cur].data = (char *) (ptr->list);
435                 rdata[cur].len = ptr->lenlist;
436                 cur++;
437         }
438         rdata[cur - 1].next = NULL;
439
440         recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_SPLIT, rdata);
441
442         pfree(rdata);
443         return recptr;
444 }
445
446 /*
447  * Write XLOG record describing a page update. The update can include any
448  * number of deletions and/or insertions of tuples on a single index page.
449  *
450  * If this update inserts a downlink for a split page, also record that
451  * the F_FOLLOW_RIGHT flag on the child page is cleared and NSN set.
452  *
453  * Note that both the todelete array and the tuples are marked as belonging
454  * to the target buffer; they need not be stored in XLOG if XLogInsert decides
455  * to log the whole buffer contents instead.  Also, we take care that there's
456  * at least one rdata item referencing the buffer, even when ntodelete and
457  * ituplen are both zero; this ensures that XLogInsert knows about the buffer.
458  */
459 XLogRecPtr
460 gistXLogUpdate(RelFileNode node, Buffer buffer,
461                            OffsetNumber *todelete, int ntodelete,
462                            IndexTuple *itup, int ituplen,
463                            Buffer leftchildbuf)
464 {
465         XLogRecData *rdata;
466         gistxlogPageUpdate xlrec;
467         int                     cur,
468                                 i;
469         XLogRecPtr      recptr;
470
471         rdata = (XLogRecData *) palloc(sizeof(XLogRecData) * (3 + ituplen));
472
473         xlrec.node = node;
474         xlrec.blkno = BufferGetBlockNumber(buffer);
475         xlrec.ntodelete = ntodelete;
476         xlrec.leftchild =
477                 BufferIsValid(leftchildbuf) ? BufferGetBlockNumber(leftchildbuf) : InvalidBlockNumber;
478
479         rdata[0].data = (char *) &xlrec;
480         rdata[0].len = sizeof(gistxlogPageUpdate);
481         rdata[0].buffer = InvalidBuffer;
482         rdata[0].next = &(rdata[1]);
483
484         rdata[1].data = (char *) todelete;
485         rdata[1].len = sizeof(OffsetNumber) * ntodelete;
486         rdata[1].buffer = buffer;
487         rdata[1].buffer_std = true;
488
489         cur = 2;
490
491         /* new tuples */
492         for (i = 0; i < ituplen; i++)
493         {
494                 rdata[cur - 1].next = &(rdata[cur]);
495                 rdata[cur].data = (char *) (itup[i]);
496                 rdata[cur].len = IndexTupleSize(itup[i]);
497                 rdata[cur].buffer = buffer;
498                 rdata[cur].buffer_std = true;
499                 cur++;
500         }
501
502         /*
503          * Include a full page image of the child buf. (only necessary if a
504          * checkpoint happened since the child page was split)
505          */
506         if (BufferIsValid(leftchildbuf))
507         {
508                 rdata[cur - 1].next = &(rdata[cur]);
509                 rdata[cur].data = NULL;
510                 rdata[cur].len = 0;
511                 rdata[cur].buffer = leftchildbuf;
512                 rdata[cur].buffer_std = true;
513                 cur++;
514         }
515         rdata[cur - 1].next = NULL;
516
517         recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_UPDATE, rdata);
518
519         pfree(rdata);
520         return recptr;
521 }