]> granicus.if.org Git - postgresql/blob - src/backend/access/gin/gindatapage.c
Restructure some header files a bit, in particular heapam.h, by removing some
[postgresql] / src / backend / access / gin / gindatapage.c
1 /*-------------------------------------------------------------------------
2  *
3  * gindatapage.c
4  *        page utilities routines for the postgres inverted index access method.
5  *
6  *
7  * Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
8  * Portions Copyright (c) 1994, Regents of the University of California
9  *
10  * IDENTIFICATION
11  *                      $PostgreSQL: pgsql/src/backend/access/gin/gindatapage.c,v 1.10 2008/05/12 00:00:44 alvherre Exp $
12  *-------------------------------------------------------------------------
13  */
14
15 #include "postgres.h"
16
17 #include "access/gin.h"
18 #include "storage/bufmgr.h"
19
20 int
21 compareItemPointers(ItemPointer a, ItemPointer b)
22 {
23         if (GinItemPointerGetBlockNumber(a) == GinItemPointerGetBlockNumber(b))
24         {
25                 if (GinItemPointerGetOffsetNumber(a) == GinItemPointerGetOffsetNumber(b))
26                         return 0;
27                 return (GinItemPointerGetOffsetNumber(a) > GinItemPointerGetOffsetNumber(b)) ? 1 : -1;
28         }
29
30         return (GinItemPointerGetBlockNumber(a) > GinItemPointerGetBlockNumber(b)) ? 1 : -1;
31 }
32
33 /*
34  * Merge two ordered array of itempointer
35  */
36 void
37 MergeItemPointers(ItemPointerData *dst, ItemPointerData *a, uint32 na, ItemPointerData *b, uint32 nb)
38 {
39         ItemPointerData *dptr = dst;
40         ItemPointerData *aptr = a,
41                            *bptr = b;
42
43         while (aptr - a < na && bptr - b < nb)
44         {
45                 if (compareItemPointers(aptr, bptr) > 0)
46                         *dptr++ = *bptr++;
47                 else
48                         *dptr++ = *aptr++;
49         }
50
51         while (aptr - a < na)
52                 *dptr++ = *aptr++;
53
54         while (bptr - b < nb)
55                 *dptr++ = *bptr++;
56 }
57
58 /*
59  * Checks, should we move to right link...
60  * Compares inserting itemp pointer with right bound of current page
61  */
62 static bool
63 dataIsMoveRight(GinBtree btree, Page page)
64 {
65         ItemPointer iptr = GinDataPageGetRightBound(page);
66
67         if (GinPageRightMost(page))
68                 return FALSE;
69
70         return (compareItemPointers(btree->items + btree->curitem, iptr) > 0) ? TRUE : FALSE;
71 }
72
73 /*
74  * Find correct PostingItem in non-leaf page. It supposed that page
75  * correctly chosen and searching value SHOULD be on page
76  */
77 static BlockNumber
78 dataLocateItem(GinBtree btree, GinBtreeStack *stack)
79 {
80         OffsetNumber low,
81                                 high,
82                                 maxoff;
83         PostingItem *pitem = NULL;
84         int                     result;
85         Page            page = BufferGetPage(stack->buffer);
86
87         Assert(!GinPageIsLeaf(page));
88         Assert(GinPageIsData(page));
89
90         if (btree->fullScan)
91         {
92                 stack->off = FirstOffsetNumber;
93                 stack->predictNumber *= GinPageGetOpaque(page)->maxoff;
94                 return btree->getLeftMostPage(btree, page);
95         }
96
97         low = FirstOffsetNumber;
98         maxoff = high = GinPageGetOpaque(page)->maxoff;
99         Assert(high >= low);
100
101         high++;
102
103         while (high > low)
104         {
105                 OffsetNumber mid = low + ((high - low) / 2);
106
107                 pitem = (PostingItem *) GinDataPageGetItem(page, mid);
108
109                 if (mid == maxoff)
110
111                         /*
112                          * Right infinity, page already correctly chosen with a help of
113                          * dataIsMoveRight
114                          */
115                         result = -1;
116                 else
117                 {
118                         pitem = (PostingItem *) GinDataPageGetItem(page, mid);
119                         result = compareItemPointers(btree->items + btree->curitem, &(pitem->key));
120                 }
121
122                 if (result == 0)
123                 {
124                         stack->off = mid;
125                         return PostingItemGetBlockNumber(pitem);
126                 }
127                 else if (result > 0)
128                         low = mid + 1;
129                 else
130                         high = mid;
131         }
132
133         Assert(high >= FirstOffsetNumber && high <= maxoff);
134
135         stack->off = high;
136         pitem = (PostingItem *) GinDataPageGetItem(page, high);
137         return PostingItemGetBlockNumber(pitem);
138 }
139
140 /*
141  * Searches correct position for value on leaf page.
142  * Page should be correctly chosen.
143  * Returns true if value found on page.
144  */
145 static bool
146 dataLocateLeafItem(GinBtree btree, GinBtreeStack *stack)
147 {
148         Page            page = BufferGetPage(stack->buffer);
149         OffsetNumber low,
150                                 high;
151         int                     result;
152
153         Assert(GinPageIsLeaf(page));
154         Assert(GinPageIsData(page));
155
156         if (btree->fullScan)
157         {
158                 stack->off = FirstOffsetNumber;
159                 return TRUE;
160         }
161
162         low = FirstOffsetNumber;
163         high = GinPageGetOpaque(page)->maxoff;
164
165         if (high < low)
166         {
167                 stack->off = FirstOffsetNumber;
168                 return false;
169         }
170
171         high++;
172
173         while (high > low)
174         {
175                 OffsetNumber mid = low + ((high - low) / 2);
176
177                 result = compareItemPointers(btree->items + btree->curitem, (ItemPointer) GinDataPageGetItem(page, mid));
178
179                 if (result == 0)
180                 {
181                         stack->off = mid;
182                         return true;
183                 }
184                 else if (result > 0)
185                         low = mid + 1;
186                 else
187                         high = mid;
188         }
189
190         stack->off = high;
191         return false;
192 }
193
194 /*
195  * Finds links to blkno on non-leaf page, returns
196  * offset of PostingItem
197  */
198 static OffsetNumber
199 dataFindChildPtr(GinBtree btree, Page page, BlockNumber blkno, OffsetNumber storedOff)
200 {
201         OffsetNumber i,
202                                 maxoff = GinPageGetOpaque(page)->maxoff;
203         PostingItem *pitem;
204
205         Assert(!GinPageIsLeaf(page));
206         Assert(GinPageIsData(page));
207
208         /* if page isn't changed, we returns storedOff */
209         if (storedOff >= FirstOffsetNumber && storedOff <= maxoff)
210         {
211                 pitem = (PostingItem *) GinDataPageGetItem(page, storedOff);
212                 if (PostingItemGetBlockNumber(pitem) == blkno)
213                         return storedOff;
214
215                 /*
216                  * we hope, that needed pointer goes to right. It's true if there
217                  * wasn't a deletion
218                  */
219                 for (i = storedOff + 1; i <= maxoff; i++)
220                 {
221                         pitem = (PostingItem *) GinDataPageGetItem(page, i);
222                         if (PostingItemGetBlockNumber(pitem) == blkno)
223                                 return i;
224                 }
225
226                 maxoff = storedOff - 1;
227         }
228
229         /* last chance */
230         for (i = FirstOffsetNumber; i <= maxoff; i++)
231         {
232                 pitem = (PostingItem *) GinDataPageGetItem(page, i);
233                 if (PostingItemGetBlockNumber(pitem) == blkno)
234                         return i;
235         }
236
237         return InvalidOffsetNumber;
238 }
239
240 /*
241  * returns blkno of leftmost child
242  */
243 static BlockNumber
244 dataGetLeftMostPage(GinBtree btree, Page page)
245 {
246         PostingItem *pitem;
247
248         Assert(!GinPageIsLeaf(page));
249         Assert(GinPageIsData(page));
250         Assert(GinPageGetOpaque(page)->maxoff >= FirstOffsetNumber);
251
252         pitem = (PostingItem *) GinDataPageGetItem(page, FirstOffsetNumber);
253         return PostingItemGetBlockNumber(pitem);
254 }
255
256 /*
257  * add ItemPointer or PostingItem to page. data should point to
258  * correct value! depending on leaf or non-leaf page
259  */
260 void
261 GinDataPageAddItem(Page page, void *data, OffsetNumber offset)
262 {
263         OffsetNumber maxoff = GinPageGetOpaque(page)->maxoff;
264         char       *ptr;
265
266         if (offset == InvalidOffsetNumber)
267         {
268                 ptr = GinDataPageGetItem(page, maxoff + 1);
269         }
270         else
271         {
272                 ptr = GinDataPageGetItem(page, offset);
273                 if (maxoff + 1 - offset != 0)
274                         memmove(ptr + GinSizeOfItem(page), ptr, (maxoff - offset + 1) * GinSizeOfItem(page));
275         }
276         memcpy(ptr, data, GinSizeOfItem(page));
277
278         GinPageGetOpaque(page)->maxoff++;
279 }
280
281 /*
282  * Deletes posting item from non-leaf page
283  */
284 void
285 PageDeletePostingItem(Page page, OffsetNumber offset)
286 {
287         OffsetNumber maxoff = GinPageGetOpaque(page)->maxoff;
288
289         Assert(!GinPageIsLeaf(page));
290         Assert(offset >= FirstOffsetNumber && offset <= maxoff);
291
292         if (offset != maxoff)
293                 memmove(GinDataPageGetItem(page, offset), GinDataPageGetItem(page, offset + 1),
294                                 sizeof(PostingItem) * (maxoff - offset));
295
296         GinPageGetOpaque(page)->maxoff--;
297 }
298
299 /*
300  * checks space to install new value,
301  * item pointer never deletes!
302  */
303 static bool
304 dataIsEnoughSpace(GinBtree btree, Buffer buf, OffsetNumber off)
305 {
306         Page            page = BufferGetPage(buf);
307
308         Assert(GinPageIsData(page));
309         Assert(!btree->isDelete);
310
311         if (GinPageIsLeaf(page))
312         {
313                 if (GinPageRightMost(page) && off > GinPageGetOpaque(page)->maxoff)
314                 {
315                         if ((btree->nitem - btree->curitem) * sizeof(ItemPointerData) <= GinDataPageGetFreeSpace(page))
316                                 return true;
317                 }
318                 else if (sizeof(ItemPointerData) <= GinDataPageGetFreeSpace(page))
319                         return true;
320         }
321         else if (sizeof(PostingItem) <= GinDataPageGetFreeSpace(page))
322                 return true;
323
324         return false;
325 }
326
327 /*
328  * In case of previous split update old child blkno to
329  * new right page
330  * item pointer never deletes!
331  */
332 static BlockNumber
333 dataPrepareData(GinBtree btree, Page page, OffsetNumber off)
334 {
335         BlockNumber ret = InvalidBlockNumber;
336
337         Assert(GinPageIsData(page));
338
339         if (!GinPageIsLeaf(page) && btree->rightblkno != InvalidBlockNumber)
340         {
341                 PostingItem *pitem = (PostingItem *) GinDataPageGetItem(page, off);
342
343                 PostingItemSetBlockNumber(pitem, btree->rightblkno);
344                 ret = btree->rightblkno;
345         }
346
347         btree->rightblkno = InvalidBlockNumber;
348
349         return ret;
350 }
351
352 /*
353  * Places keys to page and fills WAL record. In case leaf page and
354  * build mode puts all ItemPointers to page.
355  */
356 static void
357 dataPlaceToPage(GinBtree btree, Buffer buf, OffsetNumber off, XLogRecData **prdata)
358 {
359         Page            page = BufferGetPage(buf);
360         static XLogRecData rdata[3];
361         int                     sizeofitem = GinSizeOfItem(page);
362         static ginxlogInsert data;
363         int                     cnt = 0;
364
365         *prdata = rdata;
366         Assert(GinPageIsData(page));
367
368         data.updateBlkno = dataPrepareData(btree, page, off);
369
370         data.node = btree->index->rd_node;
371         data.blkno = BufferGetBlockNumber(buf);
372         data.offset = off;
373         data.nitem = 1;
374         data.isDelete = FALSE;
375         data.isData = TRUE;
376         data.isLeaf = GinPageIsLeaf(page) ? TRUE : FALSE;
377
378         /*
379          * Prevent full page write if child's split occurs. That is needed to
380          * remove incomplete splits while replaying WAL
381          *
382          * data.updateBlkno contains new block number (of newly created right
383          * page) for recently splited page.
384          */
385         if (data.updateBlkno == InvalidBlockNumber)
386         {
387                 rdata[0].buffer = buf;
388                 rdata[0].buffer_std = FALSE;
389                 rdata[0].data = NULL;
390                 rdata[0].len = 0;
391                 rdata[0].next = &rdata[1];
392                 cnt++;
393         }
394
395         rdata[cnt].buffer = InvalidBuffer;
396         rdata[cnt].data = (char *) &data;
397         rdata[cnt].len = sizeof(ginxlogInsert);
398         rdata[cnt].next = &rdata[cnt + 1];
399         cnt++;
400
401         rdata[cnt].buffer = InvalidBuffer;
402         rdata[cnt].data = (GinPageIsLeaf(page)) ? ((char *) (btree->items + btree->curitem)) : ((char *) &(btree->pitem));
403         rdata[cnt].len = sizeofitem;
404         rdata[cnt].next = NULL;
405
406         if (GinPageIsLeaf(page))
407         {
408                 if (GinPageRightMost(page) && off > GinPageGetOpaque(page)->maxoff)
409                 {
410                         /* usually, create index... */
411                         uint32          savedPos = btree->curitem;
412
413                         while (btree->curitem < btree->nitem)
414                         {
415                                 GinDataPageAddItem(page, btree->items + btree->curitem, off);
416                                 off++;
417                                 btree->curitem++;
418                         }
419                         data.nitem = btree->curitem - savedPos;
420                         rdata[cnt].len = sizeofitem * data.nitem;
421                 }
422                 else
423                 {
424                         GinDataPageAddItem(page, btree->items + btree->curitem, off);
425                         btree->curitem++;
426                 }
427         }
428         else
429                 GinDataPageAddItem(page, &(btree->pitem), off);
430 }
431
/*
 * Split a data page and fill in the WAL record chain.
 *
 * The original buffer (lbuf) is left untouched: the new contents of the
 * left page are built in a copy ("shadow page") of it, which is returned.
 * The right half is written directly into rbuf's page.
 *
 * On a leaf page, when appending past the last item of a rightmost page,
 * as many of the remaining item pointers as the size limit in the loop
 * below allows are taken along.  In build mode a rightmost page is split
 * so that the left page is filled completely.
 *
 * Side effects visible in this function: btree->curitem is advanced,
 * btree->pitem is set to (block of lbuf, last key of the new left page),
 * btree->rightblkno is set to the block of rbuf, and *prdata is pointed at
 * a chain kept in static storage.
 */
static Page
dataSplitPage(GinBtree btree, Buffer lbuf, Buffer rbuf, OffsetNumber off, XLogRecData **prdata)
{
        static ginxlogSplit data;
        /* NOTE(review): four entries are declared but only rdata[0..1] are filled here */
        static XLogRecData rdata[4];
        /* scratch area: all items of the old page plus the new one(s), in order */
        static char vector[2 * BLCKSZ];
        char       *ptr;
        OffsetNumber separator;
        ItemPointer bound;
        Page            lpage = GinPageGetCopyPage(BufferGetPage(lbuf));
        /* saved before lpage is re-initialized below; becomes the right page's bound */
        ItemPointerData oldbound = *GinDataPageGetRightBound(lpage);
        int                     sizeofitem = GinSizeOfItem(lpage);
        OffsetNumber maxoff = GinPageGetOpaque(lpage)->maxoff;
        Page            rpage = BufferGetPage(rbuf);
        Size            pageSize = PageGetPageSize(lpage);
        Size            freeSpace;
        /* NOTE(review): nCopied is maintained below but never read in this function */
        uint32          nCopied = 1;

        /* initialize rpage once now to learn the free space of an empty page */
        GinInitPage(rpage, GinPageGetOpaque(lpage)->flags, pageSize);
        freeSpace = GinDataPageGetFreeSpace(rpage);

        *prdata = rdata;
        /*
         * NOTE(review): the leaf case stores InvalidOffsetNumber in a field
         * whose name suggests a block number -- confirm this is intended.
         */
        data.leftChildBlkno = (GinPageIsLeaf(lpage)) ?
                InvalidOffsetNumber : PostingItemGetBlockNumber(&(btree->pitem));
        data.updateBlkno = dataPrepareData(btree, lpage, off);

        /* copy the existing items out of the (shadow) left page */
        memcpy(vector, GinDataPageGetItem(lpage, FirstOffsetNumber),
                   maxoff * sizeofitem);

        if (GinPageIsLeaf(lpage) && GinPageRightMost(lpage) && off > GinPageGetOpaque(lpage)->maxoff)
        {
                /* append case: pull in remaining item pointers up to the size limit */
                nCopied = 0;
                while (btree->curitem < btree->nitem && maxoff * sizeof(ItemPointerData) < 2 * (freeSpace - sizeof(ItemPointerData)))
                {
                        memcpy(vector + maxoff * sizeof(ItemPointerData), btree->items + btree->curitem,
                                   sizeof(ItemPointerData));
                        maxoff++;
                        nCopied++;
                        btree->curitem++;
                }
        }
        else
        {
                /* single insertion at off: open a gap in vector and store the new item */
                ptr = vector + (off - 1) * sizeofitem;
                if (maxoff + 1 - off != 0)
                        memmove(ptr + sizeofitem, ptr, (maxoff - off + 1) * sizeofitem);
                if (GinPageIsLeaf(lpage))
                {
                        memcpy(ptr, btree->items + btree->curitem, sizeofitem);
                        btree->curitem++;
                }
                else
                        memcpy(ptr, &(btree->pitem), sizeofitem);

                maxoff++;
        }

        /*
         * We suppose that during index creation the table is scanned from
         * beginning to end, so ItemPointers are monotonically increasing;
         * hence the left page is filled completely instead of splitting evenly.
         */
        if (btree->isBuild && GinPageRightMost(lpage))
                separator = freeSpace / sizeofitem;
        else
                separator = maxoff / 2;

        /*
         * Re-initialize both pages.  rpage is done first with lpage's flags,
         * and lpage then takes its flags back from rpage.
         */
        GinInitPage(rpage, GinPageGetOpaque(lpage)->flags, pageSize);
        GinInitPage(lpage, GinPageGetOpaque(rpage)->flags, pageSize);

        /* first 'separator' items go left, the rest go right */
        memcpy(GinDataPageGetItem(lpage, FirstOffsetNumber), vector, separator * sizeofitem);
        GinPageGetOpaque(lpage)->maxoff = separator;
        memcpy(GinDataPageGetItem(rpage, FirstOffsetNumber),
                 vector + separator * sizeofitem, (maxoff - separator) * sizeofitem);
        GinPageGetOpaque(rpage)->maxoff = maxoff - separator;

        /* btree->pitem: block of the left page, keyed by the left page's last item */
        PostingItemSetBlockNumber(&(btree->pitem), BufferGetBlockNumber(lbuf));
        if (GinPageIsLeaf(lpage))
                btree->pitem.key = *(ItemPointerData *) GinDataPageGetItem(lpage,
                                                                                        GinPageGetOpaque(lpage)->maxoff);
        else
                btree->pitem.key = ((PostingItem *) GinDataPageGetItem(lpage,
                                                                          GinPageGetOpaque(lpage)->maxoff))->key;
        btree->rightblkno = BufferGetBlockNumber(rbuf);

        /* set up right bound for left page */
        bound = GinDataPageGetRightBound(lpage);
        *bound = btree->pitem.key;

        /* set up right bound for right page */
        bound = GinDataPageGetRightBound(rpage);
        *bound = oldbound;

        /* WAL header describing the split */
        data.node = btree->index->rd_node;
        data.rootBlkno = InvalidBlockNumber;
        data.lblkno = BufferGetBlockNumber(lbuf);
        data.rblkno = BufferGetBlockNumber(rbuf);
        data.separator = separator;
        data.nitem = maxoff;
        data.isData = TRUE;
        data.isLeaf = GinPageIsLeaf(lpage) ? TRUE : FALSE;
        data.isRootSplit = FALSE;
        data.rightbound = oldbound;

        rdata[0].buffer = InvalidBuffer;
        rdata[0].data = (char *) &data;
        rdata[0].len = sizeof(ginxlogSplit);
        rdata[0].next = &rdata[1];

        /* payload: the complete item vector (both halves), length MAXALIGN'ed */
        rdata[1].buffer = InvalidBuffer;
        rdata[1].data = vector;
        rdata[1].len = MAXALIGN(maxoff * sizeofitem);
        rdata[1].next = NULL;

        return lpage;
}
553
554 /*
555  * Fills new root by right bound values from child.
556  * Also called from ginxlog, should not use btree
557  */
558 void
559 dataFillRoot(GinBtree btree, Buffer root, Buffer lbuf, Buffer rbuf)
560 {
561         Page            page = BufferGetPage(root),
562                                 lpage = BufferGetPage(lbuf),
563                                 rpage = BufferGetPage(rbuf);
564         PostingItem li,
565                                 ri;
566
567         li.key = *GinDataPageGetRightBound(lpage);
568         PostingItemSetBlockNumber(&li, BufferGetBlockNumber(lbuf));
569         GinDataPageAddItem(page, &li, InvalidOffsetNumber);
570
571         ri.key = *GinDataPageGetRightBound(rpage);
572         PostingItemSetBlockNumber(&ri, BufferGetBlockNumber(rbuf));
573         GinDataPageAddItem(page, &ri, InvalidOffsetNumber);
574 }
575
576 void
577 prepareDataScan(GinBtree btree, Relation index)
578 {
579         memset(btree, 0, sizeof(GinBtreeData));
580         btree->index = index;
581         btree->isMoveRight = dataIsMoveRight;
582         btree->findChildPage = dataLocateItem;
583         btree->findItem = dataLocateLeafItem;
584         btree->findChildPtr = dataFindChildPtr;
585         btree->getLeftMostPage = dataGetLeftMostPage;
586         btree->isEnoughSpace = dataIsEnoughSpace;
587         btree->placeToPage = dataPlaceToPage;
588         btree->splitPage = dataSplitPage;
589         btree->fillRoot = dataFillRoot;
590
591         btree->searchMode = FALSE;
592         btree->isDelete = FALSE;
593         btree->fullScan = FALSE;
594         btree->isBuild = FALSE;
595 }
596
597 GinPostingTreeScan *
598 prepareScanPostingTree(Relation index, BlockNumber rootBlkno, bool searchMode)
599 {
600         GinPostingTreeScan *gdi = (GinPostingTreeScan *) palloc0(sizeof(GinPostingTreeScan));
601
602         prepareDataScan(&gdi->btree, index);
603
604         gdi->btree.searchMode = searchMode;
605         gdi->btree.fullScan = searchMode;
606
607         gdi->stack = ginPrepareFindLeafPage(&gdi->btree, rootBlkno);
608
609         return gdi;
610 }
611
612 /*
613  * Inserts array of item pointers, may execute several tree scan (very rare)
614  */
615 void
616 insertItemPointer(GinPostingTreeScan *gdi, ItemPointerData *items, uint32 nitem)
617 {
618         BlockNumber rootBlkno = gdi->stack->blkno;
619
620         gdi->btree.items = items;
621         gdi->btree.nitem = nitem;
622         gdi->btree.curitem = 0;
623
624         while (gdi->btree.curitem < gdi->btree.nitem)
625         {
626                 if (!gdi->stack)
627                         gdi->stack = ginPrepareFindLeafPage(&gdi->btree, rootBlkno);
628
629                 gdi->stack = ginFindLeafPage(&gdi->btree, gdi->stack);
630
631                 if (gdi->btree.findItem(&(gdi->btree), gdi->stack))
632                         elog(ERROR, "item pointer (%u,%d) already exists",
633                         ItemPointerGetBlockNumber(gdi->btree.items + gdi->btree.curitem),
634                                  ItemPointerGetOffsetNumber(gdi->btree.items + gdi->btree.curitem));
635
636                 ginInsertValue(&(gdi->btree), gdi->stack);
637
638                 gdi->stack = NULL;
639         }
640 }
641
642 Buffer
643 scanBeginPostingTree(GinPostingTreeScan *gdi)
644 {
645         gdi->stack = ginFindLeafPage(&gdi->btree, gdi->stack);
646         return gdi->stack->buffer;
647 }