From 10dd8df68e6c826186b9900b703051b46ccd6b31 Mon Sep 17 00:00:00 2001
From: Teodor Sigaev <teodor@sigaev.ru>
Date: Wed, 10 May 2006 09:19:54 +0000
Subject: [PATCH] Reduce size of critical section and remove calls of
 user-defined functions in insertion and deletion, modify gistSplit() to
 not use buffers.

 TODO: gistvacuumcleanup and XLOG
---
 src/backend/access/gist/gist.c       | 393 ++++++++++++---------------
 src/backend/access/gist/gistutil.c   |  26 +-
 src/backend/access/gist/gistvacuum.c |  83 +++---
 src/backend/access/gist/gistxlog.c   |   4 +-
 src/include/access/gist_private.h    |  15 +-
 5 files changed, 258 insertions(+), 263 deletions(-)

diff --git a/src/backend/access/gist/gist.c b/src/backend/access/gist/gist.c
index 16468fd35a..2272e3339d 100644
--- a/src/backend/access/gist/gist.c
+++ b/src/backend/access/gist/gist.c
@@ -8,7 +8,7 @@
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
- *	  $PostgreSQL: pgsql/src/backend/access/gist/gist.c,v 1.132 2006/04/03 13:44:33 teodor Exp $
+ *	  $PostgreSQL: pgsql/src/backend/access/gist/gist.c,v 1.133 2006/05/10 09:19:54 teodor Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -52,6 +52,8 @@ static void gistfindleaf(GISTInsertState *state,
 #define ROTATEDIST(d) do { \
 	SplitedPageLayout *tmp=(SplitedPageLayout*)palloc(sizeof(SplitedPageLayout)); \
 	memset(tmp,0,sizeof(SplitedPageLayout)); \
+	tmp->block.blkno = InvalidBlockNumber;	\
+	tmp->buffer = InvalidBuffer;	\
 	tmp->next = (d); \
 	(d)=tmp; \
 } while(0)
@@ -309,52 +311,111 @@ gistplacetopage(GISTInsertState *state, GISTSTATE *giststate)
 	bool		is_splitted = false;
 	bool		is_leaf = (GistPageIsLeaf(state->stack->page)) ? true : false;
 
+
 	/*
-	 * XXX this code really ought to work by locking, but not modifying,
-	 * all the buffers it needs; then starting a critical section; then
-	 * modifying the buffers in an already-determined way and writing an
-	 * XLOG record to reflect that.  Since it doesn't, we've got to put
-	 * a critical section around the entire process, which is horrible
-	 * from a robustness point of view.
+	 * if (!is_leaf) remove old key:
+	 * This node's key has been modified, either because a child split
+	 * occurred or because we needed to adjust our key for an insert in a
+	 * child node. Therefore, remove the old version of this node's key.
+	 *
+	 * for WAL replay, in the non-split case we handle this by
+	 * setting up a one-element todelete array; in the split case, it's
+	 * handled implicitly because the tuple vector passed to gistSplit
+	 * won't include this tuple.
 	 */
-	START_CRIT_SECTION();
-
-	if (!is_leaf)
-
-		/*
-		 * This node's key has been modified, either because a child split
-		 * occurred or because we needed to adjust our key for an insert in a
-		 * child node. Therefore, remove the old version of this node's key.
-		 *
-		 * Note: for WAL replay, in the non-split case we handle this by
-		 * setting up a one-element todelete array; in the split case, it's
-		 * handled implicitly because the tuple vector passed to gistSplit
-		 * won't include this tuple.
-		 */
 
-		PageIndexTupleDelete(state->stack->page, state->stack->childoffnum);
 
-	if (gistnospace(state->stack->page, state->itup, state->ituplen))
+	if (gistnospace(state->stack->page, state->itup, state->ituplen, (is_leaf) ? InvalidOffsetNumber : state->stack->childoffnum))
 	{
 		/* no space for insertion */
-		IndexTuple *itvec,
-				   *newitup;
+		IndexTuple *itvec;
 		int			tlen;
 		SplitedPageLayout *dist = NULL,
 				   *ptr;
+		BlockNumber	rrlink = InvalidBlockNumber;
+		GistNSN		oldnsn;
 
 		is_splitted = true;
+
+		/*
+		 * Form index tuples vector to split:
+		 * remove old tuple if it's needed and add new tuples to vector
+		 */
 		itvec = gistextractbuffer(state->stack->buffer, &tlen);
+		if ( !is_leaf ) {
+			/* on inner page we should remove old tuple */
+			int pos = state->stack->childoffnum - FirstOffsetNumber;
+
+			tlen--;	
+			if ( pos != tlen ) 
+				memmove( itvec+pos, itvec + pos + 1, sizeof( IndexTuple ) * (tlen-pos) );
+		}
 		itvec = gistjoinvector(itvec, &tlen, state->itup, state->ituplen);
-		newitup = gistSplit(state->r, state->stack->buffer, itvec, &tlen, &dist, giststate);
+		dist = gistSplit(state->r, state->stack->page, itvec, tlen, giststate);
+
+		state->itup = (IndexTuple*)palloc( sizeof(IndexTuple) * tlen);
+		state->ituplen = 0;
+
+		if (state->stack->blkno != GIST_ROOT_BLKNO) {
+			/* if non-root split then we should not allocate new buffer,
+			   but we must create temporary page to operate */ 
+			dist->buffer = state->stack->buffer;
+			dist->page = PageGetTempPage( BufferGetPage(dist->buffer), sizeof(GISTPageOpaqueData) );
+
+			/*clean all flags except F_LEAF */ 
+			GistPageGetOpaque(dist->page)->flags = (is_leaf) ? F_LEAF : 0;
+		}
+
+		/* make new pages and fills them */
+		for (ptr = dist; ptr; ptr = ptr->next) {
+			int i;
+			char *data;
+
+			/* get new page */
+			if ( ptr->buffer == InvalidBuffer ) {
+				ptr->buffer = gistNewBuffer( state->r );
+				GISTInitBuffer( ptr->buffer, (is_leaf) ? F_LEAF : 0 );
+				ptr->page = BufferGetPage(ptr->buffer);
+			}
+			ptr->block.blkno = BufferGetBlockNumber( ptr->buffer );
+
+			/* fill page, we can do it because all these pages are new (i.e. not linked in tree
+			   or masked by temp page) */
+			data = (char*)(ptr->list); 
+			for(i=0;i<ptr->block.num;i++) {
+				if ( PageAddItem(ptr->page, (Item)data, IndexTupleSize((IndexTuple)data), i+FirstOffsetNumber, LP_USED) == InvalidOffsetNumber )
+					elog(ERROR, "failed to add item to index page in \"%s\"", RelationGetRelationName(state->r));
+				data += IndexTupleSize((IndexTuple)data);
+			}
+
+			/* set up ItemPointer and remember it for parent */
+			ItemPointerSetBlockNumber(&(ptr->itup->t_tid), ptr->block.blkno);
+			state->itup[ state->ituplen ] = ptr->itup;
+			state->ituplen++;
+		}
+
+		/* saves old rightlink */
+		if ( state->stack->blkno != GIST_ROOT_BLKNO )
+			rrlink =  GistPageGetOpaque(dist->page)->rightlink;
+
+		START_CRIT_SECTION();
 
 		/*
 		 * must mark buffers dirty before XLogInsert, even though we'll
-		 * still be changing their opaque fields below
+		 * still be changing their opaque fields below.
+		 * set up right links.
 		 */
-		for (ptr = dist; ptr; ptr = ptr->next)
+		for (ptr = dist; ptr; ptr = ptr->next) 
 		{
 			MarkBufferDirty(ptr->buffer);
+			GistPageGetOpaque(ptr->page)->rightlink = (ptr->next) ?
+				ptr->next->block.blkno : rrlink;
+		}
+
+		/* restore splitted non-root page */
+		if ( state->stack->blkno != GIST_ROOT_BLKNO ) {
+			PageRestoreTempPage( dist->page, BufferGetPage( dist->buffer ) );
+			dist->page = BufferGetPage( dist->buffer );
 		}
 
 		if (!state->r->rd_istemp)
@@ -366,88 +427,44 @@ gistplacetopage(GISTInsertState *state, GISTSTATE *giststate)
 								   is_leaf, &(state->key), dist);
 
 			recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_SPLIT, rdata);
+
 			for (ptr = dist; ptr; ptr = ptr->next)
 			{
-				PageSetLSN(BufferGetPage(ptr->buffer), recptr);
-				PageSetTLI(BufferGetPage(ptr->buffer), ThisTimeLineID);
+				PageSetLSN(ptr->page, recptr);
+				PageSetTLI(ptr->page, ThisTimeLineID);
 			}
 		}
 		else
 		{
 			for (ptr = dist; ptr; ptr = ptr->next)
 			{
-				PageSetLSN(BufferGetPage(ptr->buffer), XLogRecPtrForTemp);
+				PageSetLSN(ptr->page, XLogRecPtrForTemp);
 			}
 		}
 
-		state->itup = newitup;
-		state->ituplen = tlen;	/* now tlen >= 2 */
-
-		if (state->stack->blkno == GIST_ROOT_BLKNO)
-		{
-			gistnewroot(state->r, state->stack->buffer, state->itup, state->ituplen, &(state->key));
-			state->needInsertComplete = false;
-			for (ptr = dist; ptr; ptr = ptr->next)
-			{
-				Page		page = (Page) BufferGetPage(ptr->buffer);
+		/* set up NSN */
+		oldnsn = GistPageGetOpaque(dist->page)->nsn;
+		if ( state->stack->blkno == GIST_ROOT_BLKNO )
+			/* if root split we should put initial value */
+			oldnsn = PageGetLSN(dist->page);
 
-				GistPageGetOpaque(page)->rightlink = (ptr->next) ?
-					ptr->next->block.blkno : InvalidBlockNumber;
-				GistPageGetOpaque(page)->nsn = PageGetLSN(page);
-				UnlockReleaseBuffer(ptr->buffer);
-			}
+		for (ptr = dist; ptr; ptr = ptr->next) {
+			/* only for last set oldnsn */
+			GistPageGetOpaque(ptr->page)->nsn = (ptr->next) ?
+				PageGetLSN(ptr->page) : oldnsn;
 		}
-		else
-		{
-			Page		page;
-			BlockNumber rightrightlink = InvalidBlockNumber;
-			SplitedPageLayout *ourpage = NULL;
-			GistNSN		oldnsn;
-			GISTPageOpaque opaque;
-
-			/* move origpage to first in chain */
-			if (dist->block.blkno != state->stack->blkno)
-			{
-				ptr = dist;
-				while (ptr->next)
-				{
-					if (ptr->next->block.blkno == state->stack->blkno)
-					{
-						ourpage = ptr->next;
-						ptr->next = ptr->next->next;
-						ourpage->next = dist;
-						dist = ourpage;
-						break;
-					}
-					ptr = ptr->next;
-				}
-				Assert(ourpage != NULL);
-			}
-			else
-				ourpage = dist;
 
-			/* now gets all needed data, and sets nsn's */
-			page = (Page) BufferGetPage(ourpage->buffer);
-			opaque = GistPageGetOpaque(page);
-			rightrightlink = opaque->rightlink;
-			oldnsn = opaque->nsn;
-			opaque->nsn = PageGetLSN(page);
-			opaque->rightlink = ourpage->next->block.blkno;
+		/* 
+		 * release buffers, if it was a root split then
+		 * release all buffers because we create all buffers 
+		 */
+		ptr = ( state->stack->blkno == GIST_ROOT_BLKNO ) ? dist : dist->next;
+		for(; ptr; ptr = ptr->next)
+			UnlockReleaseBuffer(ptr->buffer);
 
-			/*
-			 * fill and release all new pages. They isn't linked into tree yet
-			 */
-			for (ptr = ourpage->next; ptr; ptr = ptr->next)
-			{
-				page = (Page) BufferGetPage(ptr->buffer);
-				GistPageGetOpaque(page)->rightlink = (ptr->next) ?
-					ptr->next->block.blkno : rightrightlink;
-				/* only for last set oldnsn */
-				GistPageGetOpaque(page)->nsn = (ptr->next) ?
-					opaque->nsn : oldnsn;
-
-				UnlockReleaseBuffer(ptr->buffer);
-			}
+		if (state->stack->blkno == GIST_ROOT_BLKNO) {
+			gistnewroot(state->r, state->stack->buffer, state->itup, state->ituplen, &(state->key));
+			state->needInsertComplete = false;
 		}
 
 		END_CRIT_SECTION();
@@ -455,13 +472,14 @@ gistplacetopage(GISTInsertState *state, GISTSTATE *giststate)
 	else
 	{
 		/* enough space */
-		XLogRecPtr	oldlsn;
+		START_CRIT_SECTION();
 
+		if (!is_leaf)
+			PageIndexTupleDelete(state->stack->page, state->stack->childoffnum);
 		gistfillbuffer(state->r, state->stack->page, state->itup, state->ituplen, InvalidOffsetNumber);
 
 		MarkBufferDirty(state->stack->buffer);
 
-		oldlsn = PageGetLSN(state->stack->page);
 		if (!state->r->rd_istemp)
 		{
 			OffsetNumber noffs = 0,
@@ -921,77 +939,55 @@ gistToRealOffset(OffsetNumber *arr, int len, OffsetNumber *reasloffset)
 		arr[i] = reasloffset[arr[i]];
 }
 
+static IndexTupleData *
+gistfillitupvec(IndexTuple *vec, int veclen, int *memlen) {	/* pack vec[0..veclen-1] back-to-back into one buffer; *memlen receives the byte count */
+	char *ptr, *ret = palloc(BLCKSZ);	/* fixed BLCKSZ-byte output buffer; it is returned, not freed here */
+	int i;
+
+	ptr = ret;	/* ptr is the write cursor inside ret */
+	for (i = 0; i < veclen; i++) {
+		memcpy(ptr, vec[i], IndexTupleSize(vec[i]));
+		ptr += IndexTupleSize(vec[i]);	/* advance by exactly the tuple size: no padding between tuples */
+	}
+
+	*memlen = ptr - ret;	/* total number of bytes copied */
+	Assert( *memlen < BLCKSZ );	/* NOTE(review): verified only after the copy has happened (and presumably only in assert-enabled builds); callers must ensure the tuples fit beforehand */
+	return (IndexTupleData*)ret;
+}
+
 /*
  *	gistSplit -- split a page in the tree.
  */
-IndexTuple *
+SplitedPageLayout *
 gistSplit(Relation r,
-		  Buffer buffer,
+		  Page page,
 		  IndexTuple *itup,		/* contains compressed entry */
-		  int *len,
-		  SplitedPageLayout **dist,
+		  int len,
 		  GISTSTATE *giststate)
 {
-	Page		p;
-	Buffer		leftbuf,
-				rightbuf;
-	Page		left,
-				right;
 	IndexTuple *lvectup,
-			   *rvectup,
-			   *newtup;
-	BlockNumber lbknum,
-				rbknum;
-	GISTPageOpaque opaque;
+			   *rvectup;
 	GIST_SPLITVEC v;
 	GistEntryVector *entryvec;
 	int			i,
-				fakeoffset,
-				nlen;
+				fakeoffset;
 	OffsetNumber *realoffset;
 	IndexTuple *cleaneditup = itup;
-	int			lencleaneditup = *len;
-
-	p = (Page) BufferGetPage(buffer);
-	opaque = GistPageGetOpaque(p);
-
-	/*
-	 * The root of the tree is the first block in the relation.  If we're
-	 * about to split the root, we need to do some hocus-pocus to enforce this
-	 * guarantee.
-	 */
-	if (BufferGetBlockNumber(buffer) == GIST_ROOT_BLKNO)
-	{
-		leftbuf = gistNewBuffer(r);
-		GISTInitBuffer(leftbuf, opaque->flags & F_LEAF);
-		lbknum = BufferGetBlockNumber(leftbuf);
-		left = (Page) BufferGetPage(leftbuf);
-	}
-	else
-	{
-		leftbuf = buffer;
-		/* IncrBufferRefCount(buffer); */
-		lbknum = BufferGetBlockNumber(buffer);
-		left = (Page) PageGetTempPage(p, sizeof(GISTPageOpaqueData));
-	}
-
-	rightbuf = gistNewBuffer(r);
-	GISTInitBuffer(rightbuf, opaque->flags & F_LEAF);
-	rbknum = BufferGetBlockNumber(rightbuf);
-	right = (Page) BufferGetPage(rightbuf);
+	int			lencleaneditup = len;
+	SplitedPageLayout	*res = NULL;
 
 	/* generate the item array */
-	realoffset = palloc((*len + 1) * sizeof(OffsetNumber));
-	entryvec = palloc(GEVHDRSZ + (*len + 1) * sizeof(GISTENTRY));
-	entryvec->n = *len + 1;
+	realoffset = palloc((len + 1) * sizeof(OffsetNumber));
+	entryvec = palloc(GEVHDRSZ + (len + 1) * sizeof(GISTENTRY));
+	entryvec->n = len + 1;
 
 	fakeoffset = FirstOffsetNumber;
-	for (i = 1; i <= *len; i++)
+	for (i = 1; i <= len; i++)
 	{
 		Datum		datum;
 		bool		IsNull;
 
-		if (!GistPageIsLeaf(p) && GistTupleIsInvalid(itup[i - 1]))
+		if (!GistPageIsLeaf(page) && GistTupleIsInvalid(itup[i - 1]))
 		{
 			entryvec->n--;
 			/* remember position of invalid tuple */
@@ -1001,7 +997,7 @@ gistSplit(Relation r,
 
 		datum = index_getattr(itup[i - 1], 1, giststate->tupdesc, &IsNull);
 		gistdentryinit(giststate, 0, &(entryvec->vector[fakeoffset]),
-					   datum, r, p, i,
+					   datum, r, page, i,
 					   ATTSIZE(datum, giststate->tupdesc, 1, IsNull),
 					   FALSE, IsNull);
 		realoffset[fakeoffset] = i;
@@ -1013,14 +1009,14 @@ gistSplit(Relation r,
 	 * possible, we move all invalid tuples on right page. We should remember,
 	 * that union with invalid tuples is a invalid tuple.
 	 */
-	if (entryvec->n != *len + 1)
+	if (entryvec->n != len + 1)
 	{
 		lencleaneditup = entryvec->n - 1;
 		cleaneditup = (IndexTuple *) palloc(lencleaneditup * sizeof(IndexTuple));
 		for (i = 1; i < entryvec->n; i++)
 			cleaneditup[i - 1] = itup[realoffset[i] - 1];
 
-		if (gistnospace(left, cleaneditup, lencleaneditup))
+		if (!gistfitpage(cleaneditup, lencleaneditup))
 		{
 			/* no space on left to put all good tuples, so picksplit */
 			gistUserPicksplit(r, entryvec, &v, cleaneditup, lencleaneditup, giststate);
@@ -1041,8 +1037,8 @@ gistSplit(Relation r,
 				v.spl_leftvalid = v.spl_rightvalid = false;
 				v.spl_nright = 0;
 				v.spl_nleft = 0;
-				for (i = 1; i <= *len; i++)
-					if (i - 1 < *len / 2)
+				for (i = 1; i <= len; i++)
+					if (i - 1 < len / 2)
 						v.spl_left[v.spl_nleft++] = i;
 					else
 						v.spl_right[v.spl_nright++] = i;
@@ -1071,14 +1067,14 @@ gistSplit(Relation r,
 	else
 	{
 		/* there is no invalid tuples, so usial processing */
-		gistUserPicksplit(r, entryvec, &v, itup, *len, giststate);
+		gistUserPicksplit(r, entryvec, &v, itup, len, giststate);
 		v.spl_leftvalid = v.spl_rightvalid = true;
 	}
 
 
 	/* form left and right vector */
-	lvectup = (IndexTuple *) palloc(sizeof(IndexTuple) * (*len + 1));
-	rvectup = (IndexTuple *) palloc(sizeof(IndexTuple) * (*len + 1));
+	lvectup = (IndexTuple *) palloc(sizeof(IndexTuple) * (len + 1));
+	rvectup = (IndexTuple *) palloc(sizeof(IndexTuple) * (len + 1));
 
 	for (i = 0; i < v.spl_nleft; i++)
 		lvectup[i] = itup[v.spl_left[i] - 1];
@@ -1087,87 +1083,48 @@ gistSplit(Relation r,
 		rvectup[i] = itup[v.spl_right[i] - 1];
 
 	/* place invalid tuples on right page if itsn't done yet */
-	for (fakeoffset = entryvec->n; fakeoffset < *len + 1 && lencleaneditup; fakeoffset++)
+	for (fakeoffset = entryvec->n; fakeoffset < len + 1 && lencleaneditup; fakeoffset++)
 	{
 		rvectup[v.spl_nright++] = itup[realoffset[fakeoffset] - 1];
 	}
 
-	/* write on disk (may need another split) */
-	if (gistnospace(right, rvectup, v.spl_nright))
+	/* finalize splitting (may need another split) */
+	if (!gistfitpage(rvectup, v.spl_nright))
 	{
-		nlen = v.spl_nright;
-		newtup = gistSplit(r, rightbuf, rvectup, &nlen, dist, giststate);
-		/* ReleaseBuffer(rightbuf); */
+		res = gistSplit(r, page, rvectup, v.spl_nright, giststate);
 	}
 	else
 	{
-		char	   *ptr;
-
-		gistfillbuffer(r, right, rvectup, v.spl_nright, FirstOffsetNumber);
-		/* XLOG stuff */
-		ROTATEDIST(*dist);
-		(*dist)->block.blkno = BufferGetBlockNumber(rightbuf);
-		(*dist)->block.num = v.spl_nright;
-		(*dist)->list = (IndexTupleData *) palloc(BLCKSZ);
-		ptr = (char *) ((*dist)->list);
-		for (i = 0; i < v.spl_nright; i++)
-		{
-			memcpy(ptr, rvectup[i], IndexTupleSize(rvectup[i]));
-			ptr += IndexTupleSize(rvectup[i]);
-		}
-		(*dist)->lenlist = ptr - ((char *) ((*dist)->list));
-		(*dist)->buffer = rightbuf;
-
-		nlen = 1;
-		newtup = (IndexTuple *) palloc(sizeof(IndexTuple) * 1);
-		newtup[0] = (v.spl_rightvalid) ? gistFormTuple(giststate, r, v.spl_rattr, v.spl_rattrsize, v.spl_risnull)
-			: gist_form_invalid_tuple(rbknum);
-		ItemPointerSetBlockNumber(&(newtup[0]->t_tid), rbknum);
+		ROTATEDIST(res);
+		res->block.num = v.spl_nright;
+		res->list = gistfillitupvec(rvectup, v.spl_nright, &( res->lenlist ) );
+		res->itup = (v.spl_rightvalid) ? gistFormTuple(giststate, r, v.spl_rattr, v.spl_rattrsize, v.spl_risnull)
+			: gist_form_invalid_tuple(GIST_ROOT_BLKNO);
 	}
 
-	if (gistnospace(left, lvectup, v.spl_nleft))
+	if (!gistfitpage(lvectup, v.spl_nleft))
 	{
-		int			llen = v.spl_nleft;
-		IndexTuple *lntup;
+		SplitedPageLayout *resptr, *subres;
 
-		lntup = gistSplit(r, leftbuf, lvectup, &llen, dist, giststate);
-		/* ReleaseBuffer(leftbuf); */
+		resptr = subres = gistSplit(r, page, lvectup, v.spl_nleft, giststate);
 
-		newtup = gistjoinvector(newtup, &nlen, lntup, llen);
+		/* install on list's tail */ 
+		while( resptr->next )
+			resptr = resptr->next;
+
+		resptr->next = res;
+		res = subres;
 	}
 	else
 	{
-		char	   *ptr;
-
-		gistfillbuffer(r, left, lvectup, v.spl_nleft, FirstOffsetNumber);
-		/* XLOG stuff */
-		ROTATEDIST(*dist);
-		(*dist)->block.blkno = BufferGetBlockNumber(leftbuf);
-		(*dist)->block.num = v.spl_nleft;
-		(*dist)->list = (IndexTupleData *) palloc(BLCKSZ);
-		ptr = (char *) ((*dist)->list);
-		for (i = 0; i < v.spl_nleft; i++)
-		{
-			memcpy(ptr, lvectup[i], IndexTupleSize(lvectup[i]));
-			ptr += IndexTupleSize(lvectup[i]);
-		}
-		(*dist)->lenlist = ptr - ((char *) ((*dist)->list));
-		(*dist)->buffer = leftbuf;
-
-		if (BufferGetBlockNumber(buffer) != GIST_ROOT_BLKNO)
-			PageRestoreTempPage(left, p);
-
-		nlen += 1;
-		newtup = (IndexTuple *) repalloc(newtup, sizeof(IndexTuple) * nlen);
-		newtup[nlen - 1] = (v.spl_leftvalid) ? gistFormTuple(giststate, r, v.spl_lattr, v.spl_lattrsize, v.spl_lisnull)
-			: gist_form_invalid_tuple(lbknum);
-		ItemPointerSetBlockNumber(&(newtup[nlen - 1]->t_tid), lbknum);
+		ROTATEDIST(res);
+		res->block.num = v.spl_nleft;
+		res->list = gistfillitupvec(lvectup, v.spl_nleft, &( res->lenlist ) );
+		res->itup = (v.spl_leftvalid) ? gistFormTuple(giststate, r, v.spl_lattr, v.spl_lattrsize, v.spl_lisnull)
+			: gist_form_invalid_tuple(GIST_ROOT_BLKNO);
 	}
 
-	GistClearTuplesDeleted(p);
-
-	*len = nlen;
-	return newtup;
+	return res;
 }
 
 /*
diff --git a/src/backend/access/gist/gistutil.c b/src/backend/access/gist/gistutil.c
index bf0a090ff8..d5d6405100 100644
--- a/src/backend/access/gist/gistutil.c
+++ b/src/backend/access/gist/gistutil.c
@@ -8,7 +8,7 @@
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
- *			$PostgreSQL: pgsql/src/backend/access/gist/gistutil.c,v 1.10 2006/03/05 15:58:20 momjian Exp $
+ *			$PostgreSQL: pgsql/src/backend/access/gist/gistutil.c,v 1.11 2006/05/10 09:19:54 teodor Exp $
  *-------------------------------------------------------------------------
  */
 #include "postgres.h"
@@ -81,15 +81,31 @@ gistfillbuffer(Relation r, Page page, IndexTuple *itup,
  * Check space for itup vector on page
  */
 bool
-gistnospace(Page page, IndexTuple *itvec, int len)
+gistnospace(Page page, IndexTuple *itvec, int len, OffsetNumber todelete)
 {
-	unsigned int size = 0;
+	unsigned int size = 0, deleted = 0;
 	int			i;
 
 	for (i = 0; i < len; i++)
 		size += IndexTupleSize(itvec[i]) + sizeof(ItemIdData);
 
-	return (PageGetFreeSpace(page) < size);
+	if ( todelete != InvalidOffsetNumber ) {
+		IndexTuple itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, todelete));
+		deleted = IndexTupleSize(itup) + sizeof(ItemIdData);
+	}
+
+	return (PageGetFreeSpace(page) + deleted < size);
+}
+
+bool
+gistfitpage(IndexTuple *itvec, int len) {	/* true if all len tuples of itvec fit within GiSTPageSize bytes */
+	int i;
+	Size size=0;	/* running total of bytes needed */
+
+	for(i=0;i<len;i++)
+		size += IndexTupleSize(itvec[i]) + sizeof(ItemIdData);	/* tuple bytes plus one ItemIdData per tuple */
+
+	return (size <= GiSTPageSize);	/* compared against a fixed budget, not against any particular page's free space */
 }
 
 /*
@@ -107,7 +123,7 @@ gistextractbuffer(Buffer buffer, int *len /* out */ )
 	*len = maxoff;
 	itvec = palloc(sizeof(IndexTuple) * maxoff);
 	for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
-		itvec[i - 1] = (IndexTuple) PageGetItem(p, PageGetItemId(p, i));
+		itvec[i - FirstOffsetNumber] = (IndexTuple) PageGetItem(p, PageGetItemId(p, i));
 
 	return itvec;
 }
diff --git a/src/backend/access/gist/gistvacuum.c b/src/backend/access/gist/gistvacuum.c
index eafd472c5f..e81c0ebf48 100644
--- a/src/backend/access/gist/gistvacuum.c
+++ b/src/backend/access/gist/gistvacuum.c
@@ -8,7 +8,7 @@
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
- *	  $PostgreSQL: pgsql/src/backend/access/gist/gistvacuum.c,v 1.19 2006/05/02 22:25:10 tgl Exp $
+ *	  $PostgreSQL: pgsql/src/backend/access/gist/gistvacuum.c,v 1.20 2006/05/10 09:19:54 teodor Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -85,10 +85,7 @@ gistVacuumUpdate(GistVacuum *gv, BlockNumber blkno, bool needunion)
 	if (GistPageIsLeaf(page))
 	{
 		if (GistTuplesDeleted(page))
-		{
 			needunion = needwrite = true;
-			GistClearTuplesDeleted(page);
-		}
 	}
 	else
 	{
@@ -157,30 +154,54 @@ gistVacuumUpdate(GistVacuum *gv, BlockNumber blkno, bool needunion)
 		if (curlenaddon)
 		{
 			/* insert updated tuples */
-			if (gistnospace(page, addon, curlenaddon))
+			if (gistnospace(page, addon, curlenaddon, InvalidOffsetNumber))
 			{
 				/* there is no space on page to insert tuples */
 				IndexTuple *vec;
 				SplitedPageLayout *dist = NULL,
 						   *ptr;
-				int			i;
+				int			i, veclen=0;
 				MemoryContext oldCtx = MemoryContextSwitchTo(gv->opCtx);
 
-				vec = gistextractbuffer(buffer, &(res.ituplen));
-				vec = gistjoinvector(vec, &(res.ituplen), addon, curlenaddon);
-				res.itup = gistSplit(gv->index, buffer, vec, &(res.ituplen), &dist, &(gv->giststate));
+				vec = gistextractbuffer(buffer, &veclen);
+				vec = gistjoinvector(vec, &veclen, addon, curlenaddon);
+				dist = gistSplit(gv->index, page, vec, veclen, &(gv->giststate));
+
 				MemoryContextSwitchTo(oldCtx);
 
-				vec = (IndexTuple *) palloc(sizeof(IndexTuple) * res.ituplen);
-				for (i = 0; i < res.ituplen; i++)
-				{
-					vec[i] = (IndexTuple) palloc(IndexTupleSize(res.itup[i]));
-					memcpy(vec[i], res.itup[i], IndexTupleSize(res.itup[i]));
+				if (blkno != GIST_ROOT_BLKNO) {
+					/* if non-root split then we should not allocate new buffer */
+					dist->buffer = buffer;
+					dist->page = BufferGetPage(dist->buffer);
+					GistPageGetOpaque(dist->page)->flags = 0;
 				}
-				res.itup = vec;
 
-				for (ptr = dist; ptr; ptr = ptr->next)
-				{
+				res.itup = (IndexTuple *) palloc(sizeof(IndexTuple) * veclen);
+				res.ituplen = 0;
+
+				/* make new pages and fills them */
+				for (ptr = dist; ptr; ptr = ptr->next) {
+					char *data;
+
+					if ( ptr->buffer == InvalidBuffer ) {
+						ptr->buffer = gistNewBuffer( gv->index );
+						GISTInitBuffer( ptr->buffer, 0 );
+						ptr->page = BufferGetPage(ptr->buffer);
+					}
+					ptr->block.blkno = BufferGetBlockNumber( ptr->buffer );
+
+					data = (char*)(ptr->list);
+					for(i=0;i<ptr->block.num;i++) {
+						if ( PageAddItem(ptr->page, (Item)data, IndexTupleSize((IndexTuple)data), i+FirstOffsetNumber, LP_USED) == InvalidOffsetNumber )
+							elog(ERROR, "failed to add item to index page in \"%s\"", RelationGetRelationName(gv->index));
+						data += IndexTupleSize((IndexTuple)data);
+					}
+
+					ItemPointerSetBlockNumber(&(ptr->itup->t_tid), ptr->block.blkno);
+					res.itup[ res.ituplen ] = (IndexTuple)palloc(IndexTupleSize(ptr->itup));
+					memcpy( res.itup[ res.ituplen ], ptr->itup, IndexTupleSize(ptr->itup) );
+					res.ituplen++;
+
 					MarkBufferDirty(ptr->buffer);
 				}
 
@@ -218,10 +239,9 @@ gistVacuumUpdate(GistVacuum *gv, BlockNumber blkno, bool needunion)
 
 				for (ptr = dist; ptr; ptr = ptr->next)
 				{
-					/* we must keep the buffer lock on the head page */
+					/* we must keep the buffer pin on the head page */
 					if (BufferGetBlockNumber(ptr->buffer) != blkno)
-						LockBuffer(ptr->buffer, GIST_UNLOCK);
-					ReleaseBuffer(ptr->buffer);
+						UnlockReleaseBuffer( ptr->buffer );
 				}
 
 				if (blkno == GIST_ROOT_BLKNO)
@@ -294,6 +314,7 @@ gistVacuumUpdate(GistVacuum *gv, BlockNumber blkno, bool needunion)
 	if (needwrite)
 	{
 		MarkBufferDirty(buffer);
+		GistClearTuplesDeleted(page);
 
 		if (!gv->index->rd_istemp)
 		{
@@ -570,14 +591,7 @@ gistbulkdelete(PG_FUNCTION_ARGS)
 
 			/*
 			 * Remove deletable tuples from page
-			 *
-			 * XXX try to make this critical section shorter.  Could do it
-			 * by separating the callback loop from the actual tuple deletion,
-			 * but that would affect the definition of the todelete[] array
-			 * passed into the WAL record (because the indexes would all be
-			 * pre-deletion).
 			 */
-			START_CRIT_SECTION();
 
 			maxoff = PageGetMaxOffsetNumber(page);
 
@@ -588,13 +602,9 @@ gistbulkdelete(PG_FUNCTION_ARGS)
 
 				if (callback(&(idxtuple->t_tid), callback_state))
 				{
-					PageIndexTupleDelete(page, i);
-					todelete[ntodelete] = i;
-					i--;
-					maxoff--;
+					todelete[ntodelete] = i-ntodelete;
 					ntodelete++;
 					stats->std.tuples_removed += 1;
-					Assert(maxoff == PageGetMaxOffsetNumber(page));
 				}
 				else
 					stats->std.num_index_tuples += 1;
@@ -602,10 +612,14 @@ gistbulkdelete(PG_FUNCTION_ARGS)
 
 			if (ntodelete)
 			{
-				GistMarkTuplesDeleted(page);
+				START_CRIT_SECTION();
 
 				MarkBufferDirty(buffer);
 
+				for(i=0;i<ntodelete;i++)
+					PageIndexTupleDelete(page, todelete[i]);
+				GistMarkTuplesDeleted(page);
+
 				if (!rel->rd_istemp)
 				{
 					XLogRecData *rdata;
@@ -627,9 +641,10 @@ gistbulkdelete(PG_FUNCTION_ARGS)
 				}
 				else
 					PageSetLSN(page, XLogRecPtrForTemp);
+
+				END_CRIT_SECTION();
 			}
 
-			END_CRIT_SECTION();
 		}
 		else
 		{
diff --git a/src/backend/access/gist/gistxlog.c b/src/backend/access/gist/gistxlog.c
index c74762b7df..a029d8f1ec 100644
--- a/src/backend/access/gist/gistxlog.c
+++ b/src/backend/access/gist/gistxlog.c
@@ -8,7 +8,7 @@
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
- *			 $PostgreSQL: pgsql/src/backend/access/gist/gistxlog.c,v 1.15 2006/04/03 16:45:50 tgl Exp $
+ *			 $PostgreSQL: pgsql/src/backend/access/gist/gistxlog.c,v 1.16 2006/05/10 09:19:54 teodor Exp $
  *-------------------------------------------------------------------------
  */
 #include "postgres.h"
@@ -625,7 +625,7 @@ gistContinueInsert(gistIncompleteInsert *insert)
 					}
 			}
 
-			if (gistnospace(pages[numbuffer - 1], itup, lenitup))
+			if (gistnospace(pages[numbuffer - 1], itup, lenitup, InvalidOffsetNumber))
 			{
 				/* no space left on page, so we must split */
 				buffers[numbuffer] = ReadBuffer(index, P_NEW);
diff --git a/src/include/access/gist_private.h b/src/include/access/gist_private.h
index 1bfc90abbc..7e9469f000 100644
--- a/src/include/access/gist_private.h
+++ b/src/include/access/gist_private.h
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $PostgreSQL: pgsql/src/include/access/gist_private.h,v 1.12 2006/03/30 23:03:10 tgl Exp $
+ * $PostgreSQL: pgsql/src/include/access/gist_private.h,v 1.13 2006/05/10 09:19:54 teodor Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -138,6 +138,8 @@ typedef struct SplitedPageLayout
 	gistxlogPage block;
 	IndexTupleData *list;
 	int			lenlist;
+	IndexTuple	itup;  /* union key for page */
+	Page		page;			/* to operate */
 	Buffer		buffer;			/* to write after all proceed */
 
 	struct SplitedPageLayout *next;
@@ -234,8 +236,8 @@ extern void freeGISTstate(GISTSTATE *giststate);
 extern void gistmakedeal(GISTInsertState *state, GISTSTATE *giststate);
 extern void gistnewroot(Relation r, Buffer buffer, IndexTuple *itup, int len, ItemPointer key);
 
-extern IndexTuple *gistSplit(Relation r, Buffer buffer, IndexTuple *itup,
-		  int *len, SplitedPageLayout **dist, GISTSTATE *giststate);
+extern SplitedPageLayout *gistSplit(Relation r, Page page, IndexTuple *itup,
+		  int len, GISTSTATE *giststate);
 
 extern GISTInsertStack *gistFindPath(Relation r, BlockNumber child);
 
@@ -261,11 +263,16 @@ extern Datum gistgettuple(PG_FUNCTION_ARGS);
 extern Datum gistgetmulti(PG_FUNCTION_ARGS);
 
 /* gistutil.c */
+
+#define GiSTPageSize   \
+    ( BLCKSZ - SizeOfPageHeaderData - MAXALIGN(sizeof(GISTPageOpaqueData)) ) 
+
+extern bool gistfitpage(IndexTuple *itvec, int len);
+extern bool gistnospace(Page page, IndexTuple *itvec, int len, OffsetNumber todelete);
 extern void gistcheckpage(Relation rel, Buffer buf);
 extern Buffer gistNewBuffer(Relation r);
 extern OffsetNumber gistfillbuffer(Relation r, Page page, IndexTuple *itup,
 			   int len, OffsetNumber off);
-extern bool gistnospace(Page page, IndexTuple *itvec, int len);
 extern IndexTuple *gistextractbuffer(Buffer buffer, int *len /* out */ );
 extern IndexTuple *gistjoinvector(
 			   IndexTuple *itvec, int *len,
-- 
2.40.0