From: Teodor Sigaev Date: Fri, 19 May 2006 11:10:25 +0000 (+0000) Subject: Rework completion of incomplete inserts. Now it writes X-Git-Tag: REL8_2_BETA1~955 X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=5890790b4a510ada1e6e00eb01de759f1dbe9ab3;p=postgresql Rework completion of incomplete inserts. Now it writes WAL log during inserts. --- diff --git a/src/backend/access/gist/gistvacuum.c b/src/backend/access/gist/gistvacuum.c index 9b32304d1a..a47d81db78 100644 --- a/src/backend/access/gist/gistvacuum.c +++ b/src/backend/access/gist/gistvacuum.c @@ -8,7 +8,7 @@ * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/access/gist/gistvacuum.c,v 1.21 2006/05/17 16:34:59 teodor Exp $ + * $PostgreSQL: pgsql/src/backend/access/gist/gistvacuum.c,v 1.22 2006/05/19 11:10:25 teodor Exp $ * *------------------------------------------------------------------------- */ @@ -104,19 +104,25 @@ gistDeleteSubtree( GistVacuum *gv, BlockNumber blkno ) { if (!gv->index->rd_istemp) { - XLogRecData rdata; + XLogRecData rdata[2]; XLogRecPtr recptr; gistxlogPageDelete xlrec; xlrec.node = gv->index->rd_node; xlrec.blkno = blkno; - rdata.buffer = InvalidBuffer; - rdata.data = (char *) &xlrec; - rdata.len = sizeof(gistxlogPageDelete); - rdata.next = NULL; + rdata[0].buffer = buffer; + rdata[0].buffer_std = true; + rdata[0].data = NULL; + rdata[0].len = 0; + rdata[0].next = &(rdata[1]); - recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_DELETE, &rdata); + rdata[1].buffer = InvalidBuffer; + rdata[1].data = (char *) &xlrec; + rdata[1].len = sizeof(gistxlogPageDelete); + rdata[1].next = NULL; + + recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_DELETE, rdata); PageSetLSN(page, recptr); PageSetTLI(page, ThisTimeLineID); } diff --git a/src/backend/access/gist/gistxlog.c b/src/backend/access/gist/gistxlog.c index 01dab119b2..1126727cd9 100644 --- a/src/backend/access/gist/gistxlog.c +++ b/src/backend/access/gist/gistxlog.c @@ -8,7 +8,7 @@ * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/access/gist/gistxlog.c,v 1.17 2006/05/17 16:34:59 teodor Exp $ + * $PostgreSQL: pgsql/src/backend/access/gist/gistxlog.c,v 1.18 2006/05/19 11:10:25 teodor Exp $ *------------------------------------------------------------------------- */ #include "postgres.h" @@ -73,8 +73,18 @@ pushIncompleteInsert(RelFileNode node, XLogRecPtr lsn, ItemPointerData key, BlockNumber *blkno, int lenblk, PageSplitRecord *xlinfo /* to extract blkno info */ ) { - MemoryContext oldCxt = MemoryContextSwitchTo(insertCtx); - gistIncompleteInsert *ninsert = (gistIncompleteInsert *) palloc(sizeof(gistIncompleteInsert)); + MemoryContext oldCxt; + gistIncompleteInsert *ninsert; + + if ( !ItemPointerIsValid(&key) ) + /* + * if key is null then we should not store insertion as incomplete, + * because it's a vacuum operation.. + */ + return; + + oldCxt = MemoryContextSwitchTo(insertCtx); + ninsert = (gistIncompleteInsert *) palloc(sizeof(gistIncompleteInsert)); ninsert->node = node; ninsert->key = key; @@ -115,6 +125,12 @@ forgetIncompleteInsert(RelFileNode node, ItemPointerData key) { ListCell *l; + if ( !ItemPointerIsValid(&key) ) + return; + + if (incomplete_inserts==NIL) + return; + foreach(l, incomplete_inserts) { gistIncompleteInsert *insert = (gistIncompleteInsert *) lfirst(l); @@ -180,16 +196,13 @@ gistRedoPageUpdateRecord(XLogRecPtr lsn, XLogRecord *record, bool isnewroot) Page page; /* we must fix incomplete_inserts list even if XLR_BKP_BLOCK_1 is set */ - if (ItemPointerIsValid(&(xldata->key))) - { - if (incomplete_inserts != NIL) - forgetIncompleteInsert(xldata->node, xldata->key); + forgetIncompleteInsert(xldata->node, xldata->key); - if (!isnewroot && xldata->blkno != GIST_ROOT_BLKNO) - pushIncompleteInsert(xldata->node, lsn, xldata->key, - &(xldata->blkno), 1, - NULL); - } + if (!isnewroot && xldata->blkno != GIST_ROOT_BLKNO) + /* operation with root always finalizes insertion */ + pushIncompleteInsert(xldata->node, lsn, xldata->key, + &(xldata->blkno), 1, + NULL); /* nothing else to do if page was backed up (and no info to do it with) */ if (record->xl_info & XLR_BKP_BLOCK_1) @@ -252,12 +265,15 @@ gistRedoPageDeleteRecord(XLogRecPtr lsn, XLogRecord *record) Buffer buffer; Page page; + /* nothing else to do if page was backed up (and no info to do it with) */ + if (record->xl_info & XLR_BKP_BLOCK_1) + return; + reln = XLogOpenRelation(xldata->node); buffer = XLogReadBuffer(reln, xldata->blkno, false); if (!BufferIsValid(buffer)) return; - GISTInitBuffer( buffer, 0 ); page = (Page) BufferGetPage(buffer); GistPageSetDeleted(page); @@ -333,15 +349,11 @@ gistRedoPageSplitRecord(XLogRecPtr lsn, XLogRecord *record) UnlockReleaseBuffer(buffer); } - if (ItemPointerIsValid(&(xlrec.data->key))) - { - if (incomplete_inserts != NIL) - forgetIncompleteInsert(xlrec.data->node, xlrec.data->key); + forgetIncompleteInsert(xlrec.data->node, xlrec.data->key); - pushIncompleteInsert(xlrec.data->node, lsn, xlrec.data->key, - NULL, 0, - &xlrec); - } + pushIncompleteInsert(xlrec.data->node, lsn, xlrec.data->key, + NULL, 0, + &xlrec); } static void @@ -536,7 +548,43 @@ gistxlogFindPath(Relation index, gistIncompleteInsert *insert) insert->path[i++] = ptr->blkno; } else - elog(LOG, "lost parent for block %u", insert->origblkno); + elog(ERROR, "lost parent for block %u", insert->origblkno); +} + +static SplitedPageLayout* +gistMakePageLayout(Buffer *buffers, int nbuffers) { + SplitedPageLayout *res=NULL, *resptr; + + while( nbuffers-- > 0 ) { + Page page = BufferGetPage( buffers[ nbuffers ] ); + IndexTuple idxtup; + OffsetNumber i; + char *ptr; + + resptr = (SplitedPageLayout*)palloc0( sizeof(SplitedPageLayout) ); + + resptr->block.blkno = BufferGetBlockNumber( buffers[ nbuffers ] ); + resptr->block.num = PageGetMaxOffsetNumber( page ); + + for(i=FirstOffsetNumber; i<= PageGetMaxOffsetNumber( page ); i++) { + idxtup = (IndexTuple) PageGetItem(page, PageGetItemId(page, i)); + resptr->lenlist += IndexTupleSize(idxtup); + } + + resptr->list = (IndexTupleData*)palloc( resptr->lenlist ); + ptr = (char*)(resptr->list); + + for(i=FirstOffsetNumber; i<= PageGetMaxOffsetNumber( page ); i++) { + idxtup = (IndexTuple) PageGetItem(page, PageGetItemId(page, i)); + memcpy( ptr, idxtup, IndexTupleSize(idxtup) ); + ptr += IndexTupleSize(idxtup); + } + + resptr->next = res; + res = resptr; + } + + return res; } /* @@ -548,11 +596,11 @@ gistxlogFindPath(Relation index, gistIncompleteInsert *insert) * Note that we assume the index is now in a valid state, except for the * unfinished insertion. In particular it's safe to invoke gistFindPath(); * there shouldn't be any garbage pages for it to run into. - * - * Although stored LSN in gistIncompleteInsert is a LSN of child page, - * we can compare it with LSN of parent, because parent is always locked - * while we change child page (look at gistmakedeal). So if parent's LSN is - * less than stored lsn then changes in parent aren't done yet. + * + * To complete insert we can't use basic insertion algorithm because + * during insertion we can't call user-defined support functions of opclass. + * So, we insert 'invalid' tuples without real key and do it by separate algorithm. + * 'invalid' tuple should be updated by vacuum full. */ static void gistContinueInsert(gistIncompleteInsert *insert) @@ -574,39 +622,27 @@ gistContinueInsert(gistIncompleteInsert *insert) for (i = 0; i < insert->lenblk; i++) itup[i] = gist_form_invalid_tuple(insert->blkno[i]); + /* + * any insertion of itup[] should make LOG message about + */ + if (insert->origblkno == GIST_ROOT_BLKNO) { /* * it was split root, so we should only make new root. it can't be - * simple insert into root, look at call pushIncompleteInsert in - * gistRedoPageSplitRecord + * simple insert into root, we should replace all content of root. */ Buffer buffer = XLogReadBuffer(index, GIST_ROOT_BLKNO, true); - Page page; - - Assert(BufferIsValid(buffer)); - page = BufferGetPage(buffer); - GISTInitBuffer(buffer, 0); - gistfillbuffer(index, page, itup, lenitup, FirstOffsetNumber); - - PageSetLSN(page, insert->lsn); - PageSetTLI(page, ThisTimeLineID); - - MarkBufferDirty(buffer); + gistnewroot(index, buffer, itup, lenitup, NULL); UnlockReleaseBuffer(buffer); - - /* - * XXX fall out to avoid making LOG message at bottom of routine. - * I think the logic for when to emit that message is all wrong... - */ - return; } else { Buffer *buffers; Page *pages; int numbuffer; + OffsetNumber *todelete; /* construct path */ gistxlogFindPath(index, insert); @@ -615,49 +651,60 @@ gistContinueInsert(gistIncompleteInsert *insert) buffers = (Buffer *) palloc(sizeof(Buffer) * (insert->lenblk + 2 /* guarantee root split */ )); pages = (Page *) palloc(sizeof(Page) * (insert->lenblk + 2 /* guarantee root split */ )); + todelete = (OffsetNumber *) palloc(sizeof(OffsetNumber) * (insert->lenblk + 2 /* guarantee root split */ )); for (i = 0; i < insert->pathlen; i++) { int j, k, - pituplen = 0, - childfound = 0; + pituplen = 0; + XLogRecData *rdata; + XLogRecPtr recptr; + Buffer tempbuffer = InvalidBuffer; + int ntodelete = 0; numbuffer = 1; - buffers[numbuffer - 1] = ReadBuffer(index, insert->path[i]); - LockBuffer(buffers[numbuffer - 1], GIST_EXCLUSIVE); - pages[numbuffer - 1] = BufferGetPage(buffers[numbuffer - 1]); + buffers[0] = ReadBuffer(index, insert->path[i]); + LockBuffer(buffers[0], GIST_EXCLUSIVE); + /* + * we check buffer, because we restored page earlier + */ + gistcheckpage(index, buffers[0]); - if (XLByteLE(insert->lsn, PageGetLSN(pages[numbuffer - 1]))) - { - UnlockReleaseBuffer(buffers[numbuffer - 1]); - return; - } + pages[0] = BufferGetPage(buffers[0]); + Assert( !GistPageIsLeaf(pages[0]) ); - pituplen = PageGetMaxOffsetNumber(pages[numbuffer - 1]); + pituplen = PageGetMaxOffsetNumber(pages[0]); - /* remove old IndexTuples */ - for (j = 0; j < pituplen && childfound < lenitup; j++) + /* find remove old IndexTuples to remove */ + for (j = 0; j < pituplen && ntodelete < lenitup; j++) { BlockNumber blkno; - ItemId iid = PageGetItemId(pages[numbuffer - 1], j + FirstOffsetNumber); - IndexTuple idxtup = (IndexTuple) PageGetItem(pages[numbuffer - 1], iid); + ItemId iid = PageGetItemId(pages[0], j + FirstOffsetNumber); + IndexTuple idxtup = (IndexTuple) PageGetItem(pages[0], iid); blkno = ItemPointerGetBlockNumber(&(idxtup->t_tid)); for (k = 0; k < lenitup; k++) if (ItemPointerGetBlockNumber(&(itup[k]->t_tid)) == blkno) { - PageIndexTupleDelete(pages[numbuffer - 1], j + FirstOffsetNumber); - j--; - pituplen--; - childfound++; + todelete[ntodelete] = j + FirstOffsetNumber - ntodelete; + ntodelete++; break; } } - if (gistnospace(pages[numbuffer - 1], itup, lenitup, InvalidOffsetNumber)) + if ( ntodelete == 0 ) + elog(PANIC,"gistContinueInsert: can't find pointer to page(s)"); + + /* + * we check space with subtraction only first tuple to delete, hope, + * that wiil be enough space.... + */ + + if (gistnospace(pages[0], itup, lenitup, *todelete)) { + /* no space left on page, so we must split */ buffers[numbuffer] = ReadBuffer(index, P_NEW); LockBuffer(buffers[numbuffer], GIST_EXCLUSIVE); @@ -668,62 +715,86 @@ gistContinueInsert(gistIncompleteInsert *insert) if (BufferGetBlockNumber(buffers[0]) == GIST_ROOT_BLKNO) { - IndexTuple *parentitup; + Buffer tmp; /* - * we split root, just copy tuples from old root to new - * page + * we split root, just copy content from root to new page */ - parentitup = gistextractpage(pages[numbuffer - 1], - &pituplen); /* sanity check */ if (i + 1 != insert->pathlen) elog(PANIC, "unexpected pathlen in index \"%s\"", RelationGetRelationName(index)); - /* fill new page */ - buffers[numbuffer] = ReadBuffer(index, P_NEW); - LockBuffer(buffers[numbuffer], GIST_EXCLUSIVE); - GISTInitBuffer(buffers[numbuffer], 0); - pages[numbuffer] = BufferGetPage(buffers[numbuffer]); - gistfillbuffer(index, pages[numbuffer], parentitup, pituplen, FirstOffsetNumber); - numbuffer++; - - /* fill root page */ - GISTInitBuffer(buffers[0], 0); - for (j = 1; j < numbuffer; j++) - { - IndexTuple tuple = gist_form_invalid_tuple(BufferGetBlockNumber(buffers[j])); - - if (PageAddItem(pages[0], - (Item) tuple, - IndexTupleSize(tuple), - (OffsetNumber) j, - LP_USED) == InvalidOffsetNumber) - elog(PANIC, "failed to add item to index page in \"%s\"", - RelationGetRelationName(index)); - } + /* fill new page, root will be changed later */ + tempbuffer = ReadBuffer(index, P_NEW); + LockBuffer(tempbuffer, GIST_EXCLUSIVE); + memcpy( BufferGetPage(tempbuffer), pages[0], BufferGetPageSize(tempbuffer) ); + + /* swap buffers[0] (was root) and temp buffer */ + tmp = buffers[0]; + buffers[0] = tempbuffer; + tempbuffer = tmp; /* now in tempbuffer GIST_ROOT_BLKNO, it is still unchanged */ + + pages[0] = BufferGetPage(buffers[0]); } + + START_CRIT_SECTION(); + + for(j=0;jrd_node, insert->path[i], + false, &(insert->key), + gistMakePageLayout( buffers, numbuffer ) ); + + } else { + START_CRIT_SECTION(); + + for(j=0;jrd_node, buffers[0], + todelete, ntodelete, + itup, lenitup, &(insert->key)); } - else - gistfillbuffer(index, pages[numbuffer - 1], itup, lenitup, InvalidOffsetNumber); - lenitup = numbuffer; + /* + * use insert->key as mark for completion of insert (form*Rdata() above) + * for following possible replays + */ + + /* write pages with XLOG LSN */ + recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_UPDATE, rdata); for (j = 0; j < numbuffer; j++) { - itup[j] = gist_form_invalid_tuple(BufferGetBlockNumber(buffers[j])); - PageSetLSN(pages[j], insert->lsn); + PageSetLSN(pages[j], recptr); PageSetTLI(pages[j], ThisTimeLineID); GistPageGetOpaque(pages[j])->rightlink = InvalidBlockNumber; MarkBufferDirty(buffers[j]); + } + + END_CRIT_SECTION(); + + lenitup = numbuffer; + for (j = 0; j < numbuffer; j++) { + itup[j] = gist_form_invalid_tuple(BufferGetBlockNumber(buffers[j])); UnlockReleaseBuffer(buffers[j]); } + + if ( tempbuffer != InvalidBuffer ) { + /* + * it was a root split, so fill it by new values + */ + gistnewroot(index, tempbuffer, itup, lenitup, &(insert->key)); + UnlockReleaseBuffer(tempbuffer); + } } } ereport(LOG, - (errmsg("index %u/%u/%u needs VACUUM or REINDEX to finish crash recovery", + (errmsg("index %u/%u/%u needs VACUUM FULL or REINDEX to finish crash recovery", insert->node.spcNode, insert->node.dbNode, insert->node.relNode), errdetail("Incomplete insertion detected during crash replay."))); } @@ -747,6 +818,7 @@ gist_xlog_cleanup(void) MemoryContext oldCxt; oldCxt = MemoryContextSwitchTo(opCtx); + foreach(l, incomplete_inserts) { gistIncompleteInsert *insert = (gistIncompleteInsert *) lfirst(l);