* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/access/gist/gist.c,v 1.129 2006/03/05 15:58:20 momjian Exp $
+ * $PostgreSQL: pgsql/src/backend/access/gist/gist.c,v 1.130 2006/03/30 23:03:09 tgl Exp $
*
*-------------------------------------------------------------------------
*/
double reltuples;
GISTBuildState buildstate;
Buffer buffer;
+ Page page;
/*
* We expect to be called exactly once for any index relation. If that's
/* initialize the root page */
buffer = gistNewBuffer(index);
+ Assert(BufferGetBlockNumber(buffer) == GIST_ROOT_BLKNO);
+ page = BufferGetPage(buffer);
+
+ START_CRIT_SECTION();
+
GISTInitBuffer(buffer, F_LEAF);
if (!index->rd_istemp)
{
XLogRecPtr recptr;
XLogRecData rdata;
- Page page;
- rdata.buffer = InvalidBuffer;
rdata.data = (char *) &(index->rd_node);
rdata.len = sizeof(RelFileNode);
+ rdata.buffer = InvalidBuffer;
rdata.next = NULL;
- page = BufferGetPage(buffer);
-
- START_CRIT_SECTION();
-
recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_CREATE_INDEX, &rdata);
PageSetLSN(page, recptr);
PageSetTLI(page, ThisTimeLineID);
-
- END_CRIT_SECTION();
}
else
- PageSetLSN(BufferGetPage(buffer), XLogRecPtrForTemp);
+ PageSetLSN(page, XLogRecPtrForTemp);
LockBuffer(buffer, GIST_UNLOCK);
WriteBuffer(buffer);
+ END_CRIT_SECTION();
+
/* build the index */
buildstate.numindexattrs = indexInfo->ii_NumIndexAttrs;
buildstate.indtuples = 0;
bool is_splitted = false;
bool is_leaf = (GistPageIsLeaf(state->stack->page)) ? true : false;
+ /*
+ * XXX this code really ought to work by locking, but not modifying,
+ * all the buffers it needs; then starting a critical section; then
+ * modifying the buffers in an already-determined way and writing an
+ * XLOG record to reflect that. Since it doesn't, we've got to put
+ * a critical section around the entire process, which is horrible
+ * from a robustness point of view.
+ */
+ START_CRIT_SECTION();
if (!is_leaf)
* This node's key has been modified, either because a child split
* occurred or because we needed to adjust our key for an insert in a
* child node. Therefore, remove the old version of this node's key.
+ *
+ * Note: for WAL replay, in the non-split case we handle this by
+ * setting up a one-element todelete array; in the split case, it's
+ * handled implicitly because the tuple vector passed to gistSplit
+ * won't include this tuple.
*/
PageIndexTupleDelete(state->stack->page, state->stack->childoffnum);
XLogRecData *rdata;
rdata = formSplitRdata(state->r->rd_node, state->stack->blkno,
- &(state->key), dist);
-
- START_CRIT_SECTION();
+ is_leaf, &(state->key), dist);
recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_SPLIT, rdata);
ptr = dist;
PageSetTLI(BufferGetPage(ptr->buffer), ThisTimeLineID);
ptr = ptr->next;
}
-
- END_CRIT_SECTION();
}
else
{
else
ourpage = dist;
-
/* now gets all needed data, and sets nsn's */
page = (Page) BufferGetPage(ourpage->buffer);
opaque = GistPageGetOpaque(page);
WriteBuffer(ptr->buffer);
ptr = ptr->next;
}
+
+ WriteNoReleaseBuffer(state->stack->buffer);
}
- WriteNoReleaseBuffer(state->stack->buffer);
+
+ END_CRIT_SECTION();
}
else
{
if (!state->r->rd_istemp)
{
OffsetNumber noffs = 0,
- offs[MAXALIGN(sizeof(OffsetNumber)) / sizeof(OffsetNumber)];
+ offs[1];
XLogRecPtr recptr;
XLogRecData *rdata;
noffs = 1;
}
- rdata = formUpdateRdata(state->r->rd_node, state->stack->blkno,
- offs, noffs, false, state->itup, state->ituplen,
+ rdata = formUpdateRdata(state->r->rd_node, state->stack->buffer,
+ offs, noffs, false,
+ state->itup, state->ituplen,
&(state->key));
- START_CRIT_SECTION();
-
- recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_ENTRY_UPDATE, rdata);
+ recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_UPDATE, rdata);
PageSetLSN(state->stack->page, recptr);
PageSetTLI(state->stack->page, ThisTimeLineID);
-
- END_CRIT_SECTION();
}
else
PageSetLSN(state->stack->page, XLogRecPtrForTemp);
state->needInsertComplete = false;
WriteNoReleaseBuffer(state->stack->buffer);
+ END_CRIT_SECTION();
+
if (!is_leaf) /* small optimization: inform scan ablout
* deleting... */
gistadjscans(state->r, GISTOP_DEL, state->stack->blkno,
}
/*
- * Should have the same interface as XLogReadBuffer
- */
-static Buffer
-gistReadAndLockBuffer(Relation r, BlockNumber blkno)
-{
- Buffer buffer = ReadBuffer(r, blkno);
-
- LockBuffer(buffer, GIST_SHARE);
- return buffer;
-}
-
-/*
- * Traverse the tree to find path from root page.
+ * Traverse the tree to find path from root page to specified "child" block.
*
* returns from the begining of closest parent;
*
- * Function is used in both regular and recovery mode, so must work with
- * different read functions (gistReadAndLockBuffer and XLogReadBuffer)
- *
* To prevent deadlocks, this should lock only one page simultaneously.
*/
GISTInsertStack *
-gistFindPath(Relation r, BlockNumber child,
- Buffer (*myReadBuffer) (Relation, BlockNumber))
+gistFindPath(Relation r, BlockNumber child)
{
Page page;
Buffer buffer;
while (top && top->blkno != child)
{
- buffer = myReadBuffer(r, top->blkno); /* locks buffer */
+ buffer = ReadBuffer(r, top->blkno);
+ LockBuffer(buffer, GIST_SHARE);
gistcheckpage(r, buffer);
page = (Page) BufferGetPage(buffer);
}
/* ok, find new path */
- ptr = parent = gistFindPath(r, child->blkno, gistReadAndLockBuffer);
+ ptr = parent = gistFindPath(r, child->blkno);
Assert(ptr != NULL);
/* read all buffers as expected by caller */
Assert(BufferGetBlockNumber(buffer) == GIST_ROOT_BLKNO);
page = BufferGetPage(buffer);
- GISTInitBuffer(buffer, 0);
+ START_CRIT_SECTION();
+
+ GISTInitBuffer(buffer, 0); /* XXX not F_LEAF? */
gistfillbuffer(r, page, itup, len, FirstOffsetNumber);
+
if (!r->rd_istemp)
{
XLogRecPtr recptr;
XLogRecData *rdata;
- rdata = formUpdateRdata(r->rd_node, GIST_ROOT_BLKNO,
- NULL, 0, false, itup, len, key);
-
- START_CRIT_SECTION();
+ rdata = formUpdateRdata(r->rd_node, buffer,
+ NULL, 0, false,
+ itup, len, key);
recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_NEW_ROOT, rdata);
PageSetLSN(page, recptr);
PageSetTLI(page, ThisTimeLineID);
-
- END_CRIT_SECTION();
}
else
PageSetLSN(page, XLogRecPtrForTemp);
+
+ WriteNoReleaseBuffer(buffer);
+
+ END_CRIT_SECTION();
}
void
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/access/gist/gistvacuum.c,v 1.16 2006/03/05 15:58:20 momjian Exp $
+ * $PostgreSQL: pgsql/src/backend/access/gist/gistvacuum.c,v 1.17 2006/03/30 23:03:10 tgl Exp $
*
*-------------------------------------------------------------------------
*/
page = (Page) BufferGetPage(buffer);
maxoff = PageGetMaxOffsetNumber(page);
+ /*
+ * XXX need to reduce scope of changes to page so we can make this
+ * critical section less extensive
+ */
+ START_CRIT_SECTION();
+
if (GistPageIsLeaf(page))
{
if (GistTuplesDeleted(page))
ItemPointerSet(&key, blkno, TUPLE_IS_VALID);
rdata = formSplitRdata(gv->index->rd_node, blkno,
- &key, dist);
+ false, &key, dist);
xlinfo = rdata->data;
- START_CRIT_SECTION();
-
recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_SPLIT, rdata);
ptr = dist;
while (ptr)
ptr = ptr->next;
}
- END_CRIT_SECTION();
pfree(xlinfo);
pfree(rdata);
}
oldCtx = MemoryContextSwitchTo(gv->opCtx);
gistnewroot(gv->index, buffer, res.itup, res.ituplen, &key);
MemoryContextSwitchTo(oldCtx);
-
- WriteNoReleaseBuffer(buffer);
}
needwrite = false;
XLogRecPtr recptr;
char *xlinfo;
- rdata = formUpdateRdata(gv->index->rd_node, blkno, todelete, ntodelete,
- res.emptypage, addon, curlenaddon, NULL);
+ rdata = formUpdateRdata(gv->index->rd_node, buffer,
+ todelete, ntodelete, res.emptypage,
+ addon, curlenaddon, NULL);
xlinfo = rdata->data;
- START_CRIT_SECTION();
- recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_ENTRY_UPDATE, rdata);
+ recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_UPDATE, rdata);
PageSetLSN(page, recptr);
PageSetTLI(page, ThisTimeLineID);
- END_CRIT_SECTION();
pfree(xlinfo);
pfree(rdata);
else
ReleaseBuffer(buffer);
+ END_CRIT_SECTION();
+
if (ncompleted && !gv->index->rd_istemp)
gistxlogInsertCompletion(gv->index->rd_node, completed, ncompleted);
*/
pushStackIfSplited(page, stack);
+ /*
+ * Remove deletable tuples from page
+ *
+ * XXX try to make this critical section shorter. Could do it
+ * by separating the callback loop from the actual tuple deletion,
+ * but that would affect the definition of the todelete[] array
+ * passed into the WAL record (because the indexes would all be
+ * pre-deletion).
+ */
+ START_CRIT_SECTION();
+
maxoff = PageGetMaxOffsetNumber(page);
for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
{
XLogRecData *rdata;
XLogRecPtr recptr;
- gistxlogEntryUpdate *xlinfo;
+ gistxlogPageUpdate *xlinfo;
- rdata = formUpdateRdata(rel->rd_node, stack->blkno, todelete, ntodelete,
- false, NULL, 0, NULL);
- xlinfo = (gistxlogEntryUpdate *) rdata->data;
+ rdata = formUpdateRdata(rel->rd_node, buffer,
+ todelete, ntodelete, false,
+ NULL, 0,
+ NULL);
+ xlinfo = (gistxlogPageUpdate *) rdata->data;
- START_CRIT_SECTION();
- recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_ENTRY_UPDATE, rdata);
+ recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_UPDATE, rdata);
PageSetLSN(page, recptr);
PageSetTLI(page, ThisTimeLineID);
- END_CRIT_SECTION();
pfree(xlinfo);
pfree(rdata);
PageSetLSN(page, XLogRecPtrForTemp);
WriteNoReleaseBuffer(buffer);
}
+
+ END_CRIT_SECTION();
}
else
{
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/access/gist/gistxlog.c,v 1.12 2006/03/29 21:17:36 tgl Exp $
+ * $PostgreSQL: pgsql/src/backend/access/gist/gistxlog.c,v 1.13 2006/03/30 23:03:10 tgl Exp $
*-------------------------------------------------------------------------
*/
#include "postgres.h"
typedef struct
{
- gistxlogEntryUpdate *data;
+ gistxlogPageUpdate *data;
int len;
IndexTuple *itup;
OffsetNumber *todelete;
-} EntryUpdateRecord;
+} PageUpdateRecord;
typedef struct
{
} gistIncompleteInsert;
-MemoryContext opCtx;
-MemoryContext insertCtx;
+static MemoryContext opCtx; /* working memory for operations */
+static MemoryContext insertCtx; /* holds incomplete_inserts list */
static List *incomplete_inserts;
-#define ItemPointerEQ( a, b ) \
- ( \
- ItemPointerGetOffsetNumber(a) == ItemPointerGetOffsetNumber(b) && \
- ItemPointerGetBlockNumber (a) == ItemPointerGetBlockNumber(b) \
- )
+#define ItemPointerEQ(a, b) \
+ ( ItemPointerGetOffsetNumber(a) == ItemPointerGetOffsetNumber(b) && \
+ ItemPointerGetBlockNumber (a) == ItemPointerGetBlockNumber(b) )
+
static void
pushIncompleteInsert(RelFileNode node, XLogRecPtr lsn, ItemPointerData key,
}
Assert(ninsert->lenblk > 0);
- incomplete_inserts = lappend(incomplete_inserts, ninsert);
+ /*
+ * Stick the new incomplete insert onto the front of the list, not the
+ * back. This is so that gist_xlog_cleanup will process incompletions
+ * in last-in-first-out order.
+ */
+ incomplete_inserts = lcons(ninsert, incomplete_inserts);
+
MemoryContextSwitchTo(oldCxt);
}
if (RelFileNodeEquals(node, insert->node) && ItemPointerEQ(&(insert->key), &(key)))
{
-
/* found */
- pfree(insert->blkno);
incomplete_inserts = list_delete_ptr(incomplete_inserts, insert);
+ pfree(insert->blkno);
pfree(insert);
break;
}
}
static void
-decodeEntryUpdateRecord(EntryUpdateRecord *decoded, XLogRecord *record)
+decodePageUpdateRecord(PageUpdateRecord *decoded, XLogRecord *record)
{
char *begin = XLogRecGetData(record),
*ptr;
int i = 0,
addpath = 0;
- decoded->data = (gistxlogEntryUpdate *) begin;
+ decoded->data = (gistxlogPageUpdate *) begin;
if (decoded->data->ntodelete)
{
- decoded->todelete = (OffsetNumber *) (begin + sizeof(gistxlogEntryUpdate) + addpath);
+ decoded->todelete = (OffsetNumber *) (begin + sizeof(gistxlogPageUpdate) + addpath);
addpath = MAXALIGN(sizeof(OffsetNumber) * decoded->data->ntodelete);
}
else
decoded->todelete = NULL;
decoded->len = 0;
- ptr = begin + sizeof(gistxlogEntryUpdate) + addpath;
+ ptr = begin + sizeof(gistxlogPageUpdate) + addpath;
while (ptr - begin < record->xl_len)
{
decoded->len++;
decoded->itup = (IndexTuple *) palloc(sizeof(IndexTuple) * decoded->len);
- ptr = begin + sizeof(gistxlogEntryUpdate) + addpath;
+ ptr = begin + sizeof(gistxlogPageUpdate) + addpath;
while (ptr - begin < record->xl_len)
{
decoded->itup[i] = (IndexTuple) ptr;
* redo any page update (except page split)
*/
static void
-gistRedoEntryUpdateRecord(XLogRecPtr lsn, XLogRecord *record, bool isnewroot)
+gistRedoPageUpdateRecord(XLogRecPtr lsn, XLogRecord *record, bool isnewroot)
{
- EntryUpdateRecord xlrec;
+ PageUpdateRecord xlrec;
Relation reln;
Buffer buffer;
Page page;
- decodeEntryUpdateRecord(&xlrec, record);
+ /* nothing to do if whole page was backed up (and no info to do it with) */
+ if (record->xl_info & XLR_BKP_BLOCK_1)
+ return;
+
+ decodePageUpdateRecord(&xlrec, record);
reln = XLogOpenRelation(xlrec.data->node);
buffer = XLogReadBuffer(reln, xlrec.data->blkno, false);
if (!BufferIsValid(buffer))
- elog(PANIC, "block %u unfound", xlrec.data->blkno);
+ return;
page = (Page) BufferGetPage(buffer);
- if (isnewroot)
+ if (XLByteLE(lsn, PageGetLSN(page)))
{
- if (!PageIsNew((PageHeader) page) && XLByteLE(lsn, PageGetLSN(page)))
- {
- LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
- ReleaseBuffer(buffer);
- return;
- }
- }
- else
- {
- if (XLByteLE(lsn, PageGetLSN(page)))
- {
- LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
- ReleaseBuffer(buffer);
- return;
- }
+ LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+ ReleaseBuffer(buffer);
+ return;
}
if (xlrec.data->isemptypage)
GistClearTuplesDeleted(page);
}
+ GistPageGetOpaque(page)->rightlink = InvalidBlockNumber;
PageSetLSN(page, lsn);
PageSetTLI(page, ThisTimeLineID);
- GistPageGetOpaque(page)->rightlink = InvalidBlockNumber;
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
WriteBuffer(buffer);
Buffer buffer;
Page page;
int i;
- int flags = 0;
+ int flags;
decodePageSplitRecord(&xlrec, record);
reln = XLogOpenRelation(xlrec.data->node);
-
- /* first of all wee need get F_LEAF flag from original page */
- buffer = XLogReadBuffer(reln, xlrec.data->origblkno, false);
- if (!BufferIsValid(buffer))
- elog(PANIC, "block %u unfound", xlrec.data->origblkno);
- page = (Page) BufferGetPage(buffer);
- flags = (GistPageIsLeaf(page)) ? F_LEAF : 0;
- LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
- ReleaseBuffer(buffer);
+ flags = xlrec.data->origleaf ? F_LEAF : 0;
/* loop around all pages */
for (i = 0; i < xlrec.data->npage; i++)
{
NewPage *newpage = xlrec.page + i;
- bool isorigpage = (xlrec.data->origblkno == newpage->header->blkno) ? true : false;
- buffer = XLogReadBuffer(reln, newpage->header->blkno, !isorigpage);
- if (!BufferIsValid(buffer))
- elog(PANIC, "block %u unfound", newpage->header->blkno);
+ buffer = XLogReadBuffer(reln, newpage->header->blkno, true);
+ Assert(BufferIsValid(buffer));
page = (Page) BufferGetPage(buffer);
- if (XLByteLE(lsn, PageGetLSN(page)))
- {
- LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
- ReleaseBuffer(buffer);
- continue;
- }
-
/* ok, clear buffer */
GISTInitBuffer(buffer, flags);
oldCxt = MemoryContextSwitchTo(opCtx);
switch (info)
{
- case XLOG_GIST_ENTRY_UPDATE:
- case XLOG_GIST_ENTRY_DELETE:
- gistRedoEntryUpdateRecord(lsn, record, false);
+ case XLOG_GIST_PAGE_UPDATE:
+ gistRedoPageUpdateRecord(lsn, record, false);
break;
case XLOG_GIST_NEW_ROOT:
- gistRedoEntryUpdateRecord(lsn, record, true);
+ gistRedoPageUpdateRecord(lsn, record, true);
break;
case XLOG_GIST_PAGE_SPLIT:
gistRedoPageSplitRecord(lsn, record);
}
static void
-out_gistxlogEntryUpdate(StringInfo buf, gistxlogEntryUpdate *xlrec)
+out_gistxlogPageUpdate(StringInfo buf, gistxlogPageUpdate *xlrec)
{
out_target(buf, xlrec->node, xlrec->key);
appendStringInfo(buf, "; block number %u", xlrec->blkno);
switch (info)
{
- case XLOG_GIST_ENTRY_UPDATE:
- appendStringInfo(buf, "entry_update: ");
- out_gistxlogEntryUpdate(buf, (gistxlogEntryUpdate *) rec);
- break;
- case XLOG_GIST_ENTRY_DELETE:
- appendStringInfo(buf, "entry_delete: ");
- out_gistxlogEntryUpdate(buf, (gistxlogEntryUpdate *) rec);
+ case XLOG_GIST_PAGE_UPDATE:
+ appendStringInfo(buf, "page_update: ");
+ out_gistxlogPageUpdate(buf, (gistxlogPageUpdate *) rec);
break;
case XLOG_GIST_NEW_ROOT:
appendStringInfo(buf, "new_root: ");
- out_target(buf, ((gistxlogEntryUpdate *) rec)->node, ((gistxlogEntryUpdate *) rec)->key);
+ out_target(buf, ((gistxlogPageUpdate *) rec)->node, ((gistxlogPageUpdate *) rec)->key);
break;
case XLOG_GIST_PAGE_SPLIT:
out_gistxlogPageSplit(buf, (gistxlogPageSplit *) rec);
return tuple;
}
-static Buffer
-gistXLogReadAndLockBuffer(Relation r, BlockNumber blkno)
-{
- Buffer buffer = XLogReadBuffer(r, blkno, false);
-
- if (!BufferIsValid(buffer))
- elog(PANIC, "block %u unfound", blkno);
-
- return buffer;
-}
-
static void
-gixtxlogFindPath(Relation index, gistIncompleteInsert *insert)
+gistxlogFindPath(Relation index, gistIncompleteInsert *insert)
{
GISTInsertStack *top;
insert->pathlen = 0;
insert->path = NULL;
- if ((top = gistFindPath(index, insert->origblkno, gistXLogReadAndLockBuffer)) != NULL)
+ if ((top = gistFindPath(index, insert->origblkno)) != NULL)
{
int i;
- GISTInsertStack *ptr = top;
+ GISTInsertStack *ptr;
- while (ptr)
- {
+ for (ptr = top; ptr; ptr = ptr->parent)
insert->pathlen++;
- ptr = ptr->parent;
- }
insert->path = (BlockNumber *) palloc(sizeof(BlockNumber) * insert->pathlen);
i = 0;
- ptr = top;
- while (ptr)
- {
- insert->path[i] = ptr->blkno;
- i++;
- ptr = ptr->parent;
- }
+ for (ptr = top; ptr; ptr = ptr->parent)
+ insert->path[i++] = ptr->blkno;
}
else
elog(LOG, "lost parent for block %u", insert->origblkno);
}
/*
- * Continue insert after crash. In normal situation, there isn't any incomplete
- * inserts, but if it might be after crash, WAL may has not a record of completetion.
+ * Continue insert after crash. In normal situations, there aren't any
+ * incomplete inserts, but if a crash occurs partway through an insertion
+ * sequence, we'll need to finish making the index valid at the end of WAL
+ * replay.
+ *
+ * Note that we assume the index is now in a valid state, except for the
+ * unfinished insertion. In particular it's safe to invoke gistFindPath();
+ * there shouldn't be any garbage pages for it to run into.
*
* Although stored LSN in gistIncompleteInsert is a LSN of child page,
* we can compare it with LSN of parent, because parent is always locked
* while we change child page (look at gistmakedeal). So if parent's LSN is
- * lesser than stored lsn then changes in parent doesn't do yet.
+ * less than stored lsn then changes in parent aren't done yet.
*/
static void
gistContinueInsert(gistIncompleteInsert *insert)
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
WriteBuffer(buffer);
+
+ /*
+ * XXX fall out to avoid making LOG message at bottom of routine.
+ * I think the logic for when to emit that message is all wrong...
+ */
+ return;
}
else
{
int numbuffer;
/* construct path */
- gixtxlogFindPath(index, insert);
+ gistxlogFindPath(index, insert);
Assert(insert->pathlen > 0);
childfound = 0;
numbuffer = 1;
- buffers[numbuffer - 1] = XLogReadBuffer(index, insert->path[i], false);
- if (!BufferIsValid(buffers[numbuffer - 1]))
- elog(PANIC, "block %u unfound", insert->path[i]);
+ buffers[numbuffer - 1] = ReadBuffer(index, insert->path[i]);
+ LockBuffer(buffers[numbuffer - 1], GIST_EXCLUSIVE);
pages[numbuffer - 1] = BufferGetPage(buffers[numbuffer - 1]);
if (XLByteLE(insert->lsn, PageGetLSN(pages[numbuffer - 1])))
if (gistnospace(pages[numbuffer - 1], itup, lenitup))
{
- /* no space left on page, so we should split */
- buffers[numbuffer] = XLogReadBuffer(index, P_NEW, true);
- if (!BufferIsValid(buffers[numbuffer]))
- elog(PANIC, "could not obtain new block");
+ /* no space left on page, so we must split */
+ buffers[numbuffer] = ReadBuffer(index, P_NEW);
+ LockBuffer(buffers[numbuffer], GIST_EXCLUSIVE);
GISTInitBuffer(buffers[numbuffer], 0);
pages[numbuffer] = BufferGetPage(buffers[numbuffer]);
gistfillbuffer(index, pages[numbuffer], itup, lenitup, FirstOffsetNumber);
* we split root, just copy tuples from old root to new
* page
*/
- parentitup = gistextractbuffer(buffers[numbuffer - 1], &pituplen);
+ parentitup = gistextractbuffer(buffers[numbuffer - 1],
+ &pituplen);
/* sanity check */
if (i + 1 != insert->pathlen)
RelationGetRelationName(index));
/* fill new page */
- buffers[numbuffer] = XLogReadBuffer(index, P_NEW, true);
- if (!BufferIsValid(buffers[numbuffer]))
- elog(PANIC, "could not obtain new block");
+ buffers[numbuffer] = ReadBuffer(index, P_NEW);
+ LockBuffer(buffers[numbuffer], GIST_EXCLUSIVE);
GISTInitBuffer(buffers[numbuffer], 0);
pages[numbuffer] = BufferGetPage(buffers[numbuffer]);
gistfillbuffer(index, pages[numbuffer], parentitup, pituplen, FirstOffsetNumber);
gist_xlog_cleanup(void)
{
ListCell *l;
- List *reverse = NIL;
- MemoryContext oldCxt = MemoryContextSwitchTo(insertCtx);
-
- /* we should call gistContinueInsert in reverse order */
+ MemoryContext oldCxt;
+ oldCxt = MemoryContextSwitchTo(opCtx);
foreach(l, incomplete_inserts)
- reverse = lappend(reverse, lfirst(l));
-
- MemoryContextSwitchTo(opCtx);
- foreach(l, reverse)
{
gistIncompleteInsert *insert = (gistIncompleteInsert *) lfirst(l);
XLogRecData *
-formSplitRdata(RelFileNode node, BlockNumber blkno,
+formSplitRdata(RelFileNode node, BlockNumber blkno, bool page_is_leaf,
ItemPointer key, SplitedPageLayout *dist)
{
-
XLogRecData *rdata;
gistxlogPageSplit *xlrec = (gistxlogPageSplit *) palloc(sizeof(gistxlogPageSplit));
SplitedPageLayout *ptr;
xlrec->node = node;
xlrec->origblkno = blkno;
+ xlrec->origleaf = page_is_leaf;
xlrec->npage = (uint16) npage;
if (key)
xlrec->key = *key;
return rdata;
}
-
+/*
+ * Construct the rdata array for an XLOG record describing a page update
+ * (deletion and/or insertion of tuples on a single index page).
+ *
+ * Note that both the todelete array and the tuples are marked as belonging
+ * to the target buffer; they need not be stored in XLOG if XLogInsert decides
+ * to log the whole buffer contents instead. Also, we take care that there's
+ * at least one rdata item referencing the buffer, even when ntodelete and
+ * ituplen are both zero; this ensures that XLogInsert knows about the buffer.
+ */
XLogRecData *
-formUpdateRdata(RelFileNode node, BlockNumber blkno,
+formUpdateRdata(RelFileNode node, Buffer buffer,
OffsetNumber *todelete, int ntodelete, bool emptypage,
IndexTuple *itup, int ituplen, ItemPointer key)
{
XLogRecData *rdata;
- gistxlogEntryUpdate *xlrec = (gistxlogEntryUpdate *) palloc(sizeof(gistxlogEntryUpdate));
+ gistxlogPageUpdate *xlrec;
+ int cur,
+ i;
+
+ /* ugly wart in API: emptypage causes us to ignore other inputs */
+ if (emptypage)
+ ntodelete = ituplen = 0;
+
+ rdata = (XLogRecData *) palloc(sizeof(XLogRecData) * (2 + ituplen));
+ xlrec = (gistxlogPageUpdate *) palloc(sizeof(gistxlogPageUpdate));
xlrec->node = node;
- xlrec->blkno = blkno;
+ xlrec->blkno = BufferGetBlockNumber(buffer);
+ xlrec->ntodelete = ntodelete;
+ xlrec->isemptypage = emptypage;
if (key)
xlrec->key = *key;
else
ItemPointerSetInvalid(&(xlrec->key));
- if (emptypage)
- {
- xlrec->isemptypage = true;
- xlrec->ntodelete = 0;
-
- rdata = (XLogRecData *) palloc(sizeof(XLogRecData));
- rdata->buffer = InvalidBuffer;
- rdata->data = (char *) xlrec;
- rdata->len = sizeof(gistxlogEntryUpdate);
- rdata->next = NULL;
- }
- else
- {
- int cur = 1,
- i;
-
- xlrec->isemptypage = false;
- xlrec->ntodelete = ntodelete;
-
- rdata = (XLogRecData *) palloc(sizeof(XLogRecData) * (2 + ituplen));
-
- rdata->buffer = InvalidBuffer;
- rdata->data = (char *) xlrec;
- rdata->len = sizeof(gistxlogEntryUpdate);
- rdata->next = NULL;
+ rdata[0].data = (char *) xlrec;
+ rdata[0].len = sizeof(gistxlogPageUpdate);
+ rdata[0].buffer = InvalidBuffer;
+ rdata[0].next = &(rdata[1]);
- if (ntodelete)
- {
- rdata[cur - 1].next = &(rdata[cur]);
- rdata[cur].buffer = InvalidBuffer;
- rdata[cur].data = (char *) todelete;
- rdata[cur].len = MAXALIGN(sizeof(OffsetNumber) * ntodelete);
- rdata[cur].next = NULL;
- cur++;
- }
+ rdata[1].data = (char *) todelete;
+ rdata[1].len = MAXALIGN(sizeof(OffsetNumber) * ntodelete);
+ rdata[1].buffer = buffer;
+ rdata[1].buffer_std = true;
+ rdata[1].next = NULL;
- /* new tuples */
- for (i = 0; i < ituplen; i++)
- {
- rdata[cur].buffer = InvalidBuffer;
- rdata[cur].data = (char *) (itup[i]);
- rdata[cur].len = IndexTupleSize(itup[i]);
- rdata[cur].next = NULL;
- rdata[cur - 1].next = &(rdata[cur]);
- cur++;
- }
+ /* new tuples */
+ cur = 2;
+ for (i = 0; i < ituplen; i++)
+ {
+ rdata[cur - 1].next = &(rdata[cur]);
+ rdata[cur].data = (char *) (itup[i]);
+ rdata[cur].len = IndexTupleSize(itup[i]);
+ rdata[cur].buffer = buffer;
+ rdata[cur].buffer_std = true;
+ rdata[cur].next = NULL;
+ cur++;
}
return rdata;