* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/access/gist/gist.c,v 1.132 2006/04/03 13:44:33 teodor Exp $
+ * $PostgreSQL: pgsql/src/backend/access/gist/gist.c,v 1.133 2006/05/10 09:19:54 teodor Exp $
*
*-------------------------------------------------------------------------
*/
#define ROTATEDIST(d) do { \
SplitedPageLayout *tmp=(SplitedPageLayout*)palloc(sizeof(SplitedPageLayout)); \
memset(tmp,0,sizeof(SplitedPageLayout)); \
+ tmp->block.blkno = InvalidBlockNumber; \
+ tmp->buffer = InvalidBuffer; \
tmp->next = (d); \
(d)=tmp; \
} while(0)
bool is_splitted = false;
bool is_leaf = (GistPageIsLeaf(state->stack->page)) ? true : false;
+
/*
- * XXX this code really ought to work by locking, but not modifying,
- * all the buffers it needs; then starting a critical section; then
- * modifying the buffers in an already-determined way and writing an
- * XLOG record to reflect that. Since it doesn't, we've got to put
- * a critical section around the entire process, which is horrible
- * from a robustness point of view.
+ * if (!is_leaf) remove old key:
+ * This node's key has been modified, either because a child split
+ * occurred or because we needed to adjust our key for an insert in a
+ * child node. Therefore, remove the old version of this node's key.
+ *
+ * for WAL replay, in the non-split case we handle this by
+ * setting up a one-element todelete array; in the split case, it's
+ * handled implicitly because the tuple vector passed to gistSplit
+ * won't include this tuple.
*/
- START_CRIT_SECTION();
-
- if (!is_leaf)
-
- /*
- * This node's key has been modified, either because a child split
- * occurred or because we needed to adjust our key for an insert in a
- * child node. Therefore, remove the old version of this node's key.
- *
- * Note: for WAL replay, in the non-split case we handle this by
- * setting up a one-element todelete array; in the split case, it's
- * handled implicitly because the tuple vector passed to gistSplit
- * won't include this tuple.
- */
- PageIndexTupleDelete(state->stack->page, state->stack->childoffnum);
- if (gistnospace(state->stack->page, state->itup, state->ituplen))
+ if (gistnospace(state->stack->page, state->itup, state->ituplen, (is_leaf) ? InvalidOffsetNumber : state->stack->childoffnum))
{
/* no space for insertion */
- IndexTuple *itvec,
- *newitup;
+ IndexTuple *itvec;
int tlen;
SplitedPageLayout *dist = NULL,
*ptr;
+ BlockNumber rrlink = InvalidBlockNumber;
+ GistNSN oldnsn;
is_splitted = true;
+
+ /*
+ * Form index tuples vector to split:
+ * remove old tuple if t's needed and add new tuples to vector
+ */
itvec = gistextractbuffer(state->stack->buffer, &tlen);
+ if ( !is_leaf ) {
+ /* on inner page we should remove old tuple */
+ int pos = state->stack->childoffnum - FirstOffsetNumber;
+
+ tlen--;
+ if ( pos != tlen )
+ memmove( itvec+pos, itvec + pos + 1, sizeof( IndexTuple ) * (tlen-pos) );
+ }
itvec = gistjoinvector(itvec, &tlen, state->itup, state->ituplen);
- newitup = gistSplit(state->r, state->stack->buffer, itvec, &tlen, &dist, giststate);
+ dist = gistSplit(state->r, state->stack->page, itvec, tlen, giststate);
+
+ state->itup = (IndexTuple*)palloc( sizeof(IndexTuple) * tlen);
+ state->ituplen = 0;
+
+ if (state->stack->blkno != GIST_ROOT_BLKNO) {
+ /* if non-root split then we should not allocate new buffer,
+ but we must create temporary page to operate */
+ dist->buffer = state->stack->buffer;
+ dist->page = PageGetTempPage( BufferGetPage(dist->buffer), sizeof(GISTPageOpaqueData) );
+
+ /*clean all flags except F_LEAF */
+ GistPageGetOpaque(dist->page)->flags = (is_leaf) ? F_LEAF : 0;
+ }
+
+ /* make new pages and fills them */
+ for (ptr = dist; ptr; ptr = ptr->next) {
+ int i;
+ char *data;
+
+ /* get new page */
+ if ( ptr->buffer == InvalidBuffer ) {
+ ptr->buffer = gistNewBuffer( state->r );
+ GISTInitBuffer( ptr->buffer, (is_leaf) ? F_LEAF : 0 );
+ ptr->page = BufferGetPage(ptr->buffer);
+ }
+ ptr->block.blkno = BufferGetBlockNumber( ptr->buffer );
+
+ /* fill page, we can do it becouse all this pages are new (ie not linked in tree
+ or masked by temp page */
+ data = (char*)(ptr->list);
+ for(i=0;i<ptr->block.num;i++) {
+ if ( PageAddItem(ptr->page, (Item)data, IndexTupleSize((IndexTuple)data), i+FirstOffsetNumber, LP_USED) == InvalidOffsetNumber )
+ elog(ERROR, "failed to add item to index page in \"%s\"", RelationGetRelationName(state->r));
+ data += IndexTupleSize((IndexTuple)data);
+ }
+
+ /* set up ItemPointer and remmeber it for parent */
+ ItemPointerSetBlockNumber(&(ptr->itup->t_tid), ptr->block.blkno);
+ state->itup[ state->ituplen ] = ptr->itup;
+ state->ituplen++;
+ }
+
+ /* saves old rightlink */
+ if ( state->stack->blkno != GIST_ROOT_BLKNO )
+ rrlink = GistPageGetOpaque(dist->page)->rightlink;
+
+ START_CRIT_SECTION();
/*
* must mark buffers dirty before XLogInsert, even though we'll
- * still be changing their opaque fields below
+ * still be changing their opaque fields below.
+ * set up right links.
*/
- for (ptr = dist; ptr; ptr = ptr->next)
+ for (ptr = dist; ptr; ptr = ptr->next)
{
MarkBufferDirty(ptr->buffer);
+ GistPageGetOpaque(ptr->page)->rightlink = (ptr->next) ?
+ ptr->next->block.blkno : rrlink;
+ }
+
+ /* restore splitted non-root page */
+ if ( state->stack->blkno != GIST_ROOT_BLKNO ) {
+ PageRestoreTempPage( dist->page, BufferGetPage( dist->buffer ) );
+ dist->page = BufferGetPage( dist->buffer );
}
if (!state->r->rd_istemp)
is_leaf, &(state->key), dist);
recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_SPLIT, rdata);
+
for (ptr = dist; ptr; ptr = ptr->next)
{
- PageSetLSN(BufferGetPage(ptr->buffer), recptr);
- PageSetTLI(BufferGetPage(ptr->buffer), ThisTimeLineID);
+ PageSetLSN(ptr->page, recptr);
+ PageSetTLI(ptr->page, ThisTimeLineID);
}
}
else
{
for (ptr = dist; ptr; ptr = ptr->next)
{
- PageSetLSN(BufferGetPage(ptr->buffer), XLogRecPtrForTemp);
+ PageSetLSN(ptr->page, XLogRecPtrForTemp);
}
}
- state->itup = newitup;
- state->ituplen = tlen; /* now tlen >= 2 */
-
- if (state->stack->blkno == GIST_ROOT_BLKNO)
- {
- gistnewroot(state->r, state->stack->buffer, state->itup, state->ituplen, &(state->key));
- state->needInsertComplete = false;
- for (ptr = dist; ptr; ptr = ptr->next)
- {
- Page page = (Page) BufferGetPage(ptr->buffer);
+ /* set up NSN */
+ oldnsn = GistPageGetOpaque(dist->page)->nsn;
+ if ( state->stack->blkno == GIST_ROOT_BLKNO )
+ /* if root split we should put initial value */
+ oldnsn = PageGetLSN(dist->page);
- GistPageGetOpaque(page)->rightlink = (ptr->next) ?
- ptr->next->block.blkno : InvalidBlockNumber;
- GistPageGetOpaque(page)->nsn = PageGetLSN(page);
- UnlockReleaseBuffer(ptr->buffer);
- }
+ for (ptr = dist; ptr; ptr = ptr->next) {
+ /* only for last set oldnsn */
+ GistPageGetOpaque(ptr->page)->nsn = (ptr->next) ?
+ PageGetLSN(ptr->page) : oldnsn;
}
- else
- {
- Page page;
- BlockNumber rightrightlink = InvalidBlockNumber;
- SplitedPageLayout *ourpage = NULL;
- GistNSN oldnsn;
- GISTPageOpaque opaque;
-
- /* move origpage to first in chain */
- if (dist->block.blkno != state->stack->blkno)
- {
- ptr = dist;
- while (ptr->next)
- {
- if (ptr->next->block.blkno == state->stack->blkno)
- {
- ourpage = ptr->next;
- ptr->next = ptr->next->next;
- ourpage->next = dist;
- dist = ourpage;
- break;
- }
- ptr = ptr->next;
- }
- Assert(ourpage != NULL);
- }
- else
- ourpage = dist;
- /* now gets all needed data, and sets nsn's */
- page = (Page) BufferGetPage(ourpage->buffer);
- opaque = GistPageGetOpaque(page);
- rightrightlink = opaque->rightlink;
- oldnsn = opaque->nsn;
- opaque->nsn = PageGetLSN(page);
- opaque->rightlink = ourpage->next->block.blkno;
+ /*
+ * release buffers, if it was a root split then
+ * release all buffers because we create all buffers
+ */
+ ptr = ( state->stack->blkno == GIST_ROOT_BLKNO ) ? dist : dist->next;
+ for(; ptr; ptr = ptr->next)
+ UnlockReleaseBuffer(ptr->buffer);
- /*
- * fill and release all new pages. They isn't linked into tree yet
- */
- for (ptr = ourpage->next; ptr; ptr = ptr->next)
- {
- page = (Page) BufferGetPage(ptr->buffer);
- GistPageGetOpaque(page)->rightlink = (ptr->next) ?
- ptr->next->block.blkno : rightrightlink;
- /* only for last set oldnsn */
- GistPageGetOpaque(page)->nsn = (ptr->next) ?
- opaque->nsn : oldnsn;
-
- UnlockReleaseBuffer(ptr->buffer);
- }
+ if (state->stack->blkno == GIST_ROOT_BLKNO) {
+ gistnewroot(state->r, state->stack->buffer, state->itup, state->ituplen, &(state->key));
+ state->needInsertComplete = false;
}
END_CRIT_SECTION();
else
{
/* enough space */
- XLogRecPtr oldlsn;
+ START_CRIT_SECTION();
+ if (!is_leaf)
+ PageIndexTupleDelete(state->stack->page, state->stack->childoffnum);
gistfillbuffer(state->r, state->stack->page, state->itup, state->ituplen, InvalidOffsetNumber);
MarkBufferDirty(state->stack->buffer);
- oldlsn = PageGetLSN(state->stack->page);
if (!state->r->rd_istemp)
{
OffsetNumber noffs = 0,
arr[i] = reasloffset[arr[i]];
}
+static IndexTupleData *
+gistfillitupvec(IndexTuple *vec, int veclen, int *memlen) {
+ char *ptr, *ret = palloc(BLCKSZ);
+ int i;
+
+ ptr = ret;
+ for (i = 0; i < veclen; i++) {
+ memcpy(ptr, vec[i], IndexTupleSize(vec[i]));
+ ptr += IndexTupleSize(vec[i]);
+ }
+
+ *memlen = ptr - ret;
+ Assert( *memlen < BLCKSZ );
+ return (IndexTupleData*)ret;
+}
+
/*
* gistSplit -- split a page in the tree.
*/
-IndexTuple *
+SplitedPageLayout *
gistSplit(Relation r,
- Buffer buffer,
+ Page page,
IndexTuple *itup, /* contains compressed entry */
- int *len,
- SplitedPageLayout **dist,
+ int len,
GISTSTATE *giststate)
{
- Page p;
- Buffer leftbuf,
- rightbuf;
- Page left,
- right;
IndexTuple *lvectup,
- *rvectup,
- *newtup;
- BlockNumber lbknum,
- rbknum;
- GISTPageOpaque opaque;
+ *rvectup;
GIST_SPLITVEC v;
GistEntryVector *entryvec;
int i,
- fakeoffset,
- nlen;
+ fakeoffset;
OffsetNumber *realoffset;
IndexTuple *cleaneditup = itup;
- int lencleaneditup = *len;
-
- p = (Page) BufferGetPage(buffer);
- opaque = GistPageGetOpaque(p);
-
- /*
- * The root of the tree is the first block in the relation. If we're
- * about to split the root, we need to do some hocus-pocus to enforce this
- * guarantee.
- */
- if (BufferGetBlockNumber(buffer) == GIST_ROOT_BLKNO)
- {
- leftbuf = gistNewBuffer(r);
- GISTInitBuffer(leftbuf, opaque->flags & F_LEAF);
- lbknum = BufferGetBlockNumber(leftbuf);
- left = (Page) BufferGetPage(leftbuf);
- }
- else
- {
- leftbuf = buffer;
- /* IncrBufferRefCount(buffer); */
- lbknum = BufferGetBlockNumber(buffer);
- left = (Page) PageGetTempPage(p, sizeof(GISTPageOpaqueData));
- }
-
- rightbuf = gistNewBuffer(r);
- GISTInitBuffer(rightbuf, opaque->flags & F_LEAF);
- rbknum = BufferGetBlockNumber(rightbuf);
- right = (Page) BufferGetPage(rightbuf);
+ int lencleaneditup = len;
+ SplitedPageLayout *res = NULL;
/* generate the item array */
- realoffset = palloc((*len + 1) * sizeof(OffsetNumber));
- entryvec = palloc(GEVHDRSZ + (*len + 1) * sizeof(GISTENTRY));
- entryvec->n = *len + 1;
+ realoffset = palloc((len + 1) * sizeof(OffsetNumber));
+ entryvec = palloc(GEVHDRSZ + (len + 1) * sizeof(GISTENTRY));
+ entryvec->n = len + 1;
fakeoffset = FirstOffsetNumber;
- for (i = 1; i <= *len; i++)
+ for (i = 1; i <= len; i++)
{
Datum datum;
bool IsNull;
- if (!GistPageIsLeaf(p) && GistTupleIsInvalid(itup[i - 1]))
+ if (!GistPageIsLeaf(page) && GistTupleIsInvalid(itup[i - 1]))
{
entryvec->n--;
/* remember position of invalid tuple */
datum = index_getattr(itup[i - 1], 1, giststate->tupdesc, &IsNull);
gistdentryinit(giststate, 0, &(entryvec->vector[fakeoffset]),
- datum, r, p, i,
+ datum, r, page, i,
ATTSIZE(datum, giststate->tupdesc, 1, IsNull),
FALSE, IsNull);
realoffset[fakeoffset] = i;
* possible, we move all invalid tuples on right page. We should remember,
* that union with invalid tuples is a invalid tuple.
*/
- if (entryvec->n != *len + 1)
+ if (entryvec->n != len + 1)
{
lencleaneditup = entryvec->n - 1;
cleaneditup = (IndexTuple *) palloc(lencleaneditup * sizeof(IndexTuple));
for (i = 1; i < entryvec->n; i++)
cleaneditup[i - 1] = itup[realoffset[i] - 1];
- if (gistnospace(left, cleaneditup, lencleaneditup))
+ if (!gistfitpage(cleaneditup, lencleaneditup))
{
/* no space on left to put all good tuples, so picksplit */
gistUserPicksplit(r, entryvec, &v, cleaneditup, lencleaneditup, giststate);
v.spl_leftvalid = v.spl_rightvalid = false;
v.spl_nright = 0;
v.spl_nleft = 0;
- for (i = 1; i <= *len; i++)
- if (i - 1 < *len / 2)
+ for (i = 1; i <= len; i++)
+ if (i - 1 < len / 2)
v.spl_left[v.spl_nleft++] = i;
else
v.spl_right[v.spl_nright++] = i;
else
{
/* there is no invalid tuples, so usial processing */
- gistUserPicksplit(r, entryvec, &v, itup, *len, giststate);
+ gistUserPicksplit(r, entryvec, &v, itup, len, giststate);
v.spl_leftvalid = v.spl_rightvalid = true;
}
/* form left and right vector */
- lvectup = (IndexTuple *) palloc(sizeof(IndexTuple) * (*len + 1));
- rvectup = (IndexTuple *) palloc(sizeof(IndexTuple) * (*len + 1));
+ lvectup = (IndexTuple *) palloc(sizeof(IndexTuple) * (len + 1));
+ rvectup = (IndexTuple *) palloc(sizeof(IndexTuple) * (len + 1));
for (i = 0; i < v.spl_nleft; i++)
lvectup[i] = itup[v.spl_left[i] - 1];
rvectup[i] = itup[v.spl_right[i] - 1];
/* place invalid tuples on right page if itsn't done yet */
- for (fakeoffset = entryvec->n; fakeoffset < *len + 1 && lencleaneditup; fakeoffset++)
+ for (fakeoffset = entryvec->n; fakeoffset < len + 1 && lencleaneditup; fakeoffset++)
{
rvectup[v.spl_nright++] = itup[realoffset[fakeoffset] - 1];
}
- /* write on disk (may need another split) */
- if (gistnospace(right, rvectup, v.spl_nright))
+ /* finalyze splitting (may need another split) */
+ if (!gistfitpage(rvectup, v.spl_nright))
{
- nlen = v.spl_nright;
- newtup = gistSplit(r, rightbuf, rvectup, &nlen, dist, giststate);
- /* ReleaseBuffer(rightbuf); */
+ res = gistSplit(r, page, rvectup, v.spl_nright, giststate);
}
else
{
- char *ptr;
-
- gistfillbuffer(r, right, rvectup, v.spl_nright, FirstOffsetNumber);
- /* XLOG stuff */
- ROTATEDIST(*dist);
- (*dist)->block.blkno = BufferGetBlockNumber(rightbuf);
- (*dist)->block.num = v.spl_nright;
- (*dist)->list = (IndexTupleData *) palloc(BLCKSZ);
- ptr = (char *) ((*dist)->list);
- for (i = 0; i < v.spl_nright; i++)
- {
- memcpy(ptr, rvectup[i], IndexTupleSize(rvectup[i]));
- ptr += IndexTupleSize(rvectup[i]);
- }
- (*dist)->lenlist = ptr - ((char *) ((*dist)->list));
- (*dist)->buffer = rightbuf;
-
- nlen = 1;
- newtup = (IndexTuple *) palloc(sizeof(IndexTuple) * 1);
- newtup[0] = (v.spl_rightvalid) ? gistFormTuple(giststate, r, v.spl_rattr, v.spl_rattrsize, v.spl_risnull)
- : gist_form_invalid_tuple(rbknum);
- ItemPointerSetBlockNumber(&(newtup[0]->t_tid), rbknum);
+ ROTATEDIST(res);
+ res->block.num = v.spl_nright;
+ res->list = gistfillitupvec(rvectup, v.spl_nright, &( res->lenlist ) );
+ res->itup = (v.spl_rightvalid) ? gistFormTuple(giststate, r, v.spl_rattr, v.spl_rattrsize, v.spl_risnull)
+ : gist_form_invalid_tuple(GIST_ROOT_BLKNO);
}
- if (gistnospace(left, lvectup, v.spl_nleft))
+ if (!gistfitpage(lvectup, v.spl_nleft))
{
- int llen = v.spl_nleft;
- IndexTuple *lntup;
+ SplitedPageLayout *resptr, *subres;
- lntup = gistSplit(r, leftbuf, lvectup, &llen, dist, giststate);
- /* ReleaseBuffer(leftbuf); */
+ resptr = subres = gistSplit(r, page, lvectup, v.spl_nleft, giststate);
- newtup = gistjoinvector(newtup, &nlen, lntup, llen);
+ /* install on list's tail */
+ while( resptr->next )
+ resptr = resptr->next;
+
+ resptr->next = res;
+ res = subres;
}
else
{
- char *ptr;
-
- gistfillbuffer(r, left, lvectup, v.spl_nleft, FirstOffsetNumber);
- /* XLOG stuff */
- ROTATEDIST(*dist);
- (*dist)->block.blkno = BufferGetBlockNumber(leftbuf);
- (*dist)->block.num = v.spl_nleft;
- (*dist)->list = (IndexTupleData *) palloc(BLCKSZ);
- ptr = (char *) ((*dist)->list);
- for (i = 0; i < v.spl_nleft; i++)
- {
- memcpy(ptr, lvectup[i], IndexTupleSize(lvectup[i]));
- ptr += IndexTupleSize(lvectup[i]);
- }
- (*dist)->lenlist = ptr - ((char *) ((*dist)->list));
- (*dist)->buffer = leftbuf;
-
- if (BufferGetBlockNumber(buffer) != GIST_ROOT_BLKNO)
- PageRestoreTempPage(left, p);
-
- nlen += 1;
- newtup = (IndexTuple *) repalloc(newtup, sizeof(IndexTuple) * nlen);
- newtup[nlen - 1] = (v.spl_leftvalid) ? gistFormTuple(giststate, r, v.spl_lattr, v.spl_lattrsize, v.spl_lisnull)
- : gist_form_invalid_tuple(lbknum);
- ItemPointerSetBlockNumber(&(newtup[nlen - 1]->t_tid), lbknum);
+ ROTATEDIST(res);
+ res->block.num = v.spl_nleft;
+ res->list = gistfillitupvec(lvectup, v.spl_nleft, &( res->lenlist ) );
+ res->itup = (v.spl_leftvalid) ? gistFormTuple(giststate, r, v.spl_lattr, v.spl_lattrsize, v.spl_lisnull)
+ : gist_form_invalid_tuple(GIST_ROOT_BLKNO);
}
- GistClearTuplesDeleted(p);
-
- *len = nlen;
- return newtup;
+ return res;
}
/*
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/access/gist/gistvacuum.c,v 1.19 2006/05/02 22:25:10 tgl Exp $
+ * $PostgreSQL: pgsql/src/backend/access/gist/gistvacuum.c,v 1.20 2006/05/10 09:19:54 teodor Exp $
*
*-------------------------------------------------------------------------
*/
if (GistPageIsLeaf(page))
{
if (GistTuplesDeleted(page))
- {
needunion = needwrite = true;
- GistClearTuplesDeleted(page);
- }
}
else
{
if (curlenaddon)
{
/* insert updated tuples */
- if (gistnospace(page, addon, curlenaddon))
+ if (gistnospace(page, addon, curlenaddon, InvalidOffsetNumber))
{
/* there is no space on page to insert tuples */
IndexTuple *vec;
SplitedPageLayout *dist = NULL,
*ptr;
- int i;
+ int i, veclen=0;
MemoryContext oldCtx = MemoryContextSwitchTo(gv->opCtx);
- vec = gistextractbuffer(buffer, &(res.ituplen));
- vec = gistjoinvector(vec, &(res.ituplen), addon, curlenaddon);
- res.itup = gistSplit(gv->index, buffer, vec, &(res.ituplen), &dist, &(gv->giststate));
+ vec = gistextractbuffer(buffer, &veclen);
+ vec = gistjoinvector(vec, &veclen, addon, curlenaddon);
+ dist = gistSplit(gv->index, page, vec, veclen, &(gv->giststate));
+
MemoryContextSwitchTo(oldCtx);
- vec = (IndexTuple *) palloc(sizeof(IndexTuple) * res.ituplen);
- for (i = 0; i < res.ituplen; i++)
- {
- vec[i] = (IndexTuple) palloc(IndexTupleSize(res.itup[i]));
- memcpy(vec[i], res.itup[i], IndexTupleSize(res.itup[i]));
+ if (blkno != GIST_ROOT_BLKNO) {
+ /* if non-root split then we should not allocate new buffer */
+ dist->buffer = buffer;
+ dist->page = BufferGetPage(dist->buffer);
+ GistPageGetOpaque(dist->page)->flags = 0;
}
- res.itup = vec;
- for (ptr = dist; ptr; ptr = ptr->next)
- {
+ res.itup = (IndexTuple *) palloc(sizeof(IndexTuple) * veclen);
+ res.ituplen = 0;
+
+ /* make new pages and fills them */
+ for (ptr = dist; ptr; ptr = ptr->next) {
+ char *data;
+
+ if ( ptr->buffer == InvalidBuffer ) {
+ ptr->buffer = gistNewBuffer( gv->index );
+ GISTInitBuffer( ptr->buffer, 0 );
+ ptr->page = BufferGetPage(ptr->buffer);
+ }
+ ptr->block.blkno = BufferGetBlockNumber( ptr->buffer );
+
+ data = (char*)(ptr->list);
+ for(i=0;i<ptr->block.num;i++) {
+ if ( PageAddItem(ptr->page, (Item)data, IndexTupleSize((IndexTuple)data), i+FirstOffsetNumber, LP_USED) == InvalidOffsetNumber )
+ elog(ERROR, "failed to add item to index page in \"%s\"", RelationGetRelationName(gv->index));
+ data += IndexTupleSize((IndexTuple)data);
+ }
+
+ ItemPointerSetBlockNumber(&(ptr->itup->t_tid), ptr->block.blkno);
+ res.itup[ res.ituplen ] = (IndexTuple)palloc(IndexTupleSize(ptr->itup));
+ memcpy( res.itup[ res.ituplen ], ptr->itup, IndexTupleSize(ptr->itup) );
+ res.ituplen++;
+
MarkBufferDirty(ptr->buffer);
}
for (ptr = dist; ptr; ptr = ptr->next)
{
- /* we must keep the buffer lock on the head page */
+ /* we must keep the buffer pin on the head page */
if (BufferGetBlockNumber(ptr->buffer) != blkno)
- LockBuffer(ptr->buffer, GIST_UNLOCK);
- ReleaseBuffer(ptr->buffer);
+ UnlockReleaseBuffer( ptr->buffer );
}
if (blkno == GIST_ROOT_BLKNO)
if (needwrite)
{
MarkBufferDirty(buffer);
+ GistClearTuplesDeleted(page);
if (!gv->index->rd_istemp)
{
/*
* Remove deletable tuples from page
- *
- * XXX try to make this critical section shorter. Could do it
- * by separating the callback loop from the actual tuple deletion,
- * but that would affect the definition of the todelete[] array
- * passed into the WAL record (because the indexes would all be
- * pre-deletion).
*/
- START_CRIT_SECTION();
maxoff = PageGetMaxOffsetNumber(page);
if (callback(&(idxtuple->t_tid), callback_state))
{
- PageIndexTupleDelete(page, i);
- todelete[ntodelete] = i;
- i--;
- maxoff--;
+ todelete[ntodelete] = i-ntodelete;
ntodelete++;
stats->std.tuples_removed += 1;
- Assert(maxoff == PageGetMaxOffsetNumber(page));
}
else
stats->std.num_index_tuples += 1;
if (ntodelete)
{
- GistMarkTuplesDeleted(page);
+ START_CRIT_SECTION();
MarkBufferDirty(buffer);
+ for(i=0;i<ntodelete;i++)
+ PageIndexTupleDelete(page, todelete[i]);
+ GistMarkTuplesDeleted(page);
+
if (!rel->rd_istemp)
{
XLogRecData *rdata;
}
else
PageSetLSN(page, XLogRecPtrForTemp);
+
+ END_CRIT_SECTION();
}
- END_CRIT_SECTION();
}
else
{