#include "access/htup_details.h"
#include "nodes/bitmapset.h"
#include "nodes/tidbitmap.h"
+#include "storage/lwlock.h"
+#include "utils/dsa.h"
/*
* The maximum number of tuples per page is not large (typically 256 with
bitmapword words[Max(WORDS_PER_PAGE, WORDS_PER_CHUNK)];
} PagetableEntry;
+/*
+ * Holds the array of pagetable entries, allocated from DSA when the
+ * bitmap is to be shared across processes.
+ */
+typedef struct PTEntryArray
+{
+ pg_atomic_uint32 refcount; /* number of iterators attached */
+ PagetableEntry ptentry[FLEXIBLE_ARRAY_MEMBER];
+} PTEntryArray;
+
/*
* We want to avoid the overhead of creating the hashtable, which is
* comparatively large, when not necessary. Particularly when we are using a
TBM_HASH /* pagetable is valid, entry1 is not */
} TBMStatus;
+/*
+ * Current iterating state of the TBM.
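+ *
+ * Once iteration has begun in one mode it stays in that mode: the asserts
+ * in tbm_begin_iterate and tbm_prepare_shared_iterate forbid mixing
+ * private and shared iteration on the same bitmap.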
+ */
+typedef enum
+{
+ TBM_NOT_ITERATING, /* not yet converted to page and chunk array */
+ TBM_ITERATING_PRIVATE, /* converted to local page and chunk array */
+ TBM_ITERATING_SHARED /* converted to shared page and chunk array */
+} TBMIteratingState;
+
/*
* Here is the representation for a whole TIDBitMap:
*/
int maxentries; /* limit on same to meet maxbytes */
int npages; /* number of exact entries in pagetable */
int nchunks; /* number of lossy entries in pagetable */
- bool iterating; /* tbm_begin_iterate called? */
+ TBMIteratingState iterating; /* tbm_begin_iterate called? */
uint32 lossify_start; /* offset to start lossifying hashtable at */
PagetableEntry entry1; /* used when status == TBM_ONE_PAGE */
- /* these are valid when iterating is true: */
+ /* these are valid when iterating is not TBM_NOT_ITERATING: */
PagetableEntry **spages; /* sorted exact-page list, or NULL */
PagetableEntry **schunks; /* sorted lossy-chunk list, or NULL */
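+ /* the remaining fields are used only when the bitmap is backed by DSA */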
+ dsa_pointer dsapagetable; /* dsa_pointer to the element array */
+ dsa_pointer dsapagetableold; /* dsa_pointer to the old element array */
+ dsa_pointer ptpages; /* dsa_pointer to the page array */
+ dsa_pointer ptchunks; /* dsa_pointer to the chunk array */
+ dsa_area *dsa; /* reference to per-query dsa area */
};
/*
TBMIterateResult output; /* MUST BE LAST (because variable-size) */
};
+/*
+ * Holds the shared members of the iterator so that multiple processes
+ * can jointly iterate.
+ */
+typedef struct TBMSharedIteratorState
+{
+ int nentries; /* number of entries in pagetable */
+ int maxentries; /* limit on same to meet maxbytes */
+ int npages; /* number of exact entries in pagetable */
+ int nchunks; /* number of lossy entries in pagetable */
+ dsa_pointer pagetable; /* dsa pointer to the pagetable element array */
+ dsa_pointer spages; /* dsa pointer to page array */
+ dsa_pointer schunks; /* dsa pointer to chunk array */
+ LWLock lock; /* lock to protect below members */
+ int spageptr; /* next spages index */
+ int schunkptr; /* next schunks index */
+ int schunkbit; /* next bit to check in current schunk */
+} TBMSharedIteratorState;
+
+/*
+ * Holds the sorted indexes into the pagetable entry array, used while
+ * iterating.
+ */
+typedef struct PTIterationArray
+{
+ pg_atomic_uint32 refcount; /* number of iterators attached */
+ int index[FLEXIBLE_ARRAY_MEMBER]; /* indexes into the pagetable array */
+} PTIterationArray;
+
+/*
+ * Same as TBMIterator, but used for joint iteration across multiple
+ * processes, so it also holds a reference to the shared state.
+ */
+struct TBMSharedIterator
+{
+ TBMSharedIteratorState *state; /* shared state */
+ PTEntryArray *ptbase; /* pagetable element array */
+ PTIterationArray *ptpages; /* sorted exact page index list */
+ PTIterationArray *ptchunks; /* sorted lossy page index list */
+ TBMIterateResult output; /* MUST BE LAST (because variable-size) */
+};
/* Local function prototypes */
static void tbm_union_page(TIDBitmap *a, const PagetableEntry *bpage);
static void tbm_mark_page_lossy(TIDBitmap *tbm, BlockNumber pageno);
static void tbm_lossify(TIDBitmap *tbm);
static int tbm_comparator(const void *left, const void *right);
+static int tbm_shared_comparator(const void *left, const void *right,
+ void *arg);
/*
* Simple inline murmur hash implementation for the exact width required, for
}
/* define hashtable mapping block numbers to PagetableEntry's */
+#define SH_USE_NONDEFAULT_ALLOCATOR
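+/* simplehash will call our pagetable_allocate/pagetable_free, defined below */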
#define SH_PREFIX pagetable
#define SH_ELEMENT_TYPE PagetableEntry
#define SH_KEY_TYPE BlockNumber
*
* The bitmap will live in the memory context that is CurrentMemoryContext
* at the time of this call. It will be limited to (approximately) maxbytes
- * total memory consumption.
+ * total memory consumption. If the DSA passed to this function is not NULL
+ * then the memory for storing elements of the underlying page table will
+ * be allocated from the DSA.
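+ * Such a bitmap can later be iterated jointly by multiple processes; see
+ * tbm_prepare_shared_iterate.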
*/
TIDBitmap *
-tbm_create(long maxbytes)
+tbm_create(long maxbytes, dsa_area *dsa)
{
TIDBitmap *tbm;
long nbuckets;
nbuckets = Max(nbuckets, 16); /* sanity limit */
tbm->maxentries = (int) nbuckets;
tbm->lossify_start = 0;
+ tbm->dsa = dsa;
return tbm;
}
Assert(tbm->status != TBM_HASH);
Assert(tbm->pagetable == NULL);
- tbm->pagetable = pagetable_create(tbm->mcxt, 128, NULL);
+ tbm->pagetable = pagetable_create(tbm->mcxt, 128, tbm);
/* If entry1 is valid, push it into the hashtable */
if (tbm->status == TBM_ONE_PAGE)
pfree(tbm);
}
+/*
+ * tbm_free_shared_area - free shared state
+ *
+ * Free the shared iterator state.  Also free the shared pagetable and
+ * iterator arrays once they are no longer referenced by any shared
+ * iterator, i.e. when their refcount drops to zero.
+ */
+void
+tbm_free_shared_area(dsa_area *dsa, dsa_pointer dp)
+{
+ TBMSharedIteratorState *istate = dsa_get_address(dsa, dp);
+ PTEntryArray *ptbase;
+ PTIterationArray *ptpages;
+ PTIterationArray *ptchunks;
+
+ /* an empty bitmap never allocated a pagetable, so test validity first */
+ if (DsaPointerIsValid(istate->pagetable))
+ {
+ ptbase = dsa_get_address(dsa, istate->pagetable);
+ if (pg_atomic_sub_fetch_u32(&ptbase->refcount, 1) == 0)
+ dsa_free(dsa, istate->pagetable);
+ }
+ if (DsaPointerIsValid(istate->spages))
+ {
+ ptpages = dsa_get_address(dsa, istate->spages);
+ if (pg_atomic_sub_fetch_u32(&ptpages->refcount, 1) == 0)
+ dsa_free(dsa, istate->spages);
+ }
+ if (DsaPointerIsValid(istate->schunks))
+ {
+ ptchunks = dsa_get_address(dsa, istate->schunks);
+ if (pg_atomic_sub_fetch_u32(&ptchunks->refcount, 1) == 0)
+ dsa_free(dsa, istate->schunks);
+ }
+
+ dsa_free(dsa, dp);
+}
+
/*
* tbm_add_tuples - add some tuple IDs to a TIDBitmap
*
PagetableEntry *page = NULL; /* only valid when currblk is valid */
int i;
- Assert(!tbm->iterating);
+ Assert(tbm->iterating == TBM_NOT_ITERATING);
for (i = 0; i < ntids; i++)
{
BlockNumber blk = ItemPointerGetBlockNumber(tids + i);
{
TBMIterator *iterator;
+ Assert(tbm->iterating != TBM_ITERATING_SHARED);
+
/*
* Create the TBMIterator struct, with enough trailing space to serve the
* needs of the TBMIterateResult sub-struct.
* attached to the bitmap not the iterator, so they can be used by more
* than one iterator.
*/
- if (tbm->status == TBM_HASH && !tbm->iterating)
+ if (tbm->status == TBM_HASH && tbm->iterating == TBM_NOT_ITERATING)
{
pagetable_iterator i;
PagetableEntry *page;
tbm_comparator);
}
- tbm->iterating = true;
+ tbm->iterating = TBM_ITERATING_PRIVATE;
return iterator;
}
+/*
+ * tbm_prepare_shared_iterate - prepare shared iteration state for a TIDBitmap.
+ *
+ * The necessary shared state will be allocated from the DSA passed to
+ * tbm_create, so that multiple processes can attach to it and iterate jointly.
+ *
+ * This converts the pagetable hash into arrays of indexes into the
+ * pagetable entry array, one for exact pages and one for lossy chunks.
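+ *
+ * A sketch of the intended usage (not taken verbatim from any caller): the
+ * leader calls tbm_prepare_shared_iterate and hands the returned
+ * dsa_pointer to each worker; every participant then attaches with
+ * tbm_attach_shared_iterate, loops over tbm_shared_iterate until it
+ * returns NULL, and finishes with tbm_end_shared_iterate.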
+ */
+dsa_pointer
+tbm_prepare_shared_iterate(TIDBitmap *tbm)
+{
+ dsa_pointer dp;
+ TBMSharedIteratorState *istate;
+ PTEntryArray *ptbase = NULL;
+ PTIterationArray *ptpages = NULL;
+ PTIterationArray *ptchunks = NULL;
+
+ Assert(tbm->dsa != NULL);
+ Assert(tbm->iterating != TBM_ITERATING_PRIVATE);
+
+ /*
+ * Allocate TBMSharedIteratorState from DSA to hold the shared members and
+ * lock; it will also be used by multiple workers during shared iteration.
+ */
+ dp = dsa_allocate(tbm->dsa, sizeof(TBMSharedIteratorState));
+ istate = dsa_get_address(tbm->dsa, dp);
+
+ /*
+ * If we're not already iterating, create and fill the sorted page lists.
+ * (If we are, the sorted page lists are already stored in the TIDBitmap,
+ * and we can just reuse them.)
+ */
+ if (tbm->iterating == TBM_NOT_ITERATING)
+ {
+ pagetable_iterator i;
+ PagetableEntry *page;
+ int idx;
+ int npages;
+ int nchunks;
+
+ /*
+ * Allocate the page and chunk array memory from the DSA to share
+ * across multiple processes.
+ */
+ if (tbm->npages)
+ {
+ tbm->ptpages = dsa_allocate(tbm->dsa, sizeof(PTIterationArray) +
+ tbm->npages * sizeof(int));
+ ptpages = dsa_get_address(tbm->dsa, tbm->ptpages);
+ pg_atomic_init_u32(&ptpages->refcount, 0);
+ }
+ if (tbm->nchunks)
+ {
+ tbm->ptchunks = dsa_allocate(tbm->dsa, sizeof(PTIterationArray) +
+ tbm->nchunks * sizeof(int));
+ ptchunks = dsa_get_address(tbm->dsa, tbm->ptchunks);
+ pg_atomic_init_u32(&ptchunks->refcount, 0);
+ }
+
+ /*
+ * If the TBM status is TBM_HASH, iterate over the pagetable and convert
+ * it to page and chunk arrays.  If it is TBM_ONE_PAGE, directly allocate
+ * space for the single entry from the DSA.
+ */
+ npages = nchunks = 0;
+ if (tbm->status == TBM_HASH)
+ {
+ ptbase = dsa_get_address(tbm->dsa, tbm->dsapagetable);
+
+ pagetable_start_iterate(tbm->pagetable, &i);
+ while ((page = pagetable_iterate(tbm->pagetable, &i)) != NULL)
+ {
+ idx = page - ptbase->ptentry;
+ if (page->ischunk)
+ ptchunks->index[nchunks++] = idx;
+ else
+ ptpages->index[npages++] = idx;
+ }
+
+ Assert(npages == tbm->npages);
+ Assert(nchunks == tbm->nchunks);
+ }
+ else if (tbm->status == TBM_ONE_PAGE)
+ {
+ /*
+ * In one-page mode, allocate space for a single pagetable entry,
+ * copy entry1 into it, and store its index (i.e. 0) in the page
+ * array.
+ */
+ tbm->dsapagetable = dsa_allocate0(tbm->dsa, sizeof(PTEntryArray) +
+ sizeof(PagetableEntry));
+ ptbase = dsa_get_address(tbm->dsa, tbm->dsapagetable);
+ memcpy(ptbase->ptentry, &tbm->entry1, sizeof(PagetableEntry));
+ ptpages->index[0] = 0;
+ }
+
+ /* ptbase is still NULL if the bitmap is empty */
+ if (ptbase != NULL)
+ pg_atomic_init_u32(&ptbase->refcount, 0);
+
+ if (npages > 1)
+ qsort_arg((void *) (ptpages->index), npages, sizeof(int),
+ tbm_shared_comparator, (void *) ptbase->ptentry);
+ if (nchunks > 1)
+ qsort_arg((void *) (ptchunks->index), nchunks, sizeof(int),
+ tbm_shared_comparator, (void *) ptbase->ptentry);
+ }
+
+ /*
+ * Store the TBM members in the shared state so that we can share them
+ * across multiple processes.
+ */
+ istate->nentries = tbm->nentries;
+ istate->maxentries = tbm->maxentries;
+ istate->npages = tbm->npages;
+ istate->nchunks = tbm->nchunks;
+ istate->pagetable = tbm->dsapagetable;
+ istate->spages = tbm->ptpages;
+ istate->schunks = tbm->ptchunks;
+
+ ptbase = dsa_get_address(tbm->dsa, tbm->dsapagetable);
+ ptpages = dsa_get_address(tbm->dsa, tbm->ptpages);
+ ptchunks = dsa_get_address(tbm->dsa, tbm->ptchunks);
+
+ /*
+ * For every shared iterator referring to the pagetable and iterator
+ * arrays, increase the refcount by 1, so that the arrays are not freed
+ * while any shared iterator still refers to them.
+ */
+ if (ptbase != NULL)
+ pg_atomic_add_fetch_u32(&ptbase->refcount, 1);
+ if (ptpages != NULL)
+ pg_atomic_add_fetch_u32(&ptpages->refcount, 1);
+ if (ptchunks != NULL)
+ pg_atomic_add_fetch_u32(&ptchunks->refcount, 1);
+
+ /* Initialize the iterator lock */
+ LWLockInitialize(&istate->lock, LWTRANCHE_TBM);
+
+ /* Initialize the shared iterator state */
+ istate->schunkbit = 0;
+ istate->schunkptr = 0;
+ istate->spageptr = 0;
+
+ tbm->iterating = TBM_ITERATING_SHARED;
+
+ return dp;
+}
+
+/*
+ * tbm_extract_page_tuple - extract the tuple offsets from a page
+ *
+ * The extracted offsets are stored into TBMIterateResult.
+ */
+static inline int
+tbm_extract_page_tuple(PagetableEntry *page, TBMIterateResult *output)
+{
+ int wordnum;
+ int ntuples = 0;
+
+ for (wordnum = 0; wordnum < WORDS_PER_PAGE; wordnum++)
+ {
+ bitmapword w = page->words[wordnum];
+
+ if (w != 0)
+ {
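+ /* offset numbers are 1-based, so bit 0 of word 0 is offset 1 */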
+ int off = wordnum * BITS_PER_BITMAPWORD + 1;
+
+ while (w != 0)
+ {
+ if (w & 1)
+ output->offsets[ntuples++] = (OffsetNumber) off;
+ off++;
+ w >>= 1;
+ }
+ }
+ }
+
+ return ntuples;
+}
+
+/*
+ * tbm_advance_schunkbit - advance *schunkbitp to the next set bit in the
+ * chunk, or to PAGES_PER_CHUNK if no more bits are set
+ */
+static inline void
+tbm_advance_schunkbit(PagetableEntry *chunk, int *schunkbitp)
+{
+ int schunkbit = *schunkbitp;
+
+ while (schunkbit < PAGES_PER_CHUNK)
+ {
+ int wordnum = WORDNUM(schunkbit);
+ int bitnum = BITNUM(schunkbit);
+
+ if ((chunk->words[wordnum] & ((bitmapword) 1 << bitnum)) != 0)
+ break;
+ schunkbit++;
+ }
+
+ *schunkbitp = schunkbit;
+}
+
/*
* tbm_iterate - scan through next page of a TIDBitmap
*
TIDBitmap *tbm = iterator->tbm;
TBMIterateResult *output = &(iterator->output);
- Assert(tbm->iterating);
+ Assert(tbm->iterating == TBM_ITERATING_PRIVATE);
/*
* If lossy chunk pages remain, make sure we've advanced schunkptr/
PagetableEntry *chunk = tbm->schunks[iterator->schunkptr];
int schunkbit = iterator->schunkbit;
- while (schunkbit < PAGES_PER_CHUNK)
- {
- int wordnum = WORDNUM(schunkbit);
- int bitnum = BITNUM(schunkbit);
-
- if ((chunk->words[wordnum] & ((bitmapword) 1 << bitnum)) != 0)
- break;
- schunkbit++;
- }
+ tbm_advance_schunkbit(chunk, &schunkbit);
if (schunkbit < PAGES_PER_CHUNK)
{
iterator->schunkbit = schunkbit;
{
PagetableEntry *page;
int ntuples;
- int wordnum;
/* In ONE_PAGE state, we don't allocate an spages[] array */
if (tbm->status == TBM_ONE_PAGE)
page = tbm->spages[iterator->spageptr];
/* scan bitmap to extract individual offset numbers */
- ntuples = 0;
- for (wordnum = 0; wordnum < WORDS_PER_PAGE; wordnum++)
+ ntuples = tbm_extract_page_tuple(page, output);
+ output->blockno = page->blockno;
+ output->ntuples = ntuples;
+ output->recheck = page->recheck;
+ iterator->spageptr++;
+ return output;
+ }
+
+ /* Nothing more in the bitmap */
+ return NULL;
+}
+
+/*
+ * tbm_shared_iterate - scan through next page of a TIDBitmap
+ *
+ * As above, but this iterates using an iterator that is shared across
+ * multiple processes.  We must acquire the iterator LWLock before
+ * accessing the shared members.
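+ *
+ * The results are written into the iterator's backend-private
+ * TBMIterateResult, so the caller can consume them after the lock has
+ * been released.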
+ */
+TBMIterateResult *
+tbm_shared_iterate(TBMSharedIterator *iterator)
+{
+ TBMIterateResult *output = &iterator->output;
+ TBMSharedIteratorState *istate = iterator->state;
+ PagetableEntry *ptbase = iterator->ptbase ? iterator->ptbase->ptentry : NULL;
+ int *idxpages = iterator->ptpages ? iterator->ptpages->index : NULL;
+ int *idxchunks = iterator->ptchunks ? iterator->ptchunks->index : NULL;
+
+ /* Acquire the LWLock before accessing the shared members */
+ LWLockAcquire(&istate->lock, LW_EXCLUSIVE);
+
+ /*
+ * If lossy chunk pages remain, make sure we've advanced schunkptr/
+ * schunkbit to the next set bit.
+ */
+ while (istate->schunkptr < istate->nchunks)
+ {
+ PagetableEntry *chunk = &ptbase[idxchunks[istate->schunkptr]];
+ int schunkbit = istate->schunkbit;
+
+ tbm_advance_schunkbit(chunk, &schunkbit);
+ if (schunkbit < PAGES_PER_CHUNK)
{
- bitmapword w = page->words[wordnum];
+ istate->schunkbit = schunkbit;
+ break;
+ }
+ /* advance to next chunk */
+ istate->schunkptr++;
+ istate->schunkbit = 0;
+ }
- if (w != 0)
- {
- int off = wordnum * BITS_PER_BITMAPWORD + 1;
+ /*
+ * If both chunk and per-page data remain, must output the numerically
+ * earlier page.
+ */
+ if (istate->schunkptr < istate->nchunks)
+ {
+ PagetableEntry *chunk = &ptbase[idxchunks[istate->schunkptr]];
+ BlockNumber chunk_blockno;
- while (w != 0)
- {
- if (w & 1)
- output->offsets[ntuples++] = (OffsetNumber) off;
- off++;
- w >>= 1;
- }
- }
+ chunk_blockno = chunk->blockno + istate->schunkbit;
+
+ if (istate->spageptr >= istate->npages ||
+ chunk_blockno < ptbase[idxpages[istate->spageptr]].blockno)
+ {
+ /* Return a lossy page indicator from the chunk */
+ output->blockno = chunk_blockno;
+ output->ntuples = -1;
+ output->recheck = true;
+ istate->schunkbit++;
+
+ LWLockRelease(&istate->lock);
+ return output;
}
+ }
+
+ if (istate->spageptr < istate->npages)
+ {
+ PagetableEntry *page = &ptbase[idxpages[istate->spageptr]];
+ int ntuples;
+
+ /* scan bitmap to extract individual offset numbers */
+ ntuples = tbm_extract_page_tuple(page, output);
output->blockno = page->blockno;
output->ntuples = ntuples;
output->recheck = page->recheck;
- iterator->spageptr++;
+ istate->spageptr++;
+
+ LWLockRelease(&istate->lock);
+
return output;
}
+ LWLockRelease(&istate->lock);
+
/* Nothing more in the bitmap */
return NULL;
}
pfree(iterator);
}
+/*
+ * tbm_end_shared_iterate - finish a shared iteration over a TIDBitmap
+ *
+ * This doesn't free any of the shared state associated with the iterator,
+ * just our backend-private state.
+ */
+void
+tbm_end_shared_iterate(TBMSharedIterator *iterator)
+{
+ pfree(iterator);
+}
+
/*
* tbm_find_pageentry - find a PagetableEntry for the pageno
*
* push nentries down to significantly less than maxentries, or else we'll
* just end up doing this again very soon. We shoot for maxentries/2.
*/
- Assert(!tbm->iterating);
+ Assert(tbm->iterating == TBM_NOT_ITERATING);
Assert(tbm->status == TBM_HASH);
pagetable_start_iterate_at(tbm->pagetable, &i, tbm->lossify_start);
return 1;
return 0;
}
+
+/*
+ * As above, but this receives indexes into the PagetableEntry array rather
+ * than pointers to the entries, so it must look up the actual entries
+ * before comparing block numbers.
+ */
+static int
+tbm_shared_comparator(const void *left, const void *right, void *arg)
+{
+ PagetableEntry *base = (PagetableEntry *) arg;
+ PagetableEntry *lpage = &base[*(int *) left];
+ PagetableEntry *rpage = &base[*(int *) right];
+
+ if (lpage->blockno < rpage->blockno)
+ return -1;
+ else if (lpage->blockno > rpage->blockno)
+ return 1;
+ return 0;
+}
+
+/*
+ * tbm_attach_shared_iterate
+ *
+ * Allocate a backend-private iterator and attach the shared iterator state
+ * to it so that multiple processes can iterate jointly.
+ *
+ * This also converts the DSA pointers to local pointers and stores them
+ * in our private iterator.
+ */
+TBMSharedIterator *
+tbm_attach_shared_iterate(dsa_area *dsa, dsa_pointer dp)
+{
+ TBMSharedIterator *iterator;
+ TBMSharedIteratorState *istate;
+
+ /*
+ * Create the TBMSharedIterator struct, with enough trailing space to
+ * serve the needs of the TBMIterateResult sub-struct.
+ */
+ iterator = (TBMSharedIterator *) palloc0(sizeof(TBMSharedIterator) +
+ MAX_TUPLES_PER_PAGE * sizeof(OffsetNumber));
+
+ istate = (TBMSharedIteratorState *) dsa_get_address(dsa, dp);
+
+ iterator->state = istate;
+
+ iterator->ptbase = dsa_get_address(dsa, istate->pagetable);
+
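+ /* ptpages and ptchunks remain NULL (from palloc0) when not present */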
+ if (istate->npages)
+ iterator->ptpages = dsa_get_address(dsa, istate->spages);
+ if (istate->nchunks)
+ iterator->ptchunks = dsa_get_address(dsa, istate->schunks);
+
+ return iterator;
+}
+
+/*
+ * pagetable_allocate
+ *
+ * Callback function for allocating memory for hashtable elements, using
+ * the DSA if available and the bitmap's memory context otherwise.
+ */
+static inline void *
+pagetable_allocate(pagetable_hash *pagetable, Size size)
+{
+ TIDBitmap *tbm = (TIDBitmap *) pagetable->private_data;
+ PTEntryArray *ptbase;
+
+ if (tbm->dsa == NULL)
+ return MemoryContextAllocExtended(pagetable->ctx, size,
+ MCXT_ALLOC_HUGE | MCXT_ALLOC_ZERO);
+
+ /*
+ * Save the dsapagetable reference in dsapagetableold before allocating
+ * new memory so that pagetable_free can free the old entry.
+ */
+ tbm->dsapagetableold = tbm->dsapagetable;
+ tbm->dsapagetable = dsa_allocate0(tbm->dsa, sizeof(PTEntryArray) + size);
+
+ ptbase = dsa_get_address(tbm->dsa, tbm->dsapagetable);
+ return ptbase->ptentry;
+}
+
+/*
+ * pagetable_free
+ *
+ * Callback function for freeing hash table elements.
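+ *
+ * In the DSA case the bare pointer we are given cannot be freed directly;
+ * instead we free the dsa_pointer that pagetable_allocate stashed in
+ * dsapagetableold.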
+ */
+static inline void
+pagetable_free(pagetable_hash *pagetable, void *pointer)
+{
+ TIDBitmap *tbm = (TIDBitmap *) pagetable->private_data;
+
+ /* pfree the input pointer if DSA is not available */
+ if (tbm->dsa == NULL)
+ pfree(pointer);
+ else if (DsaPointerIsValid(tbm->dsapagetableold))
+ {
+ dsa_free(tbm->dsa, tbm->dsapagetableold);
+ tbm->dsapagetableold = InvalidDsaPointer;
+ }
+}