static ReorderBufferTXN *ReorderBufferTXNByXid(ReorderBuffer *rb,
TransactionId xid, bool create, bool *is_new,
XLogRecPtr lsn, bool create_as_top);
+static void ReorderBufferTransferSnapToParent(ReorderBufferTXN *txn,
+ ReorderBufferTXN *subtxn);
static void AssertTXNLsnOrder(ReorderBuffer *rb);
buffer->current_restart_decoding_lsn = InvalidXLogRecPtr;
dlist_init(&buffer->toplevel_by_lsn);
+ dlist_init(&buffer->txns_by_base_snapshot_lsn);
/*
* Ensure there's no stale data from prior uses of this slot, in case some
bool found;
Assert(TransactionIdIsValid(xid));
- Assert(!create || lsn != InvalidXLogRecPtr);
/*
* Check the one-entry lookup cache first
{
/* initialize the new entry, if creation was requested */
Assert(ent != NULL);
+ Assert(lsn != InvalidXLogRecPtr);
ent->txn = ReorderBufferGetTXN(rb);
ent->txn->xid = xid;
}
}
-
+/*
+ * AssertTXNLsnOrder
+ * Verify LSN ordering of transaction lists in the reorderbuffer
+ *
+ * Other LSN-related invariants are checked too.
+ *
+ * No-op if assertions are not in use.
+ */
static void
AssertTXNLsnOrder(ReorderBuffer *rb)
{
#ifdef USE_ASSERT_CHECKING
dlist_iter iter;
XLogRecPtr prev_first_lsn = InvalidXLogRecPtr;
+ XLogRecPtr prev_base_snap_lsn = InvalidXLogRecPtr;
dlist_foreach(iter, &rb->toplevel_by_lsn)
{
- ReorderBufferTXN *cur_txn;
+ ReorderBufferTXN *cur_txn = dlist_container(ReorderBufferTXN, node,
+ iter.cur);
- cur_txn = dlist_container(ReorderBufferTXN, node, iter.cur);
+ /* start LSN must be set */
Assert(cur_txn->first_lsn != InvalidXLogRecPtr);
+ /* If there is an end LSN, it must not be lower than the start LSN */
if (cur_txn->end_lsn != InvalidXLogRecPtr)
Assert(cur_txn->first_lsn <= cur_txn->end_lsn);
+ /* Current initial LSN must be strictly higher than previous */
if (prev_first_lsn != InvalidXLogRecPtr)
Assert(prev_first_lsn < cur_txn->first_lsn);
+ /* known-as-subtxn txns must not be listed */
Assert(!cur_txn->is_known_as_subxact);
+
prev_first_lsn = cur_txn->first_lsn;
}
+
+ dlist_foreach(iter, &rb->txns_by_base_snapshot_lsn)
+ {
+ ReorderBufferTXN *cur_txn = dlist_container(ReorderBufferTXN,
+ base_snapshot_node,
+ iter.cur);
+
+ /* base snapshot (and its LSN) must be set */
+ Assert(cur_txn->base_snapshot != NULL);
+ Assert(cur_txn->base_snapshot_lsn != InvalidXLogRecPtr);
+
+ /* current LSN must be strictly higher than previous */
+ if (prev_base_snap_lsn != InvalidXLogRecPtr)
+ Assert(prev_base_snap_lsn < cur_txn->base_snapshot_lsn);
+
+ /* known-as-subtxn txns must not be listed */
+ Assert(!cur_txn->is_known_as_subxact);
+
+ prev_base_snap_lsn = cur_txn->base_snapshot_lsn;
+ }
#endif
}
+/*
+ * ReorderBufferGetOldestTXN
+ * Return oldest transaction in reorderbuffer
+ */
ReorderBufferTXN *
ReorderBufferGetOldestTXN(ReorderBuffer *rb)
{
ReorderBufferTXN *txn;
+ AssertTXNLsnOrder(rb);
+
if (dlist_is_empty(&rb->toplevel_by_lsn))
return NULL;
- AssertTXNLsnOrder(rb);
-
txn = dlist_head_element(ReorderBufferTXN, node, &rb->toplevel_by_lsn);
Assert(!txn->is_known_as_subxact);
return txn;
}
+/*
+ * ReorderBufferGetOldestXmin
+ * Return oldest Xmin in reorderbuffer
+ *
+ * Returns oldest possibly running Xid from the point of view of snapshots
+ * used in the transactions kept by reorderbuffer, or InvalidTransactionId if
+ * there are none.
+ *
+ * Since snapshots are assigned monotonically, this equals the Xmin of the
+ * base snapshot with minimal base_snapshot_lsn.
+ */
+TransactionId
+ReorderBufferGetOldestXmin(ReorderBuffer *rb)
+{
+ ReorderBufferTXN *txn;
+
+ AssertTXNLsnOrder(rb);
+
+ if (dlist_is_empty(&rb->txns_by_base_snapshot_lsn))
+ return InvalidTransactionId;
+
+ txn = dlist_head_element(ReorderBufferTXN, base_snapshot_node,
+ &rb->txns_by_base_snapshot_lsn);
+ return txn->base_snapshot->xmin;
+}
+
void
ReorderBufferSetRestartPoint(ReorderBuffer *rb, XLogRecPtr ptr)
{
rb->current_restart_decoding_lsn = ptr;
}
+/*
+ * ReorderBufferAssignChild
+ *
+ * Make note that we know that subxid is a subtransaction of xid, seen as of
+ * the given lsn.
+ */
void
ReorderBufferAssignChild(ReorderBuffer *rb, TransactionId xid,
TransactionId subxid, XLogRecPtr lsn)
txn = ReorderBufferTXNByXid(rb, xid, true, &new_top, lsn, true);
subtxn = ReorderBufferTXNByXid(rb, subxid, true, &new_sub, lsn, false);
- if (new_sub)
+ if (new_top && !new_sub)
+ elog(ERROR, "subtransaction logged without previous top-level txn record");
+
+ if (!new_sub)
{
- /*
- * we assign subtransactions to top level transaction even if we don't
- * have data for it yet, assignment records frequently reference xids
- * that have not yet produced any records. Knowing those aren't top
- * level xids allows us to make processing cheaper in some places.
- */
- dlist_push_tail(&txn->subtxns, &subtxn->node);
- txn->nsubtxns++;
+ if (subtxn->is_known_as_subxact)
+ {
+ /* already associated, nothing to do */
+ return;
+ }
+ else
+ {
+ /*
+ * We already saw this transaction, but initially added it to the list
+ * of top-level txns. Now that we know it's not top-level, remove
+ * it from there.
+ */
+ dlist_delete(&subtxn->node);
+ }
}
- else if (!subtxn->is_known_as_subxact)
- {
- subtxn->is_known_as_subxact = true;
- Assert(subtxn->nsubtxns == 0);
- /* remove from lsn order list of top-level transactions */
- dlist_delete(&subtxn->node);
+ subtxn->is_known_as_subxact = true;
+ subtxn->toplevel_xid = xid;
+ Assert(subtxn->nsubtxns == 0);
- /* add to toplevel transaction */
- dlist_push_tail(&txn->subtxns, &subtxn->node);
- txn->nsubtxns++;
- }
- else if (new_top)
+ /* add to subtransaction list */
+ dlist_push_tail(&txn->subtxns, &subtxn->node);
+ txn->nsubtxns++;
+
+ /* Possibly transfer the subtxn's snapshot to its top-level txn. */
+ ReorderBufferTransferSnapToParent(txn, subtxn);
+
+ /* Verify LSN-ordering invariant */
+ AssertTXNLsnOrder(rb);
+}
+
+/*
+ * ReorderBufferTransferSnapToParent
+ * Transfer base snapshot from subtxn to top-level txn, if needed
+ *
+ * This is done if the top-level txn doesn't have a base snapshot, or if the
+ * subtxn's base snapshot has an earlier LSN than the top-level txn's base
+ * snapshot's LSN. This can happen if there are no changes in the toplevel
+ * txn but there are some in the subtxn, or the first change in subtxn has
+ * earlier LSN than first change in the top-level txn and we learned about
+ * their kinship only now.
+ *
+ * The subtransaction's snapshot is cleared regardless of the transfer
+ * happening, since it's not needed anymore in either case.
+ *
+ * We do this as soon as we become aware of their kinship, to avoid queueing
+ * extra snapshots to txns known-as-subtxns -- only top-level txns will
+ * receive further snapshots.
+ */
+static void
+ReorderBufferTransferSnapToParent(ReorderBufferTXN *txn,
+ ReorderBufferTXN *subtxn)
+{
+ Assert(subtxn->toplevel_xid == txn->xid);
+
+ if (subtxn->base_snapshot != NULL)
{
- elog(ERROR, "existing subxact assigned to unknown toplevel xact");
+ if (txn->base_snapshot == NULL ||
+ subtxn->base_snapshot_lsn < txn->base_snapshot_lsn)
+ {
+ /*
+ * If the toplevel transaction already has a base snapshot but
+ * it's newer than the subxact's, purge it.
+ */
+ if (txn->base_snapshot != NULL)
+ {
+ SnapBuildSnapDecRefcount(txn->base_snapshot);
+ dlist_delete(&txn->base_snapshot_node);
+ }
+
+ /*
+ * The snapshot is now the top transaction's; transfer it, and
+ * adjust the top transaction's position in the list by moving it
+ * to where the subtransaction is.
+ */
+ txn->base_snapshot = subtxn->base_snapshot;
+ txn->base_snapshot_lsn = subtxn->base_snapshot_lsn;
+ dlist_insert_before(&subtxn->base_snapshot_node,
+ &txn->base_snapshot_node);
+
+ /*
+ * The subtransaction doesn't have a snapshot anymore (so it
+ * mustn't be in the list.)
+ */
+ subtxn->base_snapshot = NULL;
+ subtxn->base_snapshot_lsn = InvalidXLogRecPtr;
+ dlist_delete(&subtxn->base_snapshot_node);
+ }
+ else
+ {
+ /* Base snap of toplevel is fine, so subxact's is not needed */
+ SnapBuildSnapDecRefcount(subtxn->base_snapshot);
+ dlist_delete(&subtxn->base_snapshot_node);
+ subtxn->base_snapshot = NULL;
+ subtxn->base_snapshot_lsn = InvalidXLogRecPtr;
+ }
}
}
TransactionId subxid, XLogRecPtr commit_lsn,
XLogRecPtr end_lsn)
{
- ReorderBufferTXN *txn;
ReorderBufferTXN *subtxn;
subtxn = ReorderBufferTXNByXid(rb, subxid, false, NULL,
if (!subtxn)
return;
- txn = ReorderBufferTXNByXid(rb, xid, false, NULL, commit_lsn, true);
-
- if (txn == NULL)
- elog(ERROR, "subxact logged without previous toplevel record");
-
- /*
- * Pass our base snapshot to the parent transaction if it doesn't have
- * one, or ours is older. That can happen if there are no changes in the
- * toplevel transaction but in one of the child transactions. This allows
- * the parent to simply use its base snapshot initially.
- */
- if (subtxn->base_snapshot != NULL &&
- (txn->base_snapshot == NULL ||
- txn->base_snapshot_lsn > subtxn->base_snapshot_lsn))
- {
- txn->base_snapshot = subtxn->base_snapshot;
- txn->base_snapshot_lsn = subtxn->base_snapshot_lsn;
- subtxn->base_snapshot = NULL;
- subtxn->base_snapshot_lsn = InvalidXLogRecPtr;
- }
-
subtxn->final_lsn = commit_lsn;
subtxn->end_lsn = end_lsn;
- if (!subtxn->is_known_as_subxact)
- {
- subtxn->is_known_as_subxact = true;
- Assert(subtxn->nsubtxns == 0);
-
- /* remove from lsn order list of top-level transactions */
- dlist_delete(&subtxn->node);
-
- /* add to subtransaction list */
- dlist_push_tail(&txn->subtxns, &subtxn->node);
- txn->nsubtxns++;
- }
+ /*
+ * Assign this subxact as a child of the toplevel xact (no-op if already
+ * done.)
+ */
+ ReorderBufferAssignChild(rb, xid, subxid, InvalidXLogRecPtr);
}
ReorderBufferReturnChange(rb, change);
}
+ /*
+ * Cleanup the base snapshot, if set.
+ */
if (txn->base_snapshot != NULL)
{
SnapBuildSnapDecRefcount(txn->base_snapshot);
- txn->base_snapshot = NULL;
- txn->base_snapshot_lsn = InvalidXLogRecPtr;
+ dlist_delete(&txn->base_snapshot_node);
}
/*
}
/*
- * Perform the replay of a transaction and it's non-aborted subtransactions.
+ * Perform the replay of a transaction and its non-aborted subtransactions.
*
* Subtransactions previously have to be processed by
* ReorderBufferCommitChild(), even if previously assigned to the toplevel
* transaction with ReorderBufferAssignChild.
*
- * We currently can only decode a transaction's contents in when their commit
- * record is read because that's currently the only place where we know about
- * cache invalidations. Thus, once a toplevel commit is read, we iterate over
- * the top and subtransactions (using a k-way merge) and replay the changes in
- * lsn order.
+ * We currently can only decode a transaction's contents when its commit
+ * record is read because that's the only place where we know about cache
+ * invalidations. Thus, once a toplevel commit is read, we iterate over the top
+ * and subtransactions (using a k-way merge) and replay the changes in lsn
+ * order.
*/
void
ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
txn->origin_lsn = origin_lsn;
/*
- * If this transaction didn't have any real changes in our database, it's
- * OK not to have a snapshot. Note that ReorderBufferCommitChild will have
- * transferred its snapshot to this transaction if it had one and the
- * toplevel tx didn't.
+ * If this transaction has no snapshot, it didn't make any changes to the
+ * database, so there's nothing to decode. Note that
+ * ReorderBufferCommitChild will have transferred any snapshots from
+ * subtransactions if there were any.
*/
if (txn->base_snapshot == NULL)
{
}
/*
- * Setup the base snapshot of a transaction. The base snapshot is the snapshot
- * that is used to decode all changes until either this transaction modifies
- * the catalog or another catalog modifying transaction commits.
+ * Set up the transaction's base snapshot.
*
- * Needs to be called before any changes are added with
- * ReorderBufferQueueChange().
+ * If we know that xid is a subtransaction, set the base snapshot on the
+ * top-level transaction instead.
*/
void
ReorderBufferSetBaseSnapshot(ReorderBuffer *rb, TransactionId xid,
ReorderBufferTXN *txn;
bool is_new;
+ AssertArg(snap != NULL);
+
+ /*
+ * Fetch the transaction to operate on. If we know it's a subtransaction,
+ * operate on its top-level transaction instead.
+ */
txn = ReorderBufferTXNByXid(rb, xid, true, &is_new, lsn, true);
+ if (txn->is_known_as_subxact)
+ txn = ReorderBufferTXNByXid(rb, txn->toplevel_xid, false,
+ NULL, InvalidXLogRecPtr, false);
Assert(txn->base_snapshot == NULL);
- Assert(snap != NULL);
txn->base_snapshot = snap;
txn->base_snapshot_lsn = lsn;
+ dlist_push_tail(&rb->txns_by_base_snapshot_lsn, &txn->base_snapshot_node);
+
+ AssertTXNLsnOrder(rb);
}
/*
}
/*
- * Have we already added the first snapshot?
+ * ReorderBufferXidHasBaseSnapshot
+ * Have we already set the base snapshot for the given txn/subtxn?
*/
bool
ReorderBufferXidHasBaseSnapshot(ReorderBuffer *rb, TransactionId xid)
{
ReorderBufferTXN *txn;
- txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
- false);
+ txn = ReorderBufferTXNByXid(rb, xid, false,
+ NULL, InvalidXLogRecPtr, false);
/* transaction isn't known yet, ergo no snapshot */
if (txn == NULL)
return false;
- /*
- * TODO: It would be a nice improvement if we would check the toplevel
- * transaction in subtransactions, but we'd need to keep track of a bit
- * more state.
- */
+ /* a known subtxn? operate on top-level txn instead */
+ if (txn->is_known_as_subxact)
+ txn = ReorderBufferTXNByXid(rb, txn->toplevel_xid, false,
+ NULL, InvalidXLogRecPtr, false);
+
return txn->base_snapshot != NULL;
}