/*-------------------------------------------------------------------------
* txid.c
*
- * Export backend internal tranasction id's to user level.
+ * Export internal transaction IDs to user level.
+ *
+ * Note that only top-level transaction IDs are ever converted to TXID.
+ * This is important because TXIDs frequently persist beyond the global
+ * xmin horizon, or may even be shipped to other machines, so we cannot
+ * rely on being able to correlate subtransaction IDs with their parents
+ * via functions such as SubTransGetTopmostTransaction().
+ *
*
* Copyright (c) 2003-2007, PostgreSQL Global Development Group
* Author: Jan Wieck, Afilias USA INC.
- *
* 64-bit txids: Marko Kreen, Skype Technologies
+ *
+ * $PostgreSQL: pgsql/contrib/txid/txid.c,v 1.4 2007/10/11 19:54:17 tgl Exp $
+ *
*-------------------------------------------------------------------------
*/
#include "access/transam.h"
#include "access/xact.h"
#include "funcapi.h"
+#include "libpq/pqformat.h"
+
-#ifdef PG_MODULE_MAGIC
PG_MODULE_MAGIC;
-#endif
#ifdef INT64_IS_BUSTED
#error txid needs working int64
#endif
-/* txid will be signed int8 in database */
+/* txid will be signed int8 in database, so must limit to 63 bits */
#define MAX_TXID UINT64CONST(0x7FFFFFFFFFFFFFFF)
+/* Use unsigned variant internally */
+typedef uint64 txid;
+
+/* sprintf format code for uint64 */
+#define TXID_FMT UINT64_FORMAT
+
/*
- * If defined, use bsearch() function for searching
- * txid's inside snapshots that have more than given values.
+ * If defined, use bsearch() function for searching for txids in snapshots
+ * that have more than the specified number of values.
*/
#define USE_BSEARCH_IF_NXIP_GREATER 30
-/* format code for uint64 to appendStringInfo */
-#define TXID_FMT UINT64_FORMAT
-
-/* Use unsigned variant internally */
-typedef uint64 txid;
/*
- * Snapshot for 8byte txids.
+ * Snapshot containing 8byte txids.
*/
typedef struct
{
uint32 nxip; /* number of txids in xip array */
txid xmin;
txid xmax;
- txid xip[1]; /* in-progress txids */
+ txid xip[1]; /* in-progress txids, xmin <= xip[i] < xmax */
} TxidSnapshot;
-#define TXID_SNAPSHOT_SIZE(nxip) (offsetof(TxidSnapshot, xip) + sizeof(txid) * (nxip))
+#define TXID_SNAPSHOT_SIZE(nxip) \
+ (offsetof(TxidSnapshot, xip) + sizeof(txid) * (nxip))
/*
- * Epoch values from backend.
+ * Epoch values from xact.c
*/
-typedef struct {
- uint64 last_value;
- uint64 epoch;
+typedef struct
+{
+ TransactionId last_xid;
+ uint32 epoch;
} TxidEpoch;
+
/* public functions */
Datum txid_snapshot_in(PG_FUNCTION_ARGS);
Datum txid_snapshot_out(PG_FUNCTION_ARGS);
+Datum txid_snapshot_recv(PG_FUNCTION_ARGS);
+Datum txid_snapshot_send(PG_FUNCTION_ARGS);
Datum txid_current(PG_FUNCTION_ARGS);
Datum txid_current_snapshot(PG_FUNCTION_ARGS);
Datum txid_snapshot_xmin(PG_FUNCTION_ARGS);
/* public function tags */
PG_FUNCTION_INFO_V1(txid_snapshot_in);
PG_FUNCTION_INFO_V1(txid_snapshot_out);
+PG_FUNCTION_INFO_V1(txid_snapshot_recv);
+PG_FUNCTION_INFO_V1(txid_snapshot_send);
PG_FUNCTION_INFO_V1(txid_current);
PG_FUNCTION_INFO_V1(txid_current_snapshot);
PG_FUNCTION_INFO_V1(txid_snapshot_xmin);
PG_FUNCTION_INFO_V1(txid_snapshot_xip);
PG_FUNCTION_INFO_V1(txid_visible_in_snapshot);
+
/*
- * do a TransactionId -> txid conversion
+ * Fetch epoch data from xact.c.
*/
-static txid
-convert_xid(TransactionId xid, const TxidEpoch *state)
+static void
+load_xid_epoch(TxidEpoch *state)
{
- uint64 epoch;
-
- /* return special xid's as-is */
- if (xid < FirstNormalTransactionId)
- return xid;
-
- /* xid can on both sides on wrap-around */
- epoch = state->epoch;
- if (TransactionIdPrecedes(xid, state->last_value)) {
- if (xid > state->last_value)
- epoch--;
- } else if (TransactionIdFollows(xid, state->last_value)) {
- if (xid < state->last_value)
- epoch++;
- }
- return (epoch << 32) | xid;
+ GetNextXidAndEpoch(&state->last_xid, &state->epoch);
}
/*
- * Fetch epoch data from backend.
+ * do a TransactionId -> txid conversion for an XID near the given epoch
*/
-static void
-load_xid_epoch(TxidEpoch *state)
+static txid
+convert_xid(TransactionId xid, const TxidEpoch *state)
{
- TransactionId xid;
- uint32 epoch;
+ uint64 epoch;
- GetNextXidAndEpoch(&xid, &epoch);
+ /* return special xid's as-is */
+ if (!TransactionIdIsNormal(xid))
+ return (txid) xid;
+
+ /* xid can be on either side when near wrap-around */
+ epoch = (uint64) state->epoch;
+ if (xid > state->last_xid &&
+ TransactionIdPrecedes(xid, state->last_xid))
+ epoch--;
+ else if (xid < state->last_xid &&
+ TransactionIdFollows(xid, state->last_xid))
+ epoch++;
- state->epoch = epoch;
- state->last_value = xid;
+ return (epoch << 32) | xid;
}
/*
- * compare txid in memory.
+ * txid comparator for qsort/bsearch
*/
static int
cmp_txid(const void *aa, const void *bb)
{
- const uint64 *a = aa;
- const uint64 *b = bb;
- if (*a < *b)
+ txid a = *(const txid *) aa;
+ txid b = *(const txid *) bb;
+
+ if (a < b)
return -1;
- if (*a > *b)
+ if (a > b)
return 1;
return 0;
}
/*
- * order txids, for bsearch().
+ * sort a snapshot's txids, so we can use bsearch() later.
+ *
+ * For consistency of on-disk representation, we always sort even if bsearch
+ * will not be used.
*/
static void
sort_snapshot(TxidSnapshot *snap)
else if (snap->nxip > USE_BSEARCH_IF_NXIP_GREATER)
{
void *res;
+
res = bsearch(&value, snap->xip, snap->nxip, sizeof(txid), cmp_txid);
+ /* if found, transaction is still in progress */
return (res) ? false : true;
}
#endif
else
{
- int i;
+ uint32 i;
+
for (i = 0; i < snap->nxip; i++)
{
if (value == snap->xip[i])
{
TxidSnapshot *snap = (TxidSnapshot *)buf->data;
- /* do it before possible realloc */
+ /* do this before possible realloc */
snap->nxip++;
appendBinaryStringInfo(buf, (char *)&xid, sizeof(xid));
buf_finalize(StringInfo buf)
{
TxidSnapshot *snap = (TxidSnapshot *)buf->data;
+
SET_VARSIZE(snap, buf->len);
/* buf is not needed anymore */
}
/*
- * Public functions
+ * Public functions.
+ *
+ * txid_current() and txid_current_snapshot() are the only ones that
+ * communicate with core xid machinery. All the others work on data
+ * returned by them.
*/
/*
* txid_current() returns int8
*
- * Return the current transaction ID
+ * Return the current toplevel transaction ID as TXID
*/
Datum
txid_current(PG_FUNCTION_ARGS)
/*
* txid_current_snapshot() returns txid_snapshot
*
- * Return current snapshot
+ * Return current snapshot in TXID format
+ *
+ * Note that only top-transaction XIDs are included in the snapshot.
*/
Datum
txid_current_snapshot(PG_FUNCTION_ARGS)
{
TxidSnapshot *snap;
- unsigned nxip, i, size;
+ uint32 nxip, i, size;
TxidEpoch state;
Snapshot cur;
- cur = SerializableSnapshot;
+ cur = ActiveSnapshot;
if (cur == NULL)
- elog(ERROR, "get_current_snapshot: SerializableSnapshot == NULL");
+ elog(ERROR, "txid_current_snapshot: ActiveSnapshot == NULL");
load_xid_epoch(&state);
for (i = 0; i < nxip; i++)
snap->xip[i] = convert_xid(cur->xip[i], &state);
- /* we want them guaranteed ascending order */
+ /* we want them guaranteed to be in ascending order */
sort_snapshot(snap);
PG_RETURN_POINTER(snap);
Datum
txid_snapshot_in(PG_FUNCTION_ARGS)
{
- TxidSnapshot *snap;
char *str = PG_GETARG_CSTRING(0);
+ TxidSnapshot *snap;
snap = parse_snapshot(str);
Datum
txid_snapshot_out(PG_FUNCTION_ARGS)
{
- TxidSnapshot *snap;
+ TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(0);
StringInfoData str;
- int i;
-
- snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(0);
+ uint32 i;
initStringInfo(&str);
for (i = 0; i < snap->nxip; i++)
{
- appendStringInfo(&str, "%s" TXID_FMT,
- ((i > 0) ? "," : ""),
- snap->xip[i]);
+ if (i > 0)
+ appendStringInfoChar(&str, ',');
+ appendStringInfo(&str, TXID_FMT, snap->xip[i]);
}
- PG_FREE_IF_COPY(snap, 0);
-
PG_RETURN_CSTRING(str.data);
}
+/*
+ * txid_snapshot_recv(internal) returns txid_snapshot
+ *
+ * binary input function for type txid_snapshot
+ *
+ * format: int4 nxip, int8 xmin, int8 xmax, int8 xip
+ */
+Datum
+txid_snapshot_recv(PG_FUNCTION_ARGS)
+{
+ StringInfo buf = (StringInfo) PG_GETARG_POINTER(0);
+ TxidSnapshot *snap;
+ txid last = 0;
+ int nxip;
+ int i;
+ int avail;
+ int expect;
+ txid xmin, xmax;
+
+ /*
+ * load nxip and check for nonsense.
+ *
+ * (nxip > avail) check is against int overflows in 'expect'.
+ */
+ nxip = pq_getmsgint(buf, 4);
+ avail = buf->len - buf->cursor;
+ expect = 8 + 8 + nxip * 8;
+ if (nxip < 0 || nxip > avail || expect > avail)
+ goto bad_format;
+
+ xmin = pq_getmsgint64(buf);
+ xmax = pq_getmsgint64(buf);
+ if (xmin == 0 || xmax == 0 || xmin > xmax || xmax > MAX_TXID)
+ goto bad_format;
+
+ snap = palloc(TXID_SNAPSHOT_SIZE(nxip));
+ snap->xmin = xmin;
+ snap->xmax = xmax;
+ snap->nxip = nxip;
+ SET_VARSIZE(snap, TXID_SNAPSHOT_SIZE(nxip));
+
+ for (i = 0; i < nxip; i++)
+ {
+ txid cur = pq_getmsgint64(buf);
+ if (cur <= last || cur < xmin || cur >= xmax)
+ goto bad_format;
+ snap->xip[i] = cur;
+ last = cur;
+ }
+ PG_RETURN_POINTER(snap);
+
+bad_format:
+ elog(ERROR, "invalid snapshot data");
+ return (Datum)NULL;
+}
+
+/*
+ * txid_snapshot_send(txid_snapshot) returns bytea
+ *
+ * binary output function for type txid_snapshot
+ *
+ * format: int4 nxip, int8 xmin, int8 xmax, int8 xip
+ */
+Datum
+txid_snapshot_send(PG_FUNCTION_ARGS)
+{
+ TxidSnapshot *snap = (TxidSnapshot *)PG_GETARG_VARLENA_P(0);
+ StringInfoData buf;
+ uint32 i;
+
+ pq_begintypsend(&buf);
+ pq_sendint(&buf, snap->nxip, 4);
+ pq_sendint64(&buf, snap->xmin);
+ pq_sendint64(&buf, snap->xmax);
+ for (i = 0; i < snap->nxip; i++)
+ pq_sendint64(&buf, snap->xip[i]);
+ PG_RETURN_BYTEA_P(pq_endtypsend(&buf));
+}
/*
* txid_visible_in_snapshot(int8, txid_snapshot) returns bool
{
txid value = PG_GETARG_INT64(0);
TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(1);
- int res;
- res = is_visible_txid(value, snap) ? true : false;
-
- PG_FREE_IF_COPY(snap, 1);
- PG_RETURN_BOOL(res);
+ PG_RETURN_BOOL(is_visible_txid(value, snap));
}
/*
txid_snapshot_xmin(PG_FUNCTION_ARGS)
{
TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(0);
- txid res = snap->xmin;
- PG_FREE_IF_COPY(snap, 0);
- PG_RETURN_INT64(res);
+
+ PG_RETURN_INT64(snap->xmin);
}
/*
txid_snapshot_xmax(PG_FUNCTION_ARGS)
{
TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(0);
- txid res = snap->xmax;
- PG_FREE_IF_COPY(snap, 0);
- PG_RETURN_INT64(res);
+
+ PG_RETURN_INT64(snap->xmax);
}
/*
/* on first call initialize snap_state and get copy of snapshot */
if (SRF_IS_FIRSTCALL()) {
- TxidSnapshot *arg;
+ TxidSnapshot *arg = (TxidSnapshot *) PG_GETARG_VARLENA_P(0);
fctx = SRF_FIRSTCALL_INIT();
/* make a copy of user snapshot */
- arg = (TxidSnapshot *) PG_GETARG_VARLENA_P(0);
snap = MemoryContextAlloc(fctx->multi_call_memory_ctx, VARSIZE(arg));
memcpy(snap, arg, VARSIZE(arg));
- PG_FREE_IF_COPY(arg, 0);
fctx->user_fctx = snap;
}
SRF_RETURN_DONE(fctx);
}
}
-
--
-- Copyright (c) 2003-2007, PostgreSQL Global Development Group
-- Author: Jan Wieck, Afilias USA INC.
---
-- 64-bit txids: Marko Kreen, Skype Technologies
+--
+-- $PostgreSQL: pgsql/contrib/txid/txid.sql.in,v 1.2 2007/10/11 19:54:17 tgl Exp $
+--
-- ----------
+-- Adjust this setting to control where the objects get created.
+SET search_path = public;
+
+BEGIN;
+
--
-- A special transaction snapshot data type for faster visibility checks
--
-CREATE OR REPLACE FUNCTION txid_snapshot_in(cstring)
+CREATE TYPE txid_snapshot;
+
+CREATE FUNCTION txid_snapshot_in(cstring)
RETURNS txid_snapshot
AS 'MODULE_PATHNAME' LANGUAGE C
IMMUTABLE STRICT;
-CREATE OR REPLACE FUNCTION txid_snapshot_out(txid_snapshot)
+CREATE FUNCTION txid_snapshot_out(txid_snapshot)
RETURNS cstring
AS 'MODULE_PATHNAME' LANGUAGE C
IMMUTABLE STRICT;
+CREATE FUNCTION txid_snapshot_recv(internal)
+ RETURNS txid_snapshot
+ AS 'MODULE_PATHNAME' LANGUAGE C
+ IMMUTABLE STRICT;
+CREATE FUNCTION txid_snapshot_send(txid_snapshot)
+ RETURNS bytea
+ AS 'MODULE_PATHNAME' LANGUAGE C
+ IMMUTABLE STRICT;
--
-- The data type itself
CREATE TYPE txid_snapshot (
INPUT = txid_snapshot_in,
OUTPUT = txid_snapshot_out,
+ RECEIVE = txid_snapshot_recv,
+ SEND = txid_snapshot_send,
INTERNALLENGTH = variable,
STORAGE = extended,
ALIGNMENT = double
);
-CREATE OR REPLACE FUNCTION txid_current()
+--
+-- Functions for txid
+--
+CREATE FUNCTION txid_current()
RETURNS bigint
AS 'MODULE_PATHNAME', 'txid_current' LANGUAGE C
STABLE;
-CREATE OR REPLACE FUNCTION txid_current_snapshot()
+CREATE FUNCTION txid_current_snapshot()
RETURNS txid_snapshot
AS 'MODULE_PATHNAME', 'txid_current_snapshot' LANGUAGE C
STABLE;
-CREATE OR REPLACE FUNCTION txid_snapshot_xmin(txid_snapshot)
+CREATE FUNCTION txid_snapshot_xmin(txid_snapshot)
RETURNS bigint
AS 'MODULE_PATHNAME', 'txid_snapshot_xmin' LANGUAGE C
IMMUTABLE STRICT;
-CREATE OR REPLACE FUNCTION txid_snapshot_xmax(txid_snapshot)
+CREATE FUNCTION txid_snapshot_xmax(txid_snapshot)
RETURNS bigint
AS 'MODULE_PATHNAME', 'txid_snapshot_xmax' LANGUAGE C
IMMUTABLE STRICT;
-CREATE OR REPLACE FUNCTION txid_snapshot_xip(txid_snapshot)
+CREATE FUNCTION txid_snapshot_xip(txid_snapshot)
RETURNS setof bigint
AS 'MODULE_PATHNAME', 'txid_snapshot_xip' LANGUAGE C
IMMUTABLE STRICT;
-
---
--- Special comparision functions for visibility checks
---
-CREATE OR REPLACE FUNCTION txid_visible_in_snapshot(bigint, txid_snapshot)
+CREATE FUNCTION txid_visible_in_snapshot(bigint, txid_snapshot)
RETURNS boolean
AS 'MODULE_PATHNAME', 'txid_visible_in_snapshot' LANGUAGE C
IMMUTABLE STRICT;
+COMMIT;