COMMIT
(235 rows)
+-- test we can decode "old" tuples bigger than the max heap tuple size correctly
+DROP TABLE IF EXISTS toasted_several;
+NOTICE: table "toasted_several" does not exist, skipping
+CREATE TABLE toasted_several (
+ id serial unique not null,
+ toasted_key text primary key,
+ toasted_col1 text,
+ toasted_col2 text
+);
+ALTER TABLE toasted_several REPLICA IDENTITY FULL;
+ALTER TABLE toasted_several ALTER COLUMN toasted_key SET STORAGE EXTERNAL;
+ALTER TABLE toasted_several ALTER COLUMN toasted_col1 SET STORAGE EXTERNAL;
+ALTER TABLE toasted_several ALTER COLUMN toasted_col2 SET STORAGE EXTERNAL;
+INSERT INTO toasted_several(toasted_key) VALUES(repeat('9876543210', 2000));
+SELECT regexp_replace(data, '^(.{100}).*(.{100})$', '\1..\2') FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+ regexp_replace
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
+ BEGIN
+ table public.toasted_several: INSERT: id[integer]:1 toasted_key[text]:'98765432109876543210987654321..098765432109876543210987654321098765432109876543210' toasted_col1[text]:null toasted_col2[text]:null
+ COMMIT
+(3 rows)
+
+-- test update of a toasted key without changing it
+UPDATE toasted_several SET toasted_col1 = toasted_key;
+UPDATE toasted_several SET toasted_col2 = toasted_col1;
+SELECT regexp_replace(data, '^(.{100}).*(.{100})$', '\1..\2') FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+ regexp_replace
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
+ BEGIN
+ table public.toasted_several: INSERT: id[integer]:1 toasted_key[text]:'98765432109876543210987654321..098765432109876543210987654321098765432109876543210' toasted_col1[text]:null toasted_col2[text]:null
+ COMMIT
+ BEGIN
+ table public.toasted_several: UPDATE: old-key: id[integer]:1 toasted_key[text]:'98765432109876543210..432109876543210987654321098765432109876543210987654321098765432109876543210' toasted_col2[text]:null
+ COMMIT
+ BEGIN
+ table public.toasted_several: UPDATE: old-key: id[integer]:1 toasted_key[text]:'98765432109876543210..876543210987654321098765432109876543210987654321098765432109876543210987654321098765432109876543210'
+ COMMIT
+(9 rows)
+
+/*
+ * update with large tuplebuf, in a transaction large enough to force to spool to disk
+ */
+BEGIN;
+INSERT INTO toasted_several(toasted_key) SELECT * FROM generate_series(1, 10234);
+UPDATE toasted_several SET toasted_col1 = toasted_col2 WHERE id = 1;
+DELETE FROM toasted_several WHERE id = 1;
+COMMIT;
+DROP TABLE toasted_several;
+SELECT regexp_replace(data, '^(.{100}).*(.{100})$', '\1..\2') FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1')
+WHERE data NOT LIKE '%INSERT: %';
+ regexp_replace
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
+ BEGIN
+ table public.toasted_several: UPDATE: old-key: id[integer]:1 toasted_key[text]:'98765432109876543210..7654321098765432109876543210987654321098765432109876543210' toasted_col2[text]:unchanged-toast-datum
+ table public.toasted_several: DELETE: id[integer]:1 toasted_key[text]:'98765432109876543210987654321..876543210987654321098765432109876543210987654321098765432109876543210987654321098765432109876543210'
+ COMMIT
+(4 rows)
+
SELECT pg_drop_replication_slot('regression_slot');
pg_drop_replication_slot
--------------------------
203 untoasted200
\.
SELECT substr(data, 1, 200) FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+
+-- test we can decode "old" tuples bigger than the max heap tuple size correctly
+DROP TABLE IF EXISTS toasted_several;
+CREATE TABLE toasted_several (
+ id serial unique not null,
+ toasted_key text primary key,
+ toasted_col1 text,
+ toasted_col2 text
+);
+ALTER TABLE toasted_several REPLICA IDENTITY FULL;
+ALTER TABLE toasted_several ALTER COLUMN toasted_key SET STORAGE EXTERNAL;
+ALTER TABLE toasted_several ALTER COLUMN toasted_col1 SET STORAGE EXTERNAL;
+ALTER TABLE toasted_several ALTER COLUMN toasted_col2 SET STORAGE EXTERNAL;
+
+INSERT INTO toasted_several(toasted_key) VALUES(repeat('9876543210', 2000));
+
+SELECT regexp_replace(data, '^(.{100}).*(.{100})$', '\1..\2') FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+
+-- test update of a toasted key without changing it
+UPDATE toasted_several SET toasted_col1 = toasted_key;
+UPDATE toasted_several SET toasted_col2 = toasted_col1;
+
+SELECT regexp_replace(data, '^(.{100}).*(.{100})$', '\1..\2') FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+
+/*
+ * update with large tuplebuf, in a transaction large enough to force to spool to disk
+ */
+BEGIN;
+INSERT INTO toasted_several(toasted_key) SELECT * FROM generate_series(1, 10234);
+UPDATE toasted_several SET toasted_col1 = toasted_col2 WHERE id = 1;
+DELETE FROM toasted_several WHERE id = 1;
+COMMIT;
+
+DROP TABLE toasted_several;
+
+SELECT regexp_replace(data, '^(.{100}).*(.{100})$', '\1..\2') FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1')
+WHERE data NOT LIKE '%INSERT: %';
SELECT pg_drop_replication_slot('regression_slot');
Size tuplelen;
char *tupledata = XLogRecGetBlockData(r, 0, &tuplelen);
- change->data.tp.newtuple = ReorderBufferGetTupleBuf(ctx->reorder);
+ change->data.tp.newtuple =
+ ReorderBufferGetTupleBuf(ctx->reorder, tuplelen);
DecodeXLogTuple(tupledata, tuplelen, change->data.tp.newtuple);
}
{
data = XLogRecGetBlockData(r, 0, &datalen);
- change->data.tp.newtuple = ReorderBufferGetTupleBuf(ctx->reorder);
+ change->data.tp.newtuple =
+ ReorderBufferGetTupleBuf(ctx->reorder, datalen);
DecodeXLogTuple(data, datalen, change->data.tp.newtuple);
}
data = XLogRecGetData(r) + SizeOfHeapUpdate;
datalen = XLogRecGetDataLen(r) - SizeOfHeapUpdate;
- change->data.tp.oldtuple = ReorderBufferGetTupleBuf(ctx->reorder);
+ change->data.tp.oldtuple =
+ ReorderBufferGetTupleBuf(ctx->reorder, datalen);
DecodeXLogTuple(data, datalen, change->data.tp.oldtuple);
}
/* old primary key stored */
if (xlrec->flags & XLH_DELETE_CONTAINS_OLD)
{
+ Size len = XLogRecGetDataLen(r) - SizeOfHeapDelete;
+
Assert(XLogRecGetDataLen(r) > (SizeOfHeapDelete + SizeOfHeapHeader));
- change->data.tp.oldtuple = ReorderBufferGetTupleBuf(ctx->reorder);
+ change->data.tp.oldtuple =
+ ReorderBufferGetTupleBuf(ctx->reorder, len);
DecodeXLogTuple((char *) xlrec + SizeOfHeapDelete,
- XLogRecGetDataLen(r) - SizeOfHeapDelete,
- change->data.tp.oldtuple);
+ len, change->data.tp.oldtuple);
}
change->data.tp.clear_toast_afterwards = true;
*/
if (xlrec->flags & XLH_INSERT_CONTAINS_NEW_TUPLE)
{
- change->data.tp.newtuple = ReorderBufferGetTupleBuf(ctx->reorder);
+ HeapTupleHeader header;
+
+ xlhdr = (xl_multi_insert_tuple *) SHORTALIGN(data);
+ data = ((char *) xlhdr) + SizeOfMultiInsertTuple;
+ datalen = xlhdr->datalen;
+
+ change->data.tp.newtuple =
+ ReorderBufferGetTupleBuf(ctx->reorder, datalen);
tuple = change->data.tp.newtuple;
+ header = tuple->tuple.t_data;
/* not a disk based tuple */
ItemPointerSetInvalid(&tuple->tuple.t_self);
- xlhdr = (xl_multi_insert_tuple *) SHORTALIGN(data);
- data = ((char *) xlhdr) + SizeOfMultiInsertTuple;
- datalen = xlhdr->datalen;
-
/*
* We can only figure this out after reassembling the
* transactions.
*/
tuple->tuple.t_tableOid = InvalidOid;
- tuple->tuple.t_data = &tuple->t_data.header;
+
tuple->tuple.t_len = datalen + SizeofHeapTupleHeader;
- memset(&tuple->t_data.header, 0, SizeofHeapTupleHeader);
+ memset(header, 0, SizeofHeapTupleHeader);
- memcpy((char *) &tuple->t_data.header + SizeofHeapTupleHeader,
+ memcpy((char *) tuple->tuple.t_data + SizeofHeapTupleHeader,
(char *) data,
datalen);
data += datalen;
- tuple->t_data.header.t_infomask = xlhdr->t_infomask;
- tuple->t_data.header.t_infomask2 = xlhdr->t_infomask2;
- tuple->t_data.header.t_hoff = xlhdr->t_hoff;
+ header->t_infomask = xlhdr->t_infomask;
+ header->t_infomask2 = xlhdr->t_infomask2;
+ header->t_hoff = xlhdr->t_hoff;
}
/*
{
xl_heap_header xlhdr;
int datalen = len - SizeOfHeapHeader;
+ HeapTupleHeader header;
Assert(datalen >= 0);
- Assert(datalen <= MaxHeapTupleSize);
tuple->tuple.t_len = datalen + SizeofHeapTupleHeader;
+ header = tuple->tuple.t_data;
/* not a disk based tuple */
ItemPointerSetInvalid(&tuple->tuple.t_self);
/* we can only figure this out after reassembling the transactions */
tuple->tuple.t_tableOid = InvalidOid;
- tuple->tuple.t_data = &tuple->t_data.header;
/* data is not stored aligned, copy to aligned storage */
memcpy((char *) &xlhdr,
data,
SizeOfHeapHeader);
- memset(&tuple->t_data.header, 0, SizeofHeapTupleHeader);
+ memset(header, 0, SizeofHeapTupleHeader);
- memcpy((char *) &tuple->t_data.header + SizeofHeapTupleHeader,
+ memcpy(((char *) tuple->tuple.t_data) + SizeofHeapTupleHeader,
data + SizeOfHeapHeader,
datalen);
- tuple->t_data.header.t_infomask = xlhdr.t_infomask;
- tuple->t_data.header.t_infomask2 = xlhdr.t_infomask2;
- tuple->t_data.header.t_hoff = xlhdr.t_hoff;
+ header->t_infomask = xlhdr.t_infomask;
+ header->t_infomask2 = xlhdr.t_infomask2;
+ header->t_hoff = xlhdr.t_hoff;
}
/*
- * Get an unused, possibly preallocated, ReorderBufferTupleBuf
+ * Get an unused, possibly preallocated, ReorderBufferTupleBuf fitting at
+ * least a tuple of size tuple_len (excluding header overhead).
*/
ReorderBufferTupleBuf *
-ReorderBufferGetTupleBuf(ReorderBuffer *rb)
+ReorderBufferGetTupleBuf(ReorderBuffer *rb, Size tuple_len)
{
ReorderBufferTupleBuf *tuple;
+ Size alloc_len;
- /* check the slab cache */
- if (rb->nr_cached_tuplebufs)
+ alloc_len = tuple_len + SizeofHeapTupleHeader;
+
+ /*
+ * Most tuples are below MaxHeapTupleSize, so we use a slab allocator for
+ * those. Thus always allocate at least MaxHeapTupleSize. Note that tuples
+ * tuples generated for oldtuples can be bigger, as they don't have
+ * out-of-line toast columns.
+ */
+ if (alloc_len < MaxHeapTupleSize)
+ alloc_len = MaxHeapTupleSize;
+
+
+ /* if small enough, check the slab cache */
+ if (alloc_len <= MaxHeapTupleSize && rb->nr_cached_tuplebufs)
{
rb->nr_cached_tuplebufs--;
tuple = slist_container(ReorderBufferTupleBuf, node,
slist_pop_head_node(&rb->cached_tuplebufs));
#ifdef USE_ASSERT_CHECKING
- memset(tuple, 0xa9, sizeof(ReorderBufferTupleBuf));
+ memset(&tuple->tuple, 0xa9, sizeof(HeapTupleData));
+#endif
+ tuple->tuple.t_data = ReorderBufferTupleBufData(tuple);
+#ifdef USE_ASSERT_CHECKING
+ memset(tuple->tuple.t_data, 0xa8, tuple->alloc_tuple_size);
#endif
}
else
{
tuple = (ReorderBufferTupleBuf *)
- MemoryContextAlloc(rb->context, sizeof(ReorderBufferTupleBuf));
+ MemoryContextAlloc(rb->context,
+ sizeof(ReorderBufferTupleBuf) + alloc_len);
+ tuple->alloc_tuple_size = alloc_len;
+ tuple->tuple.t_data = ReorderBufferTupleBufData(tuple);
}
return tuple;
void
ReorderBufferReturnTupleBuf(ReorderBuffer *rb, ReorderBufferTupleBuf *tuple)
{
- /* check whether to put into the slab cache */
- if (rb->nr_cached_tuplebufs < max_cached_tuplebufs)
+ /* check whether to put into the slab cache, oversized tuples never are */
+ if (tuple->alloc_tuple_size == MaxHeapTupleSize &&
+ rb->nr_cached_tuplebufs < max_cached_tuplebufs)
{
rb->nr_cached_tuplebufs++;
slist_push_head(&rb->cached_tuplebufs, &tuple->node);
+ VALGRIND_MAKE_MEM_UNDEFINED(tuple->tuple.t_data, tuple->alloc_tuple_size);
VALGRIND_MAKE_MEM_UNDEFINED(tuple, sizeof(ReorderBufferTupleBuf));
VALGRIND_MAKE_MEM_DEFINED(&tuple->node, sizeof(tuple->node));
+ VALGRIND_MAKE_MEM_DEFINED(&tuple->alloc_tuple_size, sizeof(tuple->alloc_tuple_size));
}
else
{
newtup = change->data.tp.newtuple;
if (oldtup)
- oldlen = offsetof(ReorderBufferTupleBuf, t_data) +
- oldtup->tuple.t_len;
+ {
+ sz += sizeof(HeapTupleData);
+ oldlen = oldtup->tuple.t_len;
+ sz += oldlen;
+ }
if (newtup)
- newlen = offsetof(ReorderBufferTupleBuf, t_data) +
- newtup->tuple.t_len;
-
- sz += oldlen;
- sz += newlen;
+ {
+ sz += sizeof(HeapTupleData);
+ newlen = newtup->tuple.t_len;
+ sz += newlen;
+ }
/* make sure we have enough space */
ReorderBufferSerializeReserve(rb, sz);
if (oldlen)
{
- memcpy(data, oldtup, oldlen);
+ memcpy(data, &oldtup->tuple, sizeof(HeapTupleData));
+ data += sizeof(HeapTupleData);
+
+ memcpy(data, oldtup->tuple.t_data, oldlen);
data += oldlen;
}
if (newlen)
{
- memcpy(data, newtup, newlen);
- data += newlen;
+ memcpy(data, &newtup->tuple, sizeof(HeapTupleData));
+ data += sizeof(HeapTupleData);
+
+ memcpy(data, newtup->tuple.t_data, newlen);
+ data += oldlen;
}
break;
}
case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
if (change->data.tp.oldtuple)
{
- Size len = offsetof(ReorderBufferTupleBuf, t_data) +
- ((ReorderBufferTupleBuf *) data)->tuple.t_len;
+ Size tuplelen = ((HeapTuple) data)->t_len;
+
+ change->data.tp.oldtuple =
+ ReorderBufferGetTupleBuf(rb, tuplelen - SizeofHeapTupleHeader);
- change->data.tp.oldtuple = ReorderBufferGetTupleBuf(rb);
- memcpy(change->data.tp.oldtuple, data, len);
+ /* restore ->tuple */
+ memcpy(&change->data.tp.oldtuple->tuple, data,
+ sizeof(HeapTupleData));
+ data += sizeof(HeapTupleData);
+
+ /* reset t_data pointer into the new tuplebuf */
change->data.tp.oldtuple->tuple.t_data =
- &change->data.tp.oldtuple->t_data.header;
- data += len;
+ ReorderBufferTupleBufData(change->data.tp.oldtuple);
+
+ /* restore tuple data itself */
+ memcpy(change->data.tp.oldtuple->tuple.t_data, data, tuplelen);
+ data += tuplelen;
}
if (change->data.tp.newtuple)
{
- Size len = offsetof(ReorderBufferTupleBuf, t_data) +
- ((ReorderBufferTupleBuf *) data)->tuple.t_len;
+ Size tuplelen = ((HeapTuple) data)->t_len;
+
+ change->data.tp.newtuple =
+ ReorderBufferGetTupleBuf(rb, tuplelen - SizeofHeapTupleHeader);
- change->data.tp.newtuple = ReorderBufferGetTupleBuf(rb);
- memcpy(change->data.tp.newtuple, data, len);
+ /* restore ->tuple */
+ memcpy(&change->data.tp.newtuple->tuple, data,
+ sizeof(HeapTupleData));
+ data += sizeof(HeapTupleData);
+
+ /* reset t_data pointer into the new tuplebuf */
change->data.tp.newtuple->tuple.t_data =
- &change->data.tp.newtuple->t_data.header;
- data += len;
+ ReorderBufferTupleBufData(change->data.tp.newtuple);
+
+ /* restore tuple data itself */
+ memcpy(change->data.tp.newtuple->tuple.t_data, data, tuplelen);
+ data += tuplelen;
}
+
break;
case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
{
*/
tmphtup = heap_form_tuple(desc, attrs, isnull);
Assert(newtup->tuple.t_len <= MaxHeapTupleSize);
- Assert(&newtup->t_data.header == newtup->tuple.t_data);
+ Assert(ReorderBufferTupleBufData(newtup) == newtup->tuple.t_data);
memcpy(newtup->tuple.t_data, tmphtup->t_data, tmphtup->t_len);
newtup->tuple.t_len = tmphtup->t_len;
/* position in preallocated list */
slist_node node;
- /* tuple, stored sequentially */
+ /* tuple header, the interesting bit for users of logical decoding */
HeapTupleData tuple;
- union
- {
- HeapTupleHeaderData header;
- char data[MaxHeapTupleSize];
- double align_it; /* ensure t_data is MAXALIGN'd */
- } t_data;
+
+ /* pre-allocated size of tuple buffer, different from tuple size */
+ Size alloc_tuple_size;
+
+ /* actual tuple data follows */
} ReorderBufferTupleBuf;
+/* pointer to the data stored in a TupleBuf */
+#define ReorderBufferTupleBufData(p) \
+ ((HeapTupleHeader) MAXALIGN(((char *) p) + sizeof(ReorderBufferTupleBuf)))
+
/*
* Types of the change passed to a 'change' callback.
*
ReorderBuffer *ReorderBufferAllocate(void);
void ReorderBufferFree(ReorderBuffer *);
-ReorderBufferTupleBuf *ReorderBufferGetTupleBuf(ReorderBuffer *);
+ReorderBufferTupleBuf *ReorderBufferGetTupleBuf(ReorderBuffer *, Size tuple_len);
void ReorderBufferReturnTupleBuf(ReorderBuffer *, ReorderBufferTupleBuf *tuple);
ReorderBufferChange *ReorderBufferGetChange(ReorderBuffer *);
void ReorderBufferReturnChange(ReorderBuffer *, ReorderBufferChange *);