if (ItemIdIsUsed(itemId))
{
dtup = brin_deform_tuple(bdesc,
- (BrinTuple *) PageGetItem(page, itemId));
+ (BrinTuple *) PageGetItem(page, itemId),
+ NULL);
attno = 1;
unusedItem = false;
}
MemoryContextSwitchTo(tupcxt);
}
- dtup = brin_deform_tuple(bdesc, brtup);
+ dtup = brin_deform_tuple(bdesc, brtup, NULL);
/*
* Compare the key values of the new tuple to the stored index values;
* re-acquiring the lock.
*/
origsz = ItemIdGetLength(lp);
- origtup = brin_copy_tuple(brtup, origsz);
+ origtup = brin_copy_tuple(brtup, origsz, NULL, NULL);
/*
* Before releasing the lock, check if we can attempt a same-page
FmgrInfo *consistentFn;
MemoryContext oldcxt;
MemoryContext perRangeCxt;
+ BrinMemTuple *dtup;
+ BrinTuple *btup = NULL;
+ Size btupsz = 0;
opaque = (BrinOpaque *) scan->opaque;
bdesc = opaque->bo_bdesc;
*/
consistentFn = palloc0(sizeof(FmgrInfo) * bdesc->bd_tupdesc->natts);
+ /* allocate an initial in-memory tuple, out of the per-range memcxt */
+ dtup = brin_new_memtuple(bdesc);
+
/*
* Setup and use a per-range memory context, which is reset every time we
* loop below. This avoids having to free the tuples within the loop.
for (heapBlk = 0; heapBlk < nblocks; heapBlk += opaque->bo_pagesPerRange)
{
bool addrange;
+ bool gottuple = false;
BrinTuple *tup;
OffsetNumber off;
Size size;
scan->xs_snapshot);
if (tup)
{
- tup = brin_copy_tuple(tup, size);
+ gottuple = true;
+ btup = brin_copy_tuple(tup, size, btup, &btupsz);
LockBuffer(buf, BUFFER_LOCK_UNLOCK);
}
* For page ranges with no indexed tuple, we must return the whole
* range; otherwise, compare it to the scan keys.
*/
- if (tup == NULL)
+ if (!gottuple)
{
addrange = true;
}
else
{
- BrinMemTuple *dtup;
-
- dtup = brin_deform_tuple(bdesc, tup);
+ dtup = brin_deform_tuple(bdesc, btup, dtup);
if (dtup->bt_placeholder)
{
/*
/* the placeholder tuple must exist */
if (phtup == NULL)
elog(ERROR, "missing placeholder tuple");
- phtup = brin_copy_tuple(phtup, phsz);
+ phtup = brin_copy_tuple(phtup, phsz, NULL, NULL);
LockBuffer(phbuf, BUFFER_LOCK_UNLOCK);
/* merge it into the tuple from the heap scan */
"brin union",
ALLOCSET_DEFAULT_SIZES);
oldcxt = MemoryContextSwitchTo(cxt);
- db = brin_deform_tuple(bdesc, b);
+ db = brin_deform_tuple(bdesc, b, NULL);
MemoryContextSwitchTo(oldcxt);
for (keyno = 0; keyno < bdesc->bd_tupdesc->natts; keyno++)
OffsetNumber off;
OffsetNumber maxoff;
Page page;
+ BrinTuple *btup = NULL;
+ Size btupsz = 0;
page = BufferGetPage(buf);
{
sz = ItemIdGetLength(lp);
tup = (BrinTuple *) PageGetItem(page, lp);
- tup = brin_copy_tuple(tup, sz);
+ tup = brin_copy_tuple(tup, sz, btup, &btupsz);
LockBuffer(buf, BUFFER_LOCK_UNLOCK);
}
/*
- * Create a palloc'd copy of a BrinTuple.
+ * Given a brin tuple of size len, create a copy of it. If 'dest' is not
+ * NULL, its size is destsz, and can be used as output buffer; if the tuple
+ * to be copied does not fit, it is enlarged by repalloc, and the size is
+ * updated to match. This avoids palloc/free cycles when many brin tuples
+ * are being processed in loops.
*/
BrinTuple *
-brin_copy_tuple(BrinTuple *tuple, Size len)
+brin_copy_tuple(BrinTuple *tuple, Size len, BrinTuple *dest, Size *destsz)
{
- BrinTuple *newtup;
+ if (!destsz || *destsz == 0)
+ dest = palloc(len);
+ else if (len > *destsz)
+ {
+ dest = repalloc(dest, len);
+ *destsz = len;
+ }
- newtup = palloc(len);
- memcpy(newtup, tuple, len);
+ memcpy(dest, tuple, len);
- return newtup;
+ return dest;
}
/*
brin_new_memtuple(BrinDesc *brdesc)
{
BrinMemTuple *dtup;
- char *currdatum;
long basesize;
- int i;
basesize = MAXALIGN(sizeof(BrinMemTuple) +
sizeof(BrinValues) * brdesc->bd_tupdesc->natts);
dtup = palloc0(basesize + sizeof(Datum) * brdesc->bd_totalstored);
- currdatum = (char *) dtup + basesize;
- for (i = 0; i < brdesc->bd_tupdesc->natts; i++)
- {
- dtup->bt_columns[i].bv_attno = i + 1;
- dtup->bt_columns[i].bv_allnulls = true;
- dtup->bt_columns[i].bv_hasnulls = false;
- dtup->bt_columns[i].bv_values = (Datum *) currdatum;
- currdatum += sizeof(Datum) * brdesc->bd_info[i]->oi_nstored;
- }
+
+ dtup->bt_values = palloc(sizeof(Datum) * brdesc->bd_totalstored);
+ dtup->bt_allnulls = palloc(sizeof(bool) * brdesc->bd_tupdesc->natts);
+ dtup->bt_hasnulls = palloc(sizeof(bool) * brdesc->bd_tupdesc->natts);
dtup->bt_context = AllocSetContextCreate(CurrentMemoryContext,
"brin dtuple",
ALLOCSET_DEFAULT_SIZES);
+
+ brin_memtuple_initialize(dtup, brdesc);
+
return dtup;
}
/*
- * Reset a BrinMemTuple to initial state
+ * Reset a BrinMemTuple to initial state. We return the same tuple, for
+ * notational convenience.
*/
-void
+BrinMemTuple *
brin_memtuple_initialize(BrinMemTuple *dtuple, BrinDesc *brdesc)
{
int i;
+ char *currdatum;
MemoryContextReset(dtuple->bt_context);
+
+ currdatum = (char *) dtuple +
+ MAXALIGN(sizeof(BrinMemTuple) +
+ sizeof(BrinValues) * brdesc->bd_tupdesc->natts);
for (i = 0; i < brdesc->bd_tupdesc->natts; i++)
{
dtuple->bt_columns[i].bv_allnulls = true;
dtuple->bt_columns[i].bv_hasnulls = false;
+
+ dtuple->bt_columns[i].bv_attno = i + 1;
+ dtuple->bt_columns[i].bv_allnulls = true;
+ dtuple->bt_columns[i].bv_hasnulls = false;
+ dtuple->bt_columns[i].bv_values = (Datum *) currdatum;
+ currdatum += sizeof(Datum) * brdesc->bd_info[i]->oi_nstored;
}
+
+ return dtuple;
}
/*
* Convert a BrinTuple back to a BrinMemTuple. This is the reverse of
* brin_form_tuple.
*
+ * As an optimization, the caller can pass a previously allocated 'dMemtuple'.
+ * This avoids having to allocate it here, which can be useful when this
+ * function is called many times in a loop. It is caller's responsibility
+ * that the given BrinMemTuple matches what we need here.
+ *
* Note we don't need the "on disk tupdesc" here; we rely on our own routine to
* deconstruct the tuple from the on-disk format.
*/
BrinMemTuple *
-brin_deform_tuple(BrinDesc *brdesc, BrinTuple *tuple)
+brin_deform_tuple(BrinDesc *brdesc, BrinTuple *tuple, BrinMemTuple *dMemtuple)
{
BrinMemTuple *dtup;
Datum *values;
int valueno;
MemoryContext oldcxt;
- dtup = brin_new_memtuple(brdesc);
+ dtup = dMemtuple ? brin_memtuple_initialize(dMemtuple, brdesc) :
+ brin_new_memtuple(brdesc);
if (BrinTupleIsPlaceholder(tuple))
dtup->bt_placeholder = true;
dtup->bt_blkno = tuple->bt_blkno;
- values = palloc(sizeof(Datum) * brdesc->bd_totalstored);
- allnulls = palloc(sizeof(bool) * brdesc->bd_tupdesc->natts);
- hasnulls = palloc(sizeof(bool) * brdesc->bd_tupdesc->natts);
+ values = dtup->bt_values;
+ allnulls = dtup->bt_allnulls;
+ hasnulls = dtup->bt_hasnulls;
tp = (char *) tuple + BrinTupleDataOffset(tuple);
MemoryContextSwitchTo(oldcxt);
- pfree(values);
- pfree(allnulls);
- pfree(hasnulls);
-
return dtup;
}
bool bt_placeholder; /* this is a placeholder tuple */
BlockNumber bt_blkno; /* heap blkno that the tuple is for */
MemoryContext bt_context; /* memcxt holding the bt_columns values */
+ /* output arrays for brin_deform_tuple: */
+ Datum *bt_values; /* values array */
+ bool *bt_allnulls; /* allnulls array */
+ bool *bt_hasnulls; /* hasnulls array */
+ /* not an output array, but must be last */
BrinValues bt_columns[FLEXIBLE_ARRAY_MEMBER];
} BrinMemTuple;
extern BrinTuple *brin_form_placeholder_tuple(BrinDesc *brdesc,
BlockNumber blkno, Size *size);
extern void brin_free_tuple(BrinTuple *tuple);
-extern BrinTuple *brin_copy_tuple(BrinTuple *tuple, Size len);
+extern BrinTuple *brin_copy_tuple(BrinTuple *tuple, Size len,
+ BrinTuple *dest, Size *destsz);
extern bool brin_tuples_equal(const BrinTuple *a, Size alen,
const BrinTuple *b, Size blen);
extern BrinMemTuple *brin_new_memtuple(BrinDesc *brdesc);
-extern void brin_memtuple_initialize(BrinMemTuple *dtuple,
+extern BrinMemTuple *brin_memtuple_initialize(BrinMemTuple *dtuple,
BrinDesc *brdesc);
extern BrinMemTuple *brin_deform_tuple(BrinDesc *brdesc,
- BrinTuple *tuple);
+ BrinTuple *tuple, BrinMemTuple *dMemtuple);
#endif /* BRIN_TUPLE_H */