]> granicus.if.org Git - postgresql/blob - src/backend/access/gin/gindatapage.c
pgindent run before PG 9.1 beta 1.
[postgresql] / src / backend / access / gin / gindatapage.c
1 /*-------------------------------------------------------------------------
2  *
3  * gindatapage.c
4  *        page utilities routines for the postgres inverted index access method.
5  *
6  *
7  * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
8  * Portions Copyright (c) 1994, Regents of the University of California
9  *
10  * IDENTIFICATION
11  *                      src/backend/access/gin/gindatapage.c
12  *-------------------------------------------------------------------------
13  */
14
15 #include "postgres.h"
16
17 #include "access/gin_private.h"
18 #include "storage/bufmgr.h"
19 #include "utils/rel.h"
20
21 int
22 ginCompareItemPointers(ItemPointer a, ItemPointer b)
23 {
24         BlockNumber ba = GinItemPointerGetBlockNumber(a);
25         BlockNumber bb = GinItemPointerGetBlockNumber(b);
26
27         if (ba == bb)
28         {
29                 OffsetNumber oa = GinItemPointerGetOffsetNumber(a);
30                 OffsetNumber ob = GinItemPointerGetOffsetNumber(b);
31
32                 if (oa == ob)
33                         return 0;
34                 return (oa > ob) ? 1 : -1;
35         }
36
37         return (ba > bb) ? 1 : -1;
38 }
39
40 /*
41  * Merge two ordered arrays of itempointers, eliminating any duplicates.
42  * Returns the number of items in the result.
43  * Caller is responsible that there is enough space at *dst.
44  */
45 uint32
46 ginMergeItemPointers(ItemPointerData *dst,
47                                          ItemPointerData *a, uint32 na,
48                                          ItemPointerData *b, uint32 nb)
49 {
50         ItemPointerData *dptr = dst;
51         ItemPointerData *aptr = a,
52                            *bptr = b;
53
54         while (aptr - a < na && bptr - b < nb)
55         {
56                 int                     cmp = ginCompareItemPointers(aptr, bptr);
57
58                 if (cmp > 0)
59                         *dptr++ = *bptr++;
60                 else if (cmp == 0)
61                 {
62                         /* we want only one copy of the identical items */
63                         *dptr++ = *bptr++;
64                         aptr++;
65                 }
66                 else
67                         *dptr++ = *aptr++;
68         }
69
70         while (aptr - a < na)
71                 *dptr++ = *aptr++;
72
73         while (bptr - b < nb)
74                 *dptr++ = *bptr++;
75
76         return dptr - dst;
77 }
78
79 /*
80  * Checks, should we move to right link...
81  * Compares inserting itemp pointer with right bound of current page
82  */
83 static bool
84 dataIsMoveRight(GinBtree btree, Page page)
85 {
86         ItemPointer iptr = GinDataPageGetRightBound(page);
87
88         if (GinPageRightMost(page))
89                 return FALSE;
90
91         return (ginCompareItemPointers(btree->items + btree->curitem, iptr) > 0) ? TRUE : FALSE;
92 }
93
94 /*
95  * Find correct PostingItem in non-leaf page. It supposed that page
96  * correctly chosen and searching value SHOULD be on page
97  */
98 static BlockNumber
99 dataLocateItem(GinBtree btree, GinBtreeStack *stack)
100 {
101         OffsetNumber low,
102                                 high,
103                                 maxoff;
104         PostingItem *pitem = NULL;
105         int                     result;
106         Page            page = BufferGetPage(stack->buffer);
107
108         Assert(!GinPageIsLeaf(page));
109         Assert(GinPageIsData(page));
110
111         if (btree->fullScan)
112         {
113                 stack->off = FirstOffsetNumber;
114                 stack->predictNumber *= GinPageGetOpaque(page)->maxoff;
115                 return btree->getLeftMostPage(btree, page);
116         }
117
118         low = FirstOffsetNumber;
119         maxoff = high = GinPageGetOpaque(page)->maxoff;
120         Assert(high >= low);
121
122         high++;
123
124         while (high > low)
125         {
126                 OffsetNumber mid = low + ((high - low) / 2);
127
128                 pitem = (PostingItem *) GinDataPageGetItem(page, mid);
129
130                 if (mid == maxoff)
131                 {
132                         /*
133                          * Right infinity, page already correctly chosen with a help of
134                          * dataIsMoveRight
135                          */
136                         result = -1;
137                 }
138                 else
139                 {
140                         pitem = (PostingItem *) GinDataPageGetItem(page, mid);
141                         result = ginCompareItemPointers(btree->items + btree->curitem, &(pitem->key));
142                 }
143
144                 if (result == 0)
145                 {
146                         stack->off = mid;
147                         return PostingItemGetBlockNumber(pitem);
148                 }
149                 else if (result > 0)
150                         low = mid + 1;
151                 else
152                         high = mid;
153         }
154
155         Assert(high >= FirstOffsetNumber && high <= maxoff);
156
157         stack->off = high;
158         pitem = (PostingItem *) GinDataPageGetItem(page, high);
159         return PostingItemGetBlockNumber(pitem);
160 }
161
162 /*
163  * Searches correct position for value on leaf page.
164  * Page should be correctly chosen.
165  * Returns true if value found on page.
166  */
167 static bool
168 dataLocateLeafItem(GinBtree btree, GinBtreeStack *stack)
169 {
170         Page            page = BufferGetPage(stack->buffer);
171         OffsetNumber low,
172                                 high;
173         int                     result;
174
175         Assert(GinPageIsLeaf(page));
176         Assert(GinPageIsData(page));
177
178         if (btree->fullScan)
179         {
180                 stack->off = FirstOffsetNumber;
181                 return TRUE;
182         }
183
184         low = FirstOffsetNumber;
185         high = GinPageGetOpaque(page)->maxoff;
186
187         if (high < low)
188         {
189                 stack->off = FirstOffsetNumber;
190                 return false;
191         }
192
193         high++;
194
195         while (high > low)
196         {
197                 OffsetNumber mid = low + ((high - low) / 2);
198
199                 result = ginCompareItemPointers(btree->items + btree->curitem, (ItemPointer) GinDataPageGetItem(page, mid));
200
201                 if (result == 0)
202                 {
203                         stack->off = mid;
204                         return true;
205                 }
206                 else if (result > 0)
207                         low = mid + 1;
208                 else
209                         high = mid;
210         }
211
212         stack->off = high;
213         return false;
214 }
215
216 /*
217  * Finds links to blkno on non-leaf page, returns
218  * offset of PostingItem
219  */
220 static OffsetNumber
221 dataFindChildPtr(GinBtree btree, Page page, BlockNumber blkno, OffsetNumber storedOff)
222 {
223         OffsetNumber i,
224                                 maxoff = GinPageGetOpaque(page)->maxoff;
225         PostingItem *pitem;
226
227         Assert(!GinPageIsLeaf(page));
228         Assert(GinPageIsData(page));
229
230         /* if page isn't changed, we return storedOff */
231         if (storedOff >= FirstOffsetNumber && storedOff <= maxoff)
232         {
233                 pitem = (PostingItem *) GinDataPageGetItem(page, storedOff);
234                 if (PostingItemGetBlockNumber(pitem) == blkno)
235                         return storedOff;
236
237                 /*
238                  * we hope, that needed pointer goes to right. It's true if there
239                  * wasn't a deletion
240                  */
241                 for (i = storedOff + 1; i <= maxoff; i++)
242                 {
243                         pitem = (PostingItem *) GinDataPageGetItem(page, i);
244                         if (PostingItemGetBlockNumber(pitem) == blkno)
245                                 return i;
246                 }
247
248                 maxoff = storedOff - 1;
249         }
250
251         /* last chance */
252         for (i = FirstOffsetNumber; i <= maxoff; i++)
253         {
254                 pitem = (PostingItem *) GinDataPageGetItem(page, i);
255                 if (PostingItemGetBlockNumber(pitem) == blkno)
256                         return i;
257         }
258
259         return InvalidOffsetNumber;
260 }
261
262 /*
263  * returns blkno of leftmost child
264  */
265 static BlockNumber
266 dataGetLeftMostPage(GinBtree btree, Page page)
267 {
268         PostingItem *pitem;
269
270         Assert(!GinPageIsLeaf(page));
271         Assert(GinPageIsData(page));
272         Assert(GinPageGetOpaque(page)->maxoff >= FirstOffsetNumber);
273
274         pitem = (PostingItem *) GinDataPageGetItem(page, FirstOffsetNumber);
275         return PostingItemGetBlockNumber(pitem);
276 }
277
278 /*
279  * add ItemPointer or PostingItem to page. data should point to
280  * correct value! depending on leaf or non-leaf page
281  */
282 void
283 GinDataPageAddItem(Page page, void *data, OffsetNumber offset)
284 {
285         OffsetNumber maxoff = GinPageGetOpaque(page)->maxoff;
286         char       *ptr;
287
288         if (offset == InvalidOffsetNumber)
289         {
290                 ptr = GinDataPageGetItem(page, maxoff + 1);
291         }
292         else
293         {
294                 ptr = GinDataPageGetItem(page, offset);
295                 if (maxoff + 1 - offset != 0)
296                         memmove(ptr + GinSizeOfDataPageItem(page),
297                                         ptr,
298                                         (maxoff - offset + 1) * GinSizeOfDataPageItem(page));
299         }
300         memcpy(ptr, data, GinSizeOfDataPageItem(page));
301
302         GinPageGetOpaque(page)->maxoff++;
303 }
304
305 /*
306  * Deletes posting item from non-leaf page
307  */
308 void
309 GinPageDeletePostingItem(Page page, OffsetNumber offset)
310 {
311         OffsetNumber maxoff = GinPageGetOpaque(page)->maxoff;
312
313         Assert(!GinPageIsLeaf(page));
314         Assert(offset >= FirstOffsetNumber && offset <= maxoff);
315
316         if (offset != maxoff)
317                 memmove(GinDataPageGetItem(page, offset), GinDataPageGetItem(page, offset + 1),
318                                 sizeof(PostingItem) * (maxoff - offset));
319
320         GinPageGetOpaque(page)->maxoff--;
321 }
322
323 /*
324  * checks space to install new value,
325  * item pointer never deletes!
326  */
327 static bool
328 dataIsEnoughSpace(GinBtree btree, Buffer buf, OffsetNumber off)
329 {
330         Page            page = BufferGetPage(buf);
331
332         Assert(GinPageIsData(page));
333         Assert(!btree->isDelete);
334
335         if (GinPageIsLeaf(page))
336         {
337                 if (GinPageRightMost(page) && off > GinPageGetOpaque(page)->maxoff)
338                 {
339                         if ((btree->nitem - btree->curitem) * sizeof(ItemPointerData) <= GinDataPageGetFreeSpace(page))
340                                 return true;
341                 }
342                 else if (sizeof(ItemPointerData) <= GinDataPageGetFreeSpace(page))
343                         return true;
344         }
345         else if (sizeof(PostingItem) <= GinDataPageGetFreeSpace(page))
346                 return true;
347
348         return false;
349 }
350
351 /*
352  * In case of previous split update old child blkno to
353  * new right page
354  * item pointer never deletes!
355  */
356 static BlockNumber
357 dataPrepareData(GinBtree btree, Page page, OffsetNumber off)
358 {
359         BlockNumber ret = InvalidBlockNumber;
360
361         Assert(GinPageIsData(page));
362
363         if (!GinPageIsLeaf(page) && btree->rightblkno != InvalidBlockNumber)
364         {
365                 PostingItem *pitem = (PostingItem *) GinDataPageGetItem(page, off);
366
367                 PostingItemSetBlockNumber(pitem, btree->rightblkno);
368                 ret = btree->rightblkno;
369         }
370
371         btree->rightblkno = InvalidBlockNumber;
372
373         return ret;
374 }
375
376 /*
377  * Places keys to page and fills WAL record. In case leaf page and
378  * build mode puts all ItemPointers to page.
379  */
380 static void
381 dataPlaceToPage(GinBtree btree, Buffer buf, OffsetNumber off, XLogRecData **prdata)
382 {
383         Page            page = BufferGetPage(buf);
384         int                     sizeofitem = GinSizeOfDataPageItem(page);
385         int                     cnt = 0;
386
387         /* these must be static so they can be returned to caller */
388         static XLogRecData rdata[3];
389         static ginxlogInsert data;
390
391         *prdata = rdata;
392         Assert(GinPageIsData(page));
393
394         data.updateBlkno = dataPrepareData(btree, page, off);
395
396         data.node = btree->index->rd_node;
397         data.blkno = BufferGetBlockNumber(buf);
398         data.offset = off;
399         data.nitem = 1;
400         data.isDelete = FALSE;
401         data.isData = TRUE;
402         data.isLeaf = GinPageIsLeaf(page) ? TRUE : FALSE;
403
404         /*
405          * Prevent full page write if child's split occurs. That is needed to
406          * remove incomplete splits while replaying WAL
407          *
408          * data.updateBlkno contains new block number (of newly created right
409          * page) for recently splited page.
410          */
411         if (data.updateBlkno == InvalidBlockNumber)
412         {
413                 rdata[0].buffer = buf;
414                 rdata[0].buffer_std = FALSE;
415                 rdata[0].data = NULL;
416                 rdata[0].len = 0;
417                 rdata[0].next = &rdata[1];
418                 cnt++;
419         }
420
421         rdata[cnt].buffer = InvalidBuffer;
422         rdata[cnt].data = (char *) &data;
423         rdata[cnt].len = sizeof(ginxlogInsert);
424         rdata[cnt].next = &rdata[cnt + 1];
425         cnt++;
426
427         rdata[cnt].buffer = InvalidBuffer;
428         rdata[cnt].data = (GinPageIsLeaf(page)) ? ((char *) (btree->items + btree->curitem)) : ((char *) &(btree->pitem));
429         rdata[cnt].len = sizeofitem;
430         rdata[cnt].next = NULL;
431
432         if (GinPageIsLeaf(page))
433         {
434                 if (GinPageRightMost(page) && off > GinPageGetOpaque(page)->maxoff)
435                 {
436                         /* usually, create index... */
437                         uint32          savedPos = btree->curitem;
438
439                         while (btree->curitem < btree->nitem)
440                         {
441                                 GinDataPageAddItem(page, btree->items + btree->curitem, off);
442                                 off++;
443                                 btree->curitem++;
444                         }
445                         data.nitem = btree->curitem - savedPos;
446                         rdata[cnt].len = sizeofitem * data.nitem;
447                 }
448                 else
449                 {
450                         GinDataPageAddItem(page, btree->items + btree->curitem, off);
451                         btree->curitem++;
452                 }
453         }
454         else
455                 GinDataPageAddItem(page, &(btree->pitem), off);
456 }
457
458 /*
459  * split page and fills WAL record. original buffer(lbuf) leaves untouched,
460  * returns shadow page of lbuf filled new data. In leaf page and build mode puts all
461  * ItemPointers to pages. Also, in build mode splits data by way to full fulled
462  * left page
463  */
464 static Page
465 dataSplitPage(GinBtree btree, Buffer lbuf, Buffer rbuf, OffsetNumber off, XLogRecData **prdata)
466 {
467         char       *ptr;
468         OffsetNumber separator;
469         ItemPointer bound;
470         Page            lpage = PageGetTempPageCopy(BufferGetPage(lbuf));
471         ItemPointerData oldbound = *GinDataPageGetRightBound(lpage);
472         int                     sizeofitem = GinSizeOfDataPageItem(lpage);
473         OffsetNumber maxoff = GinPageGetOpaque(lpage)->maxoff;
474         Page            rpage = BufferGetPage(rbuf);
475         Size            pageSize = PageGetPageSize(lpage);
476         Size            freeSpace;
477         uint32          nCopied = 1;
478
479         /* these must be static so they can be returned to caller */
480         static ginxlogSplit data;
481         static XLogRecData rdata[4];
482         static char vector[2 * BLCKSZ];
483
484         GinInitPage(rpage, GinPageGetOpaque(lpage)->flags, pageSize);
485         freeSpace = GinDataPageGetFreeSpace(rpage);
486
487         *prdata = rdata;
488         data.leftChildBlkno = (GinPageIsLeaf(lpage)) ?
489                 InvalidOffsetNumber : PostingItemGetBlockNumber(&(btree->pitem));
490         data.updateBlkno = dataPrepareData(btree, lpage, off);
491
492         memcpy(vector, GinDataPageGetItem(lpage, FirstOffsetNumber),
493                    maxoff * sizeofitem);
494
495         if (GinPageIsLeaf(lpage) && GinPageRightMost(lpage) && off > GinPageGetOpaque(lpage)->maxoff)
496         {
497                 nCopied = 0;
498                 while (btree->curitem < btree->nitem &&
499                            maxoff * sizeof(ItemPointerData) < 2 * (freeSpace - sizeof(ItemPointerData)))
500                 {
501                         memcpy(vector + maxoff * sizeof(ItemPointerData),
502                                    btree->items + btree->curitem,
503                                    sizeof(ItemPointerData));
504                         maxoff++;
505                         nCopied++;
506                         btree->curitem++;
507                 }
508         }
509         else
510         {
511                 ptr = vector + (off - 1) * sizeofitem;
512                 if (maxoff + 1 - off != 0)
513                         memmove(ptr + sizeofitem, ptr, (maxoff - off + 1) * sizeofitem);
514                 if (GinPageIsLeaf(lpage))
515                 {
516                         memcpy(ptr, btree->items + btree->curitem, sizeofitem);
517                         btree->curitem++;
518                 }
519                 else
520                         memcpy(ptr, &(btree->pitem), sizeofitem);
521
522                 maxoff++;
523         }
524
525         /*
526          * we suppose that during index creation table scaned from begin to end,
527          * so ItemPointers are monotonically increased..
528          */
529         if (btree->isBuild && GinPageRightMost(lpage))
530                 separator = freeSpace / sizeofitem;
531         else
532                 separator = maxoff / 2;
533
534         GinInitPage(rpage, GinPageGetOpaque(lpage)->flags, pageSize);
535         GinInitPage(lpage, GinPageGetOpaque(rpage)->flags, pageSize);
536
537         memcpy(GinDataPageGetItem(lpage, FirstOffsetNumber), vector, separator * sizeofitem);
538         GinPageGetOpaque(lpage)->maxoff = separator;
539         memcpy(GinDataPageGetItem(rpage, FirstOffsetNumber),
540                  vector + separator * sizeofitem, (maxoff - separator) * sizeofitem);
541         GinPageGetOpaque(rpage)->maxoff = maxoff - separator;
542
543         PostingItemSetBlockNumber(&(btree->pitem), BufferGetBlockNumber(lbuf));
544         if (GinPageIsLeaf(lpage))
545                 btree->pitem.key = *(ItemPointerData *) GinDataPageGetItem(lpage,
546                                                                                         GinPageGetOpaque(lpage)->maxoff);
547         else
548                 btree->pitem.key = ((PostingItem *) GinDataPageGetItem(lpage,
549                                                                           GinPageGetOpaque(lpage)->maxoff))->key;
550         btree->rightblkno = BufferGetBlockNumber(rbuf);
551
552         /* set up right bound for left page */
553         bound = GinDataPageGetRightBound(lpage);
554         *bound = btree->pitem.key;
555
556         /* set up right bound for right page */
557         bound = GinDataPageGetRightBound(rpage);
558         *bound = oldbound;
559
560         data.node = btree->index->rd_node;
561         data.rootBlkno = InvalidBlockNumber;
562         data.lblkno = BufferGetBlockNumber(lbuf);
563         data.rblkno = BufferGetBlockNumber(rbuf);
564         data.separator = separator;
565         data.nitem = maxoff;
566         data.isData = TRUE;
567         data.isLeaf = GinPageIsLeaf(lpage) ? TRUE : FALSE;
568         data.isRootSplit = FALSE;
569         data.rightbound = oldbound;
570
571         rdata[0].buffer = InvalidBuffer;
572         rdata[0].data = (char *) &data;
573         rdata[0].len = sizeof(ginxlogSplit);
574         rdata[0].next = &rdata[1];
575
576         rdata[1].buffer = InvalidBuffer;
577         rdata[1].data = vector;
578         rdata[1].len = MAXALIGN(maxoff * sizeofitem);
579         rdata[1].next = NULL;
580
581         return lpage;
582 }
583
584 /*
585  * Fills new root by right bound values from child.
586  * Also called from ginxlog, should not use btree
587  */
588 void
589 ginDataFillRoot(GinBtree btree, Buffer root, Buffer lbuf, Buffer rbuf)
590 {
591         Page            page = BufferGetPage(root),
592                                 lpage = BufferGetPage(lbuf),
593                                 rpage = BufferGetPage(rbuf);
594         PostingItem li,
595                                 ri;
596
597         li.key = *GinDataPageGetRightBound(lpage);
598         PostingItemSetBlockNumber(&li, BufferGetBlockNumber(lbuf));
599         GinDataPageAddItem(page, &li, InvalidOffsetNumber);
600
601         ri.key = *GinDataPageGetRightBound(rpage);
602         PostingItemSetBlockNumber(&ri, BufferGetBlockNumber(rbuf));
603         GinDataPageAddItem(page, &ri, InvalidOffsetNumber);
604 }
605
606 void
607 ginPrepareDataScan(GinBtree btree, Relation index)
608 {
609         memset(btree, 0, sizeof(GinBtreeData));
610
611         btree->index = index;
612
613         btree->findChildPage = dataLocateItem;
614         btree->isMoveRight = dataIsMoveRight;
615         btree->findItem = dataLocateLeafItem;
616         btree->findChildPtr = dataFindChildPtr;
617         btree->getLeftMostPage = dataGetLeftMostPage;
618         btree->isEnoughSpace = dataIsEnoughSpace;
619         btree->placeToPage = dataPlaceToPage;
620         btree->splitPage = dataSplitPage;
621         btree->fillRoot = ginDataFillRoot;
622
623         btree->isData = TRUE;
624         btree->searchMode = FALSE;
625         btree->isDelete = FALSE;
626         btree->fullScan = FALSE;
627         btree->isBuild = FALSE;
628 }
629
630 GinPostingTreeScan *
631 ginPrepareScanPostingTree(Relation index, BlockNumber rootBlkno, bool searchMode)
632 {
633         GinPostingTreeScan *gdi = (GinPostingTreeScan *) palloc0(sizeof(GinPostingTreeScan));
634
635         ginPrepareDataScan(&gdi->btree, index);
636
637         gdi->btree.searchMode = searchMode;
638         gdi->btree.fullScan = searchMode;
639
640         gdi->stack = ginPrepareFindLeafPage(&gdi->btree, rootBlkno);
641
642         return gdi;
643 }
644
645 /*
646  * Inserts array of item pointers, may execute several tree scan (very rare)
647  */
648 void
649 ginInsertItemPointers(GinPostingTreeScan *gdi,
650                                           ItemPointerData *items, uint32 nitem,
651                                           GinStatsData *buildStats)
652 {
653         BlockNumber rootBlkno = gdi->stack->blkno;
654
655         gdi->btree.items = items;
656         gdi->btree.nitem = nitem;
657         gdi->btree.curitem = 0;
658
659         while (gdi->btree.curitem < gdi->btree.nitem)
660         {
661                 if (!gdi->stack)
662                         gdi->stack = ginPrepareFindLeafPage(&gdi->btree, rootBlkno);
663
664                 gdi->stack = ginFindLeafPage(&gdi->btree, gdi->stack);
665
666                 if (gdi->btree.findItem(&(gdi->btree), gdi->stack))
667                 {
668                         /*
669                          * gdi->btree.items[gdi->btree.curitem] already exists in index
670                          */
671                         gdi->btree.curitem++;
672                         LockBuffer(gdi->stack->buffer, GIN_UNLOCK);
673                         freeGinBtreeStack(gdi->stack);
674                 }
675                 else
676                         ginInsertValue(&(gdi->btree), gdi->stack, buildStats);
677
678                 gdi->stack = NULL;
679         }
680 }
681
682 Buffer
683 ginScanBeginPostingTree(GinPostingTreeScan *gdi)
684 {
685         gdi->stack = ginFindLeafPage(&gdi->btree, gdi->stack);
686         return gdi->stack->buffer;
687 }