state = GenericXLogStart(index);
page = GenericXLogRegisterBuffer(state, buffer, 0);
+ /*
+ * We might have found a page that was recently deleted by VACUUM. If
+ * so, we can reuse it, but we must reinitialize it.
+ */
+ if (PageIsNew(page) || BloomPageIsDeleted(page))
+ BloomInitPage(page, 0);
+
if (BloomPageAddItem(&blstate, page, itup))
{
/* Success! Apply the change, clean up, and exit */
LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
page = GenericXLogRegisterBuffer(state, buffer, 0);
+ /* Basically same logic as above */
+ if (PageIsNew(page) || BloomPageIsDeleted(page))
+ BloomInitPage(page, 0);
+
if (BloomPageAddItem(&blstate, page, itup))
{
/* Success! Apply the changes, clean up, and exit */
/*
* Add new bloom tuple to the page. Returns true if new tuple was successfully
- * added to the page. Returns false if it doesn't fit the page.
+ * added to the page. Returns false if it doesn't fit on the page.
*/
bool
BloomPageAddItem(BloomState *state, Page page, BloomTuple *tuple)
BloomPageOpaque opaque;
Pointer ptr;
- /* Does new tuple fit the page */
+ /* We shouldn't be pointed to an invalid page */
+ Assert(!PageIsNew(page) && !BloomPageIsDeleted(page));
+
+ /* Does new tuple fit on the page? */
if (BloomPageGetFreeSpace(state, page) < state->sizeOfBloomTuple)
return false;
ptr = (Pointer) BloomPageGetTuple(state, page, opaque->maxoff + 1);
((PageHeader) page)->pd_lower = ptr - page;
+ /* Assert we didn't overrun available space */
+ Assert(((PageHeader) page)->pd_lower <= ((PageHeader) page)->pd_upper);
+
return true;
}
metadata->magickNumber = BLOOM_MAGICK_NUMBER;
metadata->opts = *opts;
((PageHeader) metaPage)->pd_lower += sizeof(BloomMetaPageData);
+
+ /* If this fails, probably FreeBlockNumberArray size calc is wrong: */
+ Assert(((PageHeader) metaPage)->pd_lower <= ((PageHeader) metaPage)->pd_upper);
}
/*
#include "postgres.h"
#include "access/genam.h"
+#include "bloom.h"
#include "catalog/storage.h"
#include "commands/vacuum.h"
#include "miscadmin.h"
#include "storage/indexfsm.h"
#include "storage/lmgr.h"
-#include "bloom.h"
/*
* Bulk deletion of all index entries pointing to a set of heap tuples.
BloomState state;
Buffer buffer;
Page page;
+ BloomMetaPageData *metaData;
GenericXLogState *gxlogState;
if (stats == NULL)
*itupPtr,
*itupEnd;
+ vacuum_delay_point();
+
buffer = ReadBufferExtended(index, MAIN_FORKNUM, blkno,
RBM_NORMAL, info->strategy);
gxlogState = GenericXLogStart(index);
page = GenericXLogRegisterBuffer(gxlogState, buffer, 0);
- if (BloomPageIsDeleted(page))
+ /* Ignore empty/deleted pages until blvacuumcleanup() */
+ if (PageIsNew(page) || BloomPageIsDeleted(page))
{
UnlockReleaseBuffer(buffer);
GenericXLogAbort(gxlogState);
- CHECK_FOR_INTERRUPTS();
continue;
}
- /* Iterate over the tuples */
+ /*
+ * Iterate over the tuples. itup points to current tuple being
+ * scanned, itupPtr points to where to save next non-deleted tuple.
+ */
itup = itupPtr = BloomPageGetTuple(&state, page, FirstOffsetNumber);
itupEnd = BloomPageGetTuple(&state, page,
OffsetNumberNext(BloomPageGetMaxOffset(page)));
/* Do we have to delete this tuple? */
if (callback(&itup->heapPtr, callback_state))
{
- stats->tuples_removed += 1;
+ /* Yes; adjust count of tuples that will be left on page */
BloomPageGetOpaque(page)->maxoff--;
+ stats->tuples_removed += 1;
}
else
{
+ /* No; copy it to itupPtr++, but skip copy if not needed */
if (itupPtr != itup)
- {
- /*
- * If we already delete something before, we have to move
- * this tuple backward.
- */
memmove((Pointer) itupPtr, (Pointer) itup,
state.sizeOfBloomTuple);
- }
- stats->num_index_tuples++;
itupPtr = BloomPageGetNextTuple(&state, itupPtr);
}
itup = BloomPageGetNextTuple(&state, itup);
}
+ /* Assert that we counted correctly */
Assert(itupPtr == BloomPageGetTuple(&state, page,
OffsetNumberNext(BloomPageGetMaxOffset(page))));
/*
- * Add page to notFullPage list if we will not mark page as deleted
- * and there is a free space on it
+ * Add page to new notFullPage list if we will not mark page as
+ * deleted and there is free space on it
*/
if (BloomPageGetMaxOffset(page) != 0 &&
- BloomPageGetFreeSpace(&state, page) > state.sizeOfBloomTuple &&
+ BloomPageGetFreeSpace(&state, page) >= state.sizeOfBloomTuple &&
countPage < BloomMetaBlockN)
notFullPage[countPage++] = blkno;
GenericXLogAbort(gxlogState);
}
UnlockReleaseBuffer(buffer);
- CHECK_FOR_INTERRUPTS();
}
- if (countPage > 0)
- {
- BloomMetaPageData *metaData;
-
- buffer = ReadBuffer(index, BLOOM_METAPAGE_BLKNO);
- LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
+ /*
+ * Update the metapage's notFullPage list with whatever we found. Our
+ * info could already be out of date at this point, but blinsert() will
+ * cope if so.
+ */
+ buffer = ReadBuffer(index, BLOOM_METAPAGE_BLKNO);
+ LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
- gxlogState = GenericXLogStart(index);
- page = GenericXLogRegisterBuffer(gxlogState, buffer, 0);
+ gxlogState = GenericXLogStart(index);
+ page = GenericXLogRegisterBuffer(gxlogState, buffer, 0);
- metaData = BloomPageGetMeta(page);
- memcpy(metaData->notFullPage, notFullPage, sizeof(BlockNumber) * countPage);
- metaData->nStart = 0;
- metaData->nEnd = countPage;
+ metaData = BloomPageGetMeta(page);
+ memcpy(metaData->notFullPage, notFullPage, sizeof(BlockNumber) * countPage);
+ metaData->nStart = 0;
+ metaData->nEnd = countPage;
- GenericXLogFinish(gxlogState);
- UnlockReleaseBuffer(buffer);
- }
+ GenericXLogFinish(gxlogState);
+ UnlockReleaseBuffer(buffer);
return stats;
}
Relation index = info->index;
BlockNumber npages,
blkno;
- BlockNumber totFreePages;
if (info->analyze_only)
return stats;
* statistics.
*/
npages = RelationGetNumberOfBlocks(index);
- totFreePages = 0;
+ stats->num_pages = npages;
+ stats->pages_free = 0;
+ stats->num_index_tuples = 0;
for (blkno = BLOOM_HEAD_BLKNO; blkno < npages; blkno++)
{
Buffer buffer;
LockBuffer(buffer, BUFFER_LOCK_SHARE);
page = (Page) BufferGetPage(buffer);
- if (BloomPageIsDeleted(page))
+ if (PageIsNew(page) || BloomPageIsDeleted(page))
{
RecordFreeIndexPage(index, blkno);
- totFreePages++;
+ stats->pages_free++;
}
else
{
stats->num_index_tuples += BloomPageGetMaxOffset(page);
- stats->estimated_count += BloomPageGetMaxOffset(page);
}
UnlockReleaseBuffer(buffer);
}
IndexFreeSpaceMapVacuum(info->index);
- stats->pages_free = totFreePages;
- stats->num_pages = RelationGetNumberOfBlocks(index);
return stats;
}