#include "utils/relfilenodemap.h"
#include "utils/tqual.h"
-/*
- * For efficiency and simplicity reasons we want to keep Snapshots, CommandIds
- * and ComboCids in the same list with the user visible INSERT/UPDATE/DELETE
- * changes. We don't want to leak those internal values to external users
- * though (they would just use switch()...default:) because that would make it
- * harder to add to new user visible values.
- *
- * This needs to be synchronized with ReorderBufferChangeType! Adjust the
- * StaticAssertExpr's in ReorderBufferAllocate if you add anything!
- */
-typedef enum
-{
- REORDER_BUFFER_CHANGE_INTERNAL_INSERT,
- REORDER_BUFFER_CHANGE_INTERNAL_UPDATE,
- REORDER_BUFFER_CHANGE_INTERNAL_DELETE,
- REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT,
- REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID,
- REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID
-} ReorderBufferChangeTypeInternal;
-
/* entry for a hash table we use to map from xid to our transaction state */
typedef struct ReorderBufferTXNByIdEnt
{
HASHCTL hash_ctl;
MemoryContext new_ctx;
- StaticAssertExpr((int) REORDER_BUFFER_CHANGE_INTERNAL_INSERT == (int) REORDER_BUFFER_CHANGE_INSERT, "out of sync enums");
- StaticAssertExpr((int) REORDER_BUFFER_CHANGE_INTERNAL_UPDATE == (int) REORDER_BUFFER_CHANGE_UPDATE, "out of sync enums");
- StaticAssertExpr((int) REORDER_BUFFER_CHANGE_INTERNAL_DELETE == (int) REORDER_BUFFER_CHANGE_DELETE, "out of sync enums");
-
/* allocate memory in own context, to have better accountability */
new_ctx = AllocSetContextCreate(CurrentMemoryContext,
"ReorderBuffer",
ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
{
/* free contained data */
- switch ((ReorderBufferChangeTypeInternal) change->action_internal)
+ switch (change->action)
{
- case REORDER_BUFFER_CHANGE_INTERNAL_INSERT:
- case REORDER_BUFFER_CHANGE_INTERNAL_UPDATE:
- case REORDER_BUFFER_CHANGE_INTERNAL_DELETE:
- if (change->tp.newtuple)
+ case REORDER_BUFFER_CHANGE_INSERT:
+ case REORDER_BUFFER_CHANGE_UPDATE:
+ case REORDER_BUFFER_CHANGE_DELETE:
+ if (change->data.tp.newtuple)
{
- ReorderBufferReturnTupleBuf(rb, change->tp.newtuple);
- change->tp.newtuple = NULL;
+ ReorderBufferReturnTupleBuf(rb, change->data.tp.newtuple);
+ change->data.tp.newtuple = NULL;
}
- if (change->tp.oldtuple)
+ if (change->data.tp.oldtuple)
{
- ReorderBufferReturnTupleBuf(rb, change->tp.oldtuple);
- change->tp.oldtuple = NULL;
+ ReorderBufferReturnTupleBuf(rb, change->data.tp.oldtuple);
+ change->data.tp.oldtuple = NULL;
}
break;
case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
- if (change->snapshot)
+ if (change->data.snapshot)
{
- ReorderBufferFreeSnap(rb, change->snapshot);
- change->snapshot = NULL;
+ ReorderBufferFreeSnap(rb, change->data.snapshot);
+ change->data.snapshot = NULL;
}
break;
case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
ReorderBufferChange *change;
change = dlist_container(ReorderBufferChange, node, iter.cur);
- Assert(change->action_internal == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID);
+ Assert(change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID);
ReorderBufferReturnChange(rb, change);
}
change = dlist_container(ReorderBufferChange, node, iter.cur);
- Assert(change->action_internal == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID);
+ Assert(change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID);
/* be careful about padding */
memset(&key, 0, sizeof(ReorderBufferTupleCidKey));
- key.relnode = change->tuplecid.node;
+ key.relnode = change->data.tuplecid.node;
- ItemPointerCopy(&change->tuplecid.tid,
+ ItemPointerCopy(&change->data.tuplecid.tid,
&key.tid);
ent = (ReorderBufferTupleCidEnt *)
&found);
if (!found)
{
- ent->cmin = change->tuplecid.cmin;
- ent->cmax = change->tuplecid.cmax;
- ent->combocid = change->tuplecid.combocid;
+ ent->cmin = change->data.tuplecid.cmin;
+ ent->cmax = change->data.tuplecid.cmax;
+ ent->combocid = change->data.tuplecid.combocid;
}
else
{
- Assert(ent->cmin == change->tuplecid.cmin);
+ Assert(ent->cmin == change->data.tuplecid.cmin);
Assert(ent->cmax == InvalidCommandId ||
- ent->cmax == change->tuplecid.cmax);
+ ent->cmax == change->data.tuplecid.cmax);
/*
* if the tuple got valid in this transaction and now got deleted
* we already have a valid cmin stored. The cmax will be
* InvalidCommandId though.
*/
- ent->cmax = change->tuplecid.cmax;
+ ent->cmax = change->data.tuplecid.cmax;
}
}
}
Relation relation = NULL;
Oid reloid;
- switch ((ReorderBufferChangeTypeInternal) change->action_internal)
+ switch (change->action)
{
- case REORDER_BUFFER_CHANGE_INTERNAL_INSERT:
- case REORDER_BUFFER_CHANGE_INTERNAL_UPDATE:
- case REORDER_BUFFER_CHANGE_INTERNAL_DELETE:
+ case REORDER_BUFFER_CHANGE_INSERT:
+ case REORDER_BUFFER_CHANGE_UPDATE:
+ case REORDER_BUFFER_CHANGE_DELETE:
Assert(snapshot_now);
- reloid = RelidByRelfilenode(change->tp.relnode.spcNode,
- change->tp.relnode.relNode);
+ reloid = RelidByRelfilenode(change->data.tp.relnode.spcNode,
+ change->data.tp.relnode.relNode);
/*
* Catalog tuple without data, emitted while catalog was
* in the process of being rewritten.
*/
if (reloid == InvalidOid &&
- change->tp.newtuple == NULL &&
- change->tp.oldtuple == NULL)
+ change->data.tp.newtuple == NULL &&
+ change->data.tp.oldtuple == NULL)
continue;
else if (reloid == InvalidOid)
elog(ERROR, "could not lookup relation %s",
- relpathperm(change->tp.relnode, MAIN_FORKNUM));
+ relpathperm(change->data.tp.relnode, MAIN_FORKNUM));
relation = RelationIdGetRelation(reloid);
if (relation == NULL)
elog(ERROR, "could open relation descriptor %s",
- relpathperm(change->tp.relnode, MAIN_FORKNUM));
+ relpathperm(change->data.tp.relnode, MAIN_FORKNUM));
if (RelationIsLogicallyLogged(relation))
{
{
ReorderBufferFreeSnap(rb, snapshot_now);
snapshot_now =
- ReorderBufferCopySnap(rb, change->snapshot,
+ ReorderBufferCopySnap(rb, change->data.snapshot,
txn, command_id);
}
/*
* free. We could introduce refcounting for that, but for
* now this seems infrequent enough not to care.
*/
- else if (change->snapshot->copied)
+ else if (change->data.snapshot->copied)
{
snapshot_now =
- ReorderBufferCopySnap(rb, change->snapshot,
+ ReorderBufferCopySnap(rb, change->data.snapshot,
txn, command_id);
}
else
{
- snapshot_now = change->snapshot;
+ snapshot_now = change->data.snapshot;
}
break;
case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
- Assert(change->command_id != InvalidCommandId);
+ Assert(change->data.command_id != InvalidCommandId);
- if (command_id < change->command_id)
+ if (command_id < change->data.command_id)
{
- command_id = change->command_id;
+ command_id = change->data.command_id;
if (!snapshot_now->copied)
{
{
ReorderBufferChange *change = ReorderBufferGetChange(rb);
- change->snapshot = snap;
- change->action_internal = REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT;
+ change->data.snapshot = snap;
+ change->action = REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT;
ReorderBufferQueueChange(rb, xid, lsn, change);
}
{
ReorderBufferChange *change = ReorderBufferGetChange(rb);
- change->command_id = cid;
- change->action_internal = REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID;
+ change->data.command_id = cid;
+ change->action = REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID;
ReorderBufferQueueChange(rb, xid, lsn, change);
}
txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
- change->tuplecid.node = node;
- change->tuplecid.tid = tid;
- change->tuplecid.cmin = cmin;
- change->tuplecid.cmax = cmax;
- change->tuplecid.combocid = combocid;
+ change->data.tuplecid.node = node;
+ change->data.tuplecid.tid = tid;
+ change->data.tuplecid.cmin = cmin;
+ change->data.tuplecid.cmax = cmax;
+ change->data.tuplecid.combocid = combocid;
change->lsn = lsn;
- change->action_internal = REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID;
+ change->action = REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID;
dlist_push_tail(&txn->tuplecids, &change->node);
txn->ntuplecids++;
ondisk = (ReorderBufferDiskChange *) rb->outbuf;
memcpy(&ondisk->change, change, sizeof(ReorderBufferChange));
- switch ((ReorderBufferChangeTypeInternal) change->action_internal)
+ switch (change->action)
{
- case REORDER_BUFFER_CHANGE_INTERNAL_INSERT:
+ case REORDER_BUFFER_CHANGE_INSERT:
/* fall through */
- case REORDER_BUFFER_CHANGE_INTERNAL_UPDATE:
+ case REORDER_BUFFER_CHANGE_UPDATE:
/* fall through */
- case REORDER_BUFFER_CHANGE_INTERNAL_DELETE:
+ case REORDER_BUFFER_CHANGE_DELETE:
{
char *data;
+ ReorderBufferTupleBuf *oldtup, *newtup;
Size oldlen = 0;
Size newlen = 0;
- if (change->tp.oldtuple)
+ oldtup = change->data.tp.oldtuple;
+ newtup = change->data.tp.newtuple;
+
+ if (oldtup)
oldlen = offsetof(ReorderBufferTupleBuf, data)
- + change->tp.oldtuple->tuple.t_len
+ + oldtup->tuple.t_len
- offsetof(HeapTupleHeaderData, t_bits);
- if (change->tp.newtuple)
+ if (newtup)
newlen = offsetof(ReorderBufferTupleBuf, data)
- + change->tp.newtuple->tuple.t_len
+ + newtup->tuple.t_len
- offsetof(HeapTupleHeaderData, t_bits);
sz += oldlen;
if (oldlen)
{
- memcpy(data, change->tp.oldtuple, oldlen);
+ memcpy(data, oldtup, oldlen);
data += oldlen;
- Assert(&change->tp.oldtuple->header == change->tp.oldtuple->tuple.t_data);
}
if (newlen)
{
- memcpy(data, change->tp.newtuple, newlen);
+ memcpy(data, newtup, newlen);
data += newlen;
- Assert(&change->tp.newtuple->header == change->tp.newtuple->tuple.t_data);
}
break;
}
case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
{
+ Snapshot snap;
char *data;
+ snap = change->data.snapshot;
+
sz += sizeof(SnapshotData) +
- sizeof(TransactionId) * change->snapshot->xcnt +
- sizeof(TransactionId) * change->snapshot->subxcnt
+ sizeof(TransactionId) * snap->xcnt +
+ sizeof(TransactionId) * snap->subxcnt
;
/* make sure we have enough space */
/* might have been reallocated above */
ondisk = (ReorderBufferDiskChange *) rb->outbuf;
- memcpy(data, change->snapshot, sizeof(SnapshotData));
+ memcpy(data, snap, sizeof(SnapshotData));
data += sizeof(SnapshotData);
- if (change->snapshot->xcnt)
+ if (snap->xcnt)
{
- memcpy(data, change->snapshot->xip,
- sizeof(TransactionId) + change->snapshot->xcnt);
- data += sizeof(TransactionId) + change->snapshot->xcnt;
+ memcpy(data, snap->xip,
+ sizeof(TransactionId) + snap->xcnt);
+ data += sizeof(TransactionId) + snap->xcnt;
}
- if (change->snapshot->subxcnt)
+ if (snap->subxcnt)
{
- memcpy(data, change->snapshot->subxip,
- sizeof(TransactionId) + change->snapshot->subxcnt);
- data += sizeof(TransactionId) + change->snapshot->subxcnt;
+ memcpy(data, snap->subxip,
+ sizeof(TransactionId) + snap->subxcnt);
+ data += sizeof(TransactionId) + snap->subxcnt;
}
break;
}
txn->xid)));
}
- Assert(ondisk->change.action_internal == change->action_internal);
+ Assert(ondisk->change.action == change->action);
}
/*
data += sizeof(ReorderBufferDiskChange);
/* restore individual stuff */
- switch ((ReorderBufferChangeTypeInternal) change->action_internal)
+ switch (change->action)
{
- case REORDER_BUFFER_CHANGE_INTERNAL_INSERT:
+ case REORDER_BUFFER_CHANGE_INSERT:
/* fall through */
- case REORDER_BUFFER_CHANGE_INTERNAL_UPDATE:
+ case REORDER_BUFFER_CHANGE_UPDATE:
/* fall through */
- case REORDER_BUFFER_CHANGE_INTERNAL_DELETE:
- if (change->tp.newtuple)
+ case REORDER_BUFFER_CHANGE_DELETE:
+ if (change->data.tp.newtuple)
{
Size len = offsetof(ReorderBufferTupleBuf, data)
+((ReorderBufferTupleBuf *) data)->tuple.t_len
- offsetof(HeapTupleHeaderData, t_bits);
- change->tp.newtuple = ReorderBufferGetTupleBuf(rb);
- memcpy(change->tp.newtuple, data, len);
- change->tp.newtuple->tuple.t_data = &change->tp.newtuple->header;
-
+ change->data.tp.newtuple = ReorderBufferGetTupleBuf(rb);
+ memcpy(change->data.tp.newtuple, data, len);
+ change->data.tp.newtuple->tuple.t_data =
+ &change->data.tp.newtuple->header;
data += len;
}
- if (change->tp.oldtuple)
+ if (change->data.tp.oldtuple)
{
Size len = offsetof(ReorderBufferTupleBuf, data)
+((ReorderBufferTupleBuf *) data)->tuple.t_len
- offsetof(HeapTupleHeaderData, t_bits);
- change->tp.oldtuple = ReorderBufferGetTupleBuf(rb);
- memcpy(change->tp.oldtuple, data, len);
- change->tp.oldtuple->tuple.t_data = &change->tp.oldtuple->header;
+ change->data.tp.oldtuple = ReorderBufferGetTupleBuf(rb);
+ memcpy(change->data.tp.oldtuple, data, len);
+ change->data.tp.oldtuple->tuple.t_data =
+ &change->data.tp.oldtuple->header;
data += len;
}
break;
case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
{
- Snapshot oldsnap = (Snapshot) data;
- Size size = sizeof(SnapshotData) +
- sizeof(TransactionId) * oldsnap->xcnt +
- sizeof(TransactionId) * (oldsnap->subxcnt + 0)
- ;
-
- Assert(change->snapshot != NULL);
-
- change->snapshot = MemoryContextAllocZero(rb->context, size);
-
- memcpy(change->snapshot, data, size);
- change->snapshot->xip = (TransactionId *)
- (((char *) change->snapshot) + sizeof(SnapshotData));
- change->snapshot->subxip =
- change->snapshot->xip + change->snapshot->xcnt + 0;
- change->snapshot->copied = true;
+ Snapshot oldsnap;
+ Snapshot newsnap;
+ Size size;
+
+ oldsnap = (Snapshot) data;
+
+ size = sizeof(SnapshotData) +
+ sizeof(TransactionId) * oldsnap->xcnt +
+ sizeof(TransactionId) * (oldsnap->subxcnt + 0);
+
+ change->data.snapshot = MemoryContextAllocZero(rb->context, size);
+
+ newsnap = change->data.snapshot;
+
+ memcpy(newsnap, data, size);
+ newsnap->xip = (TransactionId *)
+ (((char *) newsnap) + sizeof(SnapshotData));
+ newsnap->subxip = newsnap->xip + newsnap->xcnt;
+ newsnap->copied = true;
break;
}
/* the base struct contains all the data, easy peasy */
Relation relation, ReorderBufferChange *change)
{
ReorderBufferToastEnt *ent;
+ ReorderBufferTupleBuf *newtup;
bool found;
int32 chunksize;
bool isnull;
Assert(IsToastRelation(relation));
- chunk_id = DatumGetObjectId(fastgetattr(&change->tp.newtuple->tuple, 1, desc, &isnull));
+ newtup = change->data.tp.newtuple;
+ chunk_id = DatumGetObjectId(fastgetattr(&newtup->tuple, 1, desc, &isnull));
Assert(!isnull);
- chunk_seq = DatumGetInt32(fastgetattr(&change->tp.newtuple->tuple, 2, desc, &isnull));
+ chunk_seq = DatumGetInt32(fastgetattr(&newtup->tuple, 2, desc, &isnull));
Assert(!isnull);
ent = (ReorderBufferToastEnt *)
elog(ERROR, "got sequence entry %d for toast chunk %u instead of seq %d",
chunk_seq, chunk_id, ent->last_chunk_seq + 1);
- chunk = DatumGetPointer(fastgetattr(&change->tp.newtuple->tuple, 3, desc, &isnull));
+ chunk = DatumGetPointer(fastgetattr(&newtup->tuple, 3, desc, &isnull));
Assert(!isnull);
/* calculate size so we can allocate the right size at once later */
Datum *attrs;
bool *isnull;
bool *free;
- HeapTuple newtup;
+ HeapTuple tmphtup;
Relation toast_rel;
TupleDesc toast_desc;
MemoryContext oldcontext;
+ ReorderBufferTupleBuf *newtup;
/* no toast tuples changed */
if (txn->toast_hash == NULL)
oldcontext = MemoryContextSwitchTo(rb->context);
/* we should only have toast tuples in an INSERT or UPDATE */
- Assert(change->tp.newtuple);
+ Assert(change->data.tp.newtuple);
desc = RelationGetDescr(relation);
isnull = palloc0(sizeof(bool) * desc->natts);
free = palloc0(sizeof(bool) * desc->natts);
- heap_deform_tuple(&change->tp.newtuple->tuple, desc,
- attrs, isnull);
+ newtup = change->data.tp.newtuple;
+
+ heap_deform_tuple(&newtup->tuple, desc, attrs, isnull);
for (natt = 0; natt < desc->natts; natt++)
{
dlist_foreach(it, &ent->chunks)
{
bool isnull;
- ReorderBufferTupleBuf *tup =
- dlist_container(ReorderBufferChange, node, it.cur)->tp.newtuple;
- Pointer chunk =
- DatumGetPointer(fastgetattr(&tup->tuple, 3, toast_desc, &isnull));
+ ReorderBufferChange *cchange;
+ ReorderBufferTupleBuf *ctup;
+ Pointer chunk;
+
+ cchange = dlist_container(ReorderBufferChange, node, it.cur);
+ ctup = cchange->data.tp.newtuple;
+ chunk = DatumGetPointer(
+ fastgetattr(&ctup->tuple, 3, toast_desc, &isnull));
Assert(!isnull);
Assert(!VARATT_IS_EXTERNAL(chunk));
* passed to the output plugin. We can't directly heap_fill_tuple() into
* the tuplebuf because attrs[] will point back into the current content.
*/
- newtup = heap_form_tuple(desc, attrs, isnull);
- Assert(change->tp.newtuple->tuple.t_len <= MaxHeapTupleSize);
- Assert(&change->tp.newtuple->header == change->tp.newtuple->tuple.t_data);
+ tmphtup = heap_form_tuple(desc, attrs, isnull);
+ Assert(newtup->tuple.t_len <= MaxHeapTupleSize);
+ Assert(&newtup->header == newtup->tuple.t_data);
- memcpy(change->tp.newtuple->tuple.t_data,
- newtup->t_data,
- newtup->t_len);
- change->tp.newtuple->tuple.t_len = newtup->t_len;
+ memcpy(newtup->tuple.t_data, tmphtup->t_data, tmphtup->t_len);
+ newtup->tuple.t_len = tmphtup->t_len;
/*
* free resources we won't further need, more persistent stuff will be
* free'd in ReorderBufferToastReset().
*/
RelationClose(toast_rel);
- pfree(newtup);
+ pfree(tmphtup);
for (natt = 0; natt < desc->natts; natt++)
{
if (free[natt])