]> granicus.if.org Git - postgresql/blob - src/backend/access/gin/ginentrypage.c
HOT updates. When we update a tuple without changing any of its indexed
[postgresql] / src / backend / access / gin / ginentrypage.c
1 /*-------------------------------------------------------------------------
2  *
3  * ginentrypage.c
4  *        page utilities routines for the postgres inverted index access method.
5  *
6  *
7  * Portions Copyright (c) 1996-2007, PostgreSQL Global Development Group
8  * Portions Copyright (c) 1994, Regents of the University of California
9  *
10  * IDENTIFICATION
11  *                      $PostgreSQL: pgsql/src/backend/access/gin/ginentrypage.c,v 1.9 2007/09/20 17:56:30 tgl Exp $
12  *-------------------------------------------------------------------------
13  */
14
15 #include "postgres.h"
16 #include "access/gin.h"
17 #include "access/tuptoaster.h"
18
19 /*
20  * forms tuple for entry tree. On leaf page, Index tuple has
21  * non-traditional layout. Tuple may contain posting list or
22  * root blocknumber of posting tree. Macros GinIsPostingTre: (itup) / GinSetPostingTree(itup, blkno)
23  * 1) Posting list
24  *              - itup->t_info & INDEX_SIZE_MASK contains size of tuple as usual
25  *              - ItemPointerGetBlockNumber(&itup->t_tid) contains original
26  *                size of tuple (without posting list).
27  *                Macroses: GinGetOrigSizePosting(itup) / GinSetOrigSizePosting(itup,n)
28  *              - ItemPointerGetOffsetNumber(&itup->t_tid) contains number
29  *                of elements in posting list (number of heap itempointer)
30  *                Macroses: GinGetNPosting(itup) / GinSetNPosting(itup,n)
31  *              - After usual part of tuple there is a posting list
32  *                Macros: GinGetPosting(itup)
33  * 2) Posting tree
34  *              - itup->t_info & INDEX_SIZE_MASK contains size of tuple as usual
35  *              - ItemPointerGetBlockNumber(&itup->t_tid) contains block number of
36  *                root of posting tree
37  *              - ItemPointerGetOffsetNumber(&itup->t_tid) contains magic number GIN_TREE_POSTING
38  */
39 IndexTuple
40 GinFormTuple(GinState *ginstate, Datum key, ItemPointerData *ipd, uint32 nipd)
41 {
42         bool            isnull = FALSE;
43         IndexTuple      itup;
44
45         itup = index_form_tuple(ginstate->tupdesc, &key, &isnull);
46
47         GinSetOrigSizePosting(itup, IndexTupleSize(itup));
48
49         if (nipd > 0)
50         {
51                 uint32          newsize = MAXALIGN(SHORTALIGN(IndexTupleSize(itup)) + sizeof(ItemPointerData) * nipd);
52
53                 if (newsize >= INDEX_SIZE_MASK)
54                         return NULL;
55
56                 if (newsize > TOAST_INDEX_TARGET && nipd > 1)
57                         return NULL;
58
59                 itup = repalloc(itup, newsize);
60
61                 /* set new size */
62                 itup->t_info &= ~INDEX_SIZE_MASK;
63                 itup->t_info |= newsize;
64
65                 if (ipd)
66                         memcpy(GinGetPosting(itup), ipd, sizeof(ItemPointerData) * nipd);
67                 GinSetNPosting(itup, nipd);
68         }
69         else
70         {
71                 GinSetNPosting(itup, 0);
72         }
73         return itup;
74 }
75
76 /*
77  * Entry tree is a "static", ie tuple never deletes from it,
78  * so we don't use right bound, we use rightest key instead.
79  */
80 static IndexTuple
81 getRightMostTuple(Page page)
82 {
83         OffsetNumber maxoff = PageGetMaxOffsetNumber(page);
84
85         return (IndexTuple) PageGetItem(page, PageGetItemId(page, maxoff));
86 }
87
88 Datum
89 ginGetHighKey(GinState *ginstate, Page page)
90 {
91         IndexTuple      itup;
92         bool            isnull;
93
94         itup = getRightMostTuple(page);
95
96         return index_getattr(itup, FirstOffsetNumber, ginstate->tupdesc, &isnull);
97 }
98
99 static bool
100 entryIsMoveRight(GinBtree btree, Page page)
101 {
102         Datum           highkey;
103
104         if (GinPageRightMost(page))
105                 return FALSE;
106
107         highkey = ginGetHighKey(btree->ginstate, page);
108
109         if (compareEntries(btree->ginstate, btree->entryValue, highkey) > 0)
110                 return TRUE;
111
112         return FALSE;
113 }
114
115 /*
116  * Find correct tuple in non-leaf page. It supposed that
117  * page correctly choosen and searching value SHOULD be on page
118  */
119 static BlockNumber
120 entryLocateEntry(GinBtree btree, GinBtreeStack *stack)
121 {
122         OffsetNumber low,
123                                 high,
124                                 maxoff;
125         IndexTuple      itup = NULL;
126         int                     result;
127         Page            page = BufferGetPage(stack->buffer);
128
129         Assert(!GinPageIsLeaf(page));
130         Assert(!GinPageIsData(page));
131
132         if (btree->fullScan)
133         {
134                 stack->off = FirstOffsetNumber;
135                 stack->predictNumber *= PageGetMaxOffsetNumber(page);
136                 return btree->getLeftMostPage(btree, page);
137         }
138
139         low = FirstOffsetNumber;
140         maxoff = high = PageGetMaxOffsetNumber(page);
141         Assert(high >= low);
142
143         high++;
144
145         while (high > low)
146         {
147                 OffsetNumber mid = low + ((high - low) / 2);
148
149                 if (mid == maxoff && GinPageRightMost(page))
150                         /* Right infinity */
151                         result = -1;
152                 else
153                 {
154                         bool            isnull;
155
156                         itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, mid));
157                         result = compareEntries(btree->ginstate, btree->entryValue,
158                                                                         index_getattr(itup, FirstOffsetNumber, btree->ginstate->tupdesc, &isnull));
159                 }
160
161                 if (result == 0)
162                 {
163                         stack->off = mid;
164                         Assert(GinItemPointerGetBlockNumber(&(itup)->t_tid) != GIN_ROOT_BLKNO);
165                         return GinItemPointerGetBlockNumber(&(itup)->t_tid);
166                 }
167                 else if (result > 0)
168                         low = mid + 1;
169                 else
170                         high = mid;
171         }
172
173         Assert(high >= FirstOffsetNumber && high <= maxoff);
174
175         stack->off = high;
176         itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, high));
177         Assert(GinItemPointerGetBlockNumber(&(itup)->t_tid) != GIN_ROOT_BLKNO);
178         return GinItemPointerGetBlockNumber(&(itup)->t_tid);
179 }
180
181 /*
182  * Searches correct position for value on leaf page.
183  * Page should be corrrectly choosen.
184  * Returns true if value found on page.
185  */
186 static bool
187 entryLocateLeafEntry(GinBtree btree, GinBtreeStack *stack)
188 {
189         Page            page = BufferGetPage(stack->buffer);
190         OffsetNumber low,
191                                 high;
192         IndexTuple      itup;
193
194         Assert(GinPageIsLeaf(page));
195         Assert(!GinPageIsData(page));
196
197         if (btree->fullScan)
198         {
199                 stack->off = FirstOffsetNumber;
200                 return TRUE;
201         }
202
203         low = FirstOffsetNumber;
204         high = PageGetMaxOffsetNumber(page);
205
206         if (high < low)
207         {
208                 stack->off = FirstOffsetNumber;
209                 return false;
210         }
211
212         high++;
213
214         while (high > low)
215         {
216                 OffsetNumber mid = low + ((high - low) / 2);
217                 bool            isnull;
218                 int                     result;
219
220                 itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, mid));
221                 result = compareEntries(btree->ginstate, btree->entryValue,
222                                                                 index_getattr(itup, FirstOffsetNumber, btree->ginstate->tupdesc, &isnull));
223
224                 if (result == 0)
225                 {
226                         stack->off = mid;
227                         return true;
228                 }
229                 else if (result > 0)
230                         low = mid + 1;
231                 else
232                         high = mid;
233         }
234
235         stack->off = high;
236         return false;
237 }
238
239 static OffsetNumber
240 entryFindChildPtr(GinBtree btree, Page page, BlockNumber blkno, OffsetNumber storedOff)
241 {
242         OffsetNumber i,
243                                 maxoff = PageGetMaxOffsetNumber(page);
244         IndexTuple      itup;
245
246         Assert(!GinPageIsLeaf(page));
247         Assert(!GinPageIsData(page));
248
249         /* if page isn't changed, we returns storedOff */
250         if (storedOff >= FirstOffsetNumber && storedOff <= maxoff)
251         {
252                 itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, storedOff));
253                 if (GinItemPointerGetBlockNumber(&(itup)->t_tid) == blkno)
254                         return storedOff;
255
256                 /*
257                  * we hope, that needed pointer goes to right. It's true if there
258                  * wasn't a deletion
259                  */
260                 for (i = storedOff + 1; i <= maxoff; i++)
261                 {
262                         itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, i));
263                         if (GinItemPointerGetBlockNumber(&(itup)->t_tid) == blkno)
264                                 return i;
265                 }
266                 maxoff = storedOff - 1;
267         }
268
269         /* last chance */
270         for (i = FirstOffsetNumber; i <= maxoff; i++)
271         {
272                 itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, i));
273                 if (GinItemPointerGetBlockNumber(&(itup)->t_tid) == blkno)
274                         return i;
275         }
276
277         return InvalidOffsetNumber;
278 }
279
280 static BlockNumber
281 entryGetLeftMostPage(GinBtree btree, Page page)
282 {
283         IndexTuple      itup;
284
285         Assert(!GinPageIsLeaf(page));
286         Assert(!GinPageIsData(page));
287         Assert(PageGetMaxOffsetNumber(page) >= FirstOffsetNumber);
288
289         itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, FirstOffsetNumber));
290         return GinItemPointerGetBlockNumber(&(itup)->t_tid);
291 }
292
293 static bool
294 entryIsEnoughSpace(GinBtree btree, Buffer buf, OffsetNumber off)
295 {
296         Size            itupsz = 0;
297         Page            page = BufferGetPage(buf);
298
299         Assert(btree->entry);
300         Assert(!GinPageIsData(page));
301
302         if (btree->isDelete)
303         {
304                 IndexTuple      itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, off));
305
306                 itupsz = MAXALIGN(IndexTupleSize(itup)) + sizeof(ItemIdData);
307         }
308
309         if (PageGetFreeSpace(page) + itupsz >= MAXALIGN(IndexTupleSize(btree->entry)) + sizeof(ItemIdData))
310                 return true;
311
312         return false;
313 }
314
315 /*
316  * Delete tuple on leaf page if tuples was existed and we
317  * should update it, update old child blkno to new right page
318  * if child split is occured
319  */
320 static BlockNumber
321 entryPreparePage(GinBtree btree, Page page, OffsetNumber off)
322 {
323         BlockNumber ret = InvalidBlockNumber;
324
325         Assert(btree->entry);
326         Assert(!GinPageIsData(page));
327
328         if (btree->isDelete)
329         {
330                 Assert(GinPageIsLeaf(page));
331                 PageIndexTupleDelete(page, off);
332         }
333
334         if (!GinPageIsLeaf(page) && btree->rightblkno != InvalidBlockNumber)
335         {
336                 IndexTuple      itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, off));
337
338                 ItemPointerSet(&itup->t_tid, btree->rightblkno, InvalidOffsetNumber);
339                 ret = btree->rightblkno;
340         }
341
342         btree->rightblkno = InvalidBlockNumber;
343
344         return ret;
345 }
346
347 /*
348  * Place tuple on page and fills WAL record
349  */
350 static void
351 entryPlaceToPage(GinBtree btree, Buffer buf, OffsetNumber off, XLogRecData **prdata)
352 {
353         Page            page = BufferGetPage(buf);
354         static XLogRecData rdata[3];
355         OffsetNumber placed;
356         static ginxlogInsert data;
357         int     cnt=0;
358
359         *prdata = rdata;
360         data.updateBlkno = entryPreparePage(btree, page, off);
361
362         placed = PageAddItem(page, (Item) btree->entry, IndexTupleSize(btree->entry), off, false, false);
363         if (placed != off)
364                 elog(ERROR, "failed to add item to index page in \"%s\"",
365                          RelationGetRelationName(btree->index));
366
367         data.node = btree->index->rd_node;
368         data.blkno = BufferGetBlockNumber(buf);
369         data.offset = off;
370         data.nitem = 1;
371         data.isDelete = btree->isDelete;
372         data.isData = false;
373         data.isLeaf = GinPageIsLeaf(page) ? TRUE : FALSE;
374
375     /*
376          * Prevent full page write if child's split occurs. That is needed
377          * to remove incomplete splits while replaying WAL
378          *
379          * data.updateBlkno contains new block number (of newly created right page)
380          * for recently splited page.
381          */
382         if ( data.updateBlkno == InvalidBlockNumber ) 
383         {
384                 rdata[0].buffer = buf;
385                 rdata[0].buffer_std = TRUE;
386                 rdata[0].data = NULL;
387                 rdata[0].len = 0;
388                 rdata[0].next = &rdata[1];
389                 cnt++;
390         }
391
392         rdata[cnt].buffer = InvalidBuffer;
393         rdata[cnt].data = (char *) &data;
394         rdata[cnt].len = sizeof(ginxlogInsert);
395         rdata[cnt].next = &rdata[cnt+1];
396         cnt++;
397
398         rdata[cnt].buffer = InvalidBuffer;
399         rdata[cnt].data = (char *) btree->entry;
400         rdata[cnt].len = IndexTupleSize(btree->entry);
401         rdata[cnt].next = NULL;
402
403         btree->entry = NULL;
404 }
405
406 /*
407  * Place tuple and split page, original buffer(lbuf) leaves untouched,
408  * returns shadow page of lbuf filled new data.
409  * Tuples are distributed between pages by equal size on its, not
410  * an equal number!
411  */
412 static Page
413 entrySplitPage(GinBtree btree, Buffer lbuf, Buffer rbuf, OffsetNumber off, XLogRecData **prdata)
414 {
415         static XLogRecData rdata[2];
416         OffsetNumber i,
417                                 maxoff,
418                                 separator = InvalidOffsetNumber;
419         Size            totalsize = 0;
420         Size            lsize = 0,
421                                 size;
422         static char tupstore[2 * BLCKSZ];
423         char       *ptr;
424         IndexTuple      itup,
425                                 leftrightmost = NULL;
426         static ginxlogSplit data;
427         Datum           value;
428         bool            isnull;
429         Page            page;
430         Page            lpage = GinPageGetCopyPage(BufferGetPage(lbuf));
431         Page            rpage = BufferGetPage(rbuf);
432         Size            pageSize = PageGetPageSize(lpage);
433
434         *prdata = rdata;
435         data.leftChildBlkno = (GinPageIsLeaf(lpage)) ?
436                 InvalidOffsetNumber : GinItemPointerGetBlockNumber(&(btree->entry->t_tid));
437         data.updateBlkno = entryPreparePage(btree, lpage, off);
438
439         maxoff = PageGetMaxOffsetNumber(lpage);
440         ptr = tupstore;
441
442         for (i = FirstOffsetNumber; i <= maxoff; i++)
443         {
444                 if (i == off)
445                 {
446                         size = MAXALIGN(IndexTupleSize(btree->entry));
447                         memcpy(ptr, btree->entry, size);
448                         ptr += size;
449                         totalsize += size + sizeof(ItemIdData);
450                 }
451
452                 itup = (IndexTuple) PageGetItem(lpage, PageGetItemId(lpage, i));
453                 size = MAXALIGN(IndexTupleSize(itup));
454                 memcpy(ptr, itup, size);
455                 ptr += size;
456                 totalsize += size + sizeof(ItemIdData);
457         }
458
459         if (off == maxoff + 1)
460         {
461                 size = MAXALIGN(IndexTupleSize(btree->entry));
462                 memcpy(ptr, btree->entry, size);
463                 ptr += size;
464                 totalsize += size + sizeof(ItemIdData);
465         }
466
467         GinInitPage(rpage, GinPageGetOpaque(lpage)->flags, pageSize);
468         GinInitPage(lpage, GinPageGetOpaque(rpage)->flags, pageSize);
469
470         ptr = tupstore;
471         maxoff++;
472         lsize = 0;
473
474         page = lpage;
475         for (i = FirstOffsetNumber; i <= maxoff; i++)
476         {
477                 itup = (IndexTuple) ptr;
478
479                 if (lsize > totalsize / 2)
480                 {
481                         if (separator == InvalidOffsetNumber)
482                                 separator = i - 1;
483                         page = rpage;
484                 }
485                 else
486                 {
487                         leftrightmost = itup;
488                         lsize += MAXALIGN(IndexTupleSize(itup)) + sizeof(ItemIdData);
489                 }
490
491                 if (PageAddItem(page, (Item) itup, IndexTupleSize(itup), InvalidOffsetNumber, false, false) == InvalidOffsetNumber)
492                         elog(ERROR, "failed to add item to index page in \"%s\"",
493                                  RelationGetRelationName(btree->index));
494                 ptr += MAXALIGN(IndexTupleSize(itup));
495         }
496
497         value = index_getattr(leftrightmost, FirstOffsetNumber, btree->ginstate->tupdesc, &isnull);
498         btree->entry = GinFormTuple(btree->ginstate, value, NULL, 0);
499         ItemPointerSet(&(btree->entry)->t_tid, BufferGetBlockNumber(lbuf), InvalidOffsetNumber);
500         btree->rightblkno = BufferGetBlockNumber(rbuf);
501
502         data.node = btree->index->rd_node;
503         data.rootBlkno = InvalidBlockNumber;
504         data.lblkno = BufferGetBlockNumber(lbuf);
505         data.rblkno = BufferGetBlockNumber(rbuf);
506         data.separator = separator;
507         data.nitem = maxoff;
508         data.isData = FALSE;
509         data.isLeaf = GinPageIsLeaf(lpage) ? TRUE : FALSE;
510         data.isRootSplit = FALSE;
511
512         rdata[0].buffer = InvalidBuffer;
513         rdata[0].data = (char *) &data;
514         rdata[0].len = sizeof(ginxlogSplit);
515         rdata[0].next = &rdata[1];
516
517         rdata[1].buffer = InvalidBuffer;
518         rdata[1].data = tupstore;
519         rdata[1].len = MAXALIGN(totalsize);
520         rdata[1].next = NULL;
521
522         return lpage;
523 }
524
525 /*
526  * return newly allocate rightmost tuple
527  */
528 IndexTuple
529 ginPageGetLinkItup(Buffer buf)
530 {
531         IndexTuple      itup,
532                                 nitup;
533         Page            page = BufferGetPage(buf);
534
535         itup = getRightMostTuple(page);
536         if (GinPageIsLeaf(page) && !GinIsPostingTree(itup))
537         {
538                 nitup = (IndexTuple) palloc(MAXALIGN(GinGetOrigSizePosting(itup)));
539                 memcpy(nitup, itup, GinGetOrigSizePosting(itup));
540                 nitup->t_info &= ~INDEX_SIZE_MASK;
541                 nitup->t_info |= GinGetOrigSizePosting(itup);
542         }
543         else
544         {
545                 nitup = (IndexTuple) palloc(MAXALIGN(IndexTupleSize(itup)));
546                 memcpy(nitup, itup, IndexTupleSize(itup));
547         }
548
549         ItemPointerSet(&nitup->t_tid, BufferGetBlockNumber(buf), InvalidOffsetNumber);
550         return nitup;
551 }
552
553 /*
554  * Fills new root by rightest values from child.
555  * Also called from ginxlog, should not use btree
556  */
557 void
558 entryFillRoot(GinBtree btree, Buffer root, Buffer lbuf, Buffer rbuf)
559 {
560         Page            page;
561         IndexTuple      itup;
562
563         page = BufferGetPage(root);
564
565         itup = ginPageGetLinkItup(lbuf);
566         if (PageAddItem(page, (Item) itup, IndexTupleSize(itup), InvalidOffsetNumber, false, false) == InvalidOffsetNumber)
567                 elog(ERROR, "failed to add item to index root page");
568
569         itup = ginPageGetLinkItup(rbuf);
570         if (PageAddItem(page, (Item) itup, IndexTupleSize(itup), InvalidOffsetNumber, false, false) == InvalidOffsetNumber)
571                 elog(ERROR, "failed to add item to index root page");
572 }
573
574 void
575 prepareEntryScan(GinBtree btree, Relation index, Datum value, GinState *ginstate)
576 {
577         memset(btree, 0, sizeof(GinBtreeData));
578
579         btree->isMoveRight = entryIsMoveRight;
580         btree->findChildPage = entryLocateEntry;
581         btree->findItem = entryLocateLeafEntry;
582         btree->findChildPtr = entryFindChildPtr;
583         btree->getLeftMostPage = entryGetLeftMostPage;
584         btree->isEnoughSpace = entryIsEnoughSpace;
585         btree->placeToPage = entryPlaceToPage;
586         btree->splitPage = entrySplitPage;
587         btree->fillRoot = entryFillRoot;
588
589         btree->index = index;
590         btree->ginstate = ginstate;
591         btree->entryValue = value;
592
593         btree->isDelete = FALSE;
594         btree->searchMode = FALSE;
595         btree->fullScan = FALSE;
596         btree->isBuild = FALSE;
597 }