From d77717bae779facb9515a38d1c20422fa9fa9d9a Mon Sep 17 00:00:00 2001 From: Tom Lane Date: Thu, 11 Oct 2007 19:54:17 +0000 Subject: [PATCH] Code review for txid patch: add binary I/O functions, avoid dependence on SerializableSnapshot, minor other cleanup. Marko Kreen, some further editorialization by me. --- contrib/txid/Makefile | 10 +- contrib/txid/README.txid | 27 ++-- contrib/txid/txid.c | 263 +++++++++++++++++++++----------- contrib/txid/txid.sql.in | 45 ++++-- contrib/txid/uninstall_txid.sql | 10 +- 5 files changed, 224 insertions(+), 131 deletions(-) diff --git a/contrib/txid/Makefile b/contrib/txid/Makefile index 5e356535f5..2d71ee7269 100644 --- a/contrib/txid/Makefile +++ b/contrib/txid/Makefile @@ -1,3 +1,4 @@ +# $PostgreSQL: pgsql/contrib/txid/Makefile,v 1.2 2007/10/11 19:54:17 tgl Exp $ MODULES = txid DATA_built = txid.sql @@ -5,7 +6,6 @@ DATA = uninstall_txid.sql DOCS = README.txid REGRESS = txid - ifdef USE_PGXS PG_CONFIG = pg_config PGXS := $(shell $(PG_CONFIG) --pgxs) @@ -16,11 +16,3 @@ top_builddir = ../.. include $(top_builddir)/src/Makefile.global include $(top_srcdir)/contrib/contrib-global.mk endif - - -test: install - $(MAKE) installcheck || { less regression.diffs; exit 1; } - -ack: - cp results/* expected/ - diff --git a/contrib/txid/README.txid b/contrib/txid/README.txid index 03d805d3ce..7b39235bf3 100644 --- a/contrib/txid/README.txid +++ b/contrib/txid/README.txid @@ -1,8 +1,7 @@ +txid - export transaction IDs to user level +=========================================== -txid - export transaction id's to user level -============================================ - -The goal is to make PostgreSQL internal transaction ID and snapshot +The goal is to make PostgreSQL's internal transaction ID and snapshot data usable externally. This allows very efficient queue implementation done inside database. @@ -32,7 +31,7 @@ txid_snapshot_xmax( snap ) returns int8 txid_snapshot_xip( snap ) setof int8 List of in-progress TXID's in snapshot, that are invisible. - Values are between xmin and xmax. + Values are between xmin (inclusive) and xmax (exclusive). txid_visible_in_snapshot(id, snap) returns bool @@ -73,8 +72,8 @@ fetching possible txids below snap1.xmax explicitly: AND NOT txid_visible_in_snapshot(ev_txid, :snap1) AND txid_visible_in_snapshot(ev_txid, :snap2); -Note that although the above queries work, the PostgreSQL fails to -plan them correctly. For actual usage the values for txid_snapshot_xmin, +Note that although the above queries work, PostgreSQL fails to +plan them efficiently. For actual usage the values for txid_snapshot_xmin, txid_snapshot_xmax and txid_snapshot_xip should be filled in directly, only then will they use index. @@ -92,20 +91,16 @@ To see example code for that it's best to see pgq.batch_event_sql() function in Dumping and restoring data containing TXIDs. -------------------------------------------- -[towrite: reason for epoch increase] - -You can look at current epoch with query: - - SELECT txid_current() >> 32 as epoch; +When reloading TXID data you will typically want to be sure that the current +XID counter is beyond the reloaded data. The easiest way to do this is to +increase the XID epoch to beyond the largest one in the input data. -So new epoch should be: +You can look at current epoch with queries such as: - SELECT (txid_current() >> 32) + 1 as newepoch; + SELECT MAX(txid) >> 32 as epoch FROM ...; Epoch can be changed with pg_resetxlog command: pg_resetxlog -e NEWEPOCH DATADIR Database needs to be shut down for that moment. - - diff --git a/contrib/txid/txid.c b/contrib/txid/txid.c index 015e1fb936..9a89a0631a 100644 --- a/contrib/txid/txid.c +++ b/contrib/txid/txid.c @@ -1,12 +1,21 @@ /*------------------------------------------------------------------------- * txid.c * - * Export backend internal tranasction id's to user level. + * Export internal transaction IDs to user level. + * + * Note that only top-level transaction IDs are ever converted to TXID. + * This is important because TXIDs frequently persist beyond the global + * xmin horizon, or may even be shipped to other machines, so we cannot + * rely on being able to correlate subtransaction IDs with their parents + * via functions such as SubTransGetTopmostTransaction(). + * * * Copyright (c) 2003-2007, PostgreSQL Global Development Group * Author: Jan Wieck, Afilias USA INC. - * * 64-bit txids: Marko Kreen, Skype Technologies + * + * $PostgreSQL: pgsql/contrib/txid/txid.c,v 1.4 2007/10/11 19:54:17 tgl Exp $ + * *------------------------------------------------------------------------- */ @@ -15,32 +24,33 @@ #include "access/transam.h" #include "access/xact.h" #include "funcapi.h" +#include "libpq/pqformat.h" + -#ifdef PG_MODULE_MAGIC PG_MODULE_MAGIC; -#endif #ifdef INT64_IS_BUSTED #error txid needs working int64 #endif -/* txid will be signed int8 in database */ +/* txid will be signed int8 in database, so must limit to 63 bits */ #define MAX_TXID UINT64CONST(0x7FFFFFFFFFFFFFFF) +/* Use unsigned variant internally */ +typedef uint64 txid; + +/* sprintf format code for uint64 */ +#define TXID_FMT UINT64_FORMAT + /* - * If defined, use bsearch() function for searching - * txid's inside snapshots that have more than given values. + * If defined, use bsearch() function for searching for txids in snapshots + * that have more than the specified number of values. */ #define USE_BSEARCH_IF_NXIP_GREATER 30 -/* format code for uint64 to appendStringInfo */ -#define TXID_FMT UINT64_FORMAT - -/* Use unsigned variant internally */ -typedef uint64 txid; /* - * Snapshot for 8byte txids. + * Snapshot containing 8byte txids. */ typedef struct { @@ -55,22 +65,27 @@ typedef struct uint32 nxip; /* number of txids in xip array */ txid xmin; txid xmax; - txid xip[1]; /* in-progress txids */ + txid xip[1]; /* in-progress txids, xmin <= xip[i] < xmax */ } TxidSnapshot; -#define TXID_SNAPSHOT_SIZE(nxip) (offsetof(TxidSnapshot, xip) + sizeof(txid) * (nxip)) +#define TXID_SNAPSHOT_SIZE(nxip) \ + (offsetof(TxidSnapshot, xip) + sizeof(txid) * (nxip)) /* - * Epoch values from backend. + * Epoch values from xact.c */ -typedef struct { - uint64 last_value; - uint64 epoch; +typedef struct +{ + TransactionId last_xid; + uint32 epoch; } TxidEpoch; + /* public functions */ Datum txid_snapshot_in(PG_FUNCTION_ARGS); Datum txid_snapshot_out(PG_FUNCTION_ARGS); +Datum txid_snapshot_recv(PG_FUNCTION_ARGS); +Datum txid_snapshot_send(PG_FUNCTION_ARGS); Datum txid_current(PG_FUNCTION_ARGS); Datum txid_current_snapshot(PG_FUNCTION_ARGS); Datum txid_snapshot_xmin(PG_FUNCTION_ARGS); @@ -81,6 +96,8 @@ Datum txid_visible_in_snapshot(PG_FUNCTION_ARGS); /* public function tags */ PG_FUNCTION_INFO_V1(txid_snapshot_in); PG_FUNCTION_INFO_V1(txid_snapshot_out); +PG_FUNCTION_INFO_V1(txid_snapshot_recv); +PG_FUNCTION_INFO_V1(txid_snapshot_send); PG_FUNCTION_INFO_V1(txid_current); PG_FUNCTION_INFO_V1(txid_current_snapshot); PG_FUNCTION_INFO_V1(txid_snapshot_xmin); @@ -88,62 +105,61 @@ PG_FUNCTION_INFO_V1(txid_snapshot_xmax); PG_FUNCTION_INFO_V1(txid_snapshot_xip); PG_FUNCTION_INFO_V1(txid_visible_in_snapshot); + /* - * do a TransactionId -> txid conversion + * Fetch epoch data from xact.c. */ -static txid -convert_xid(TransactionId xid, const TxidEpoch *state) +static void +load_xid_epoch(TxidEpoch *state) { - uint64 epoch; - - /* return special xid's as-is */ - if (xid < FirstNormalTransactionId) - return xid; - - /* xid can on both sides on wrap-around */ - epoch = state->epoch; - if (TransactionIdPrecedes(xid, state->last_value)) { - if (xid > state->last_value) - epoch--; - } else if (TransactionIdFollows(xid, state->last_value)) { - if (xid < state->last_value) - epoch++; - } - return (epoch << 32) | xid; + GetNextXidAndEpoch(&state->last_xid, &state->epoch); } /* - * Fetch epoch data from backend. + * do a TransactionId -> txid conversion for an XID near the given epoch */ -static void -load_xid_epoch(TxidEpoch *state) +static txid +convert_xid(TransactionId xid, const TxidEpoch *state) { - TransactionId xid; - uint32 epoch; + uint64 epoch; - GetNextXidAndEpoch(&xid, &epoch); + /* return special xid's as-is */ + if (!TransactionIdIsNormal(xid)) + return (txid) xid; + + /* xid can be on either side when near wrap-around */ + epoch = (uint64) state->epoch; + if (xid > state->last_xid && + TransactionIdPrecedes(xid, state->last_xid)) + epoch--; + else if (xid < state->last_xid && + TransactionIdFollows(xid, state->last_xid)) + epoch++; - state->epoch = epoch; - state->last_value = xid; + return (epoch << 32) | xid; } /* - * compare txid in memory. + * txid comparator for qsort/bsearch */ static int cmp_txid(const void *aa, const void *bb) { - const uint64 *a = aa; - const uint64 *b = bb; - if (*a < *b) + txid a = *(const txid *) aa; + txid b = *(const txid *) bb; + + if (a < b) return -1; - if (*a > *b) + if (a > b) return 1; return 0; } /* - * order txids, for bsearch(). + * sort a snapshot's txids, so we can use bsearch() later. + * + * For consistency of on-disk representation, we always sort even if bsearch + * will not be used. */ static void sort_snapshot(TxidSnapshot *snap) @@ -166,13 +182,16 @@ is_visible_txid(txid value, const TxidSnapshot *snap) else if (snap->nxip > USE_BSEARCH_IF_NXIP_GREATER) { void *res; + res = bsearch(&value, snap->xip, snap->nxip, sizeof(txid), cmp_txid); + /* if found, transaction is still in progress */ return (res) ? false : true; } #endif else { - int i; + uint32 i; + for (i = 0; i < snap->nxip; i++) { if (value == snap->xip[i]) @@ -206,7 +225,7 @@ buf_add_txid(StringInfo buf, txid xid) { TxidSnapshot *snap = (TxidSnapshot *)buf->data; - /* do it before possible realloc */ + /* do this before possible realloc */ snap->nxip++; appendBinaryStringInfo(buf, (char *)&xid, sizeof(xid)); @@ -216,6 +235,7 @@ static TxidSnapshot * buf_finalize(StringInfo buf) { TxidSnapshot *snap = (TxidSnapshot *)buf->data; + SET_VARSIZE(snap, buf->len); /* buf is not needed anymore */ @@ -319,13 +339,17 @@ bad_format: } /* - * Public functions + * Public functions. + * + * txid_current() and txid_current_snapshot() are the only ones that + * communicate with core xid machinery. All the others work on data + * returned by them. */ /* * txid_current() returns int8 * - * Return the current transaction ID + * Return the current toplevel transaction ID as TXID */ Datum txid_current(PG_FUNCTION_ARGS) @@ -343,19 +367,21 @@ txid_current(PG_FUNCTION_ARGS) /* * txid_current_snapshot() returns txid_snapshot * - * Return current snapshot + * Return current snapshot in TXID format + * + * Note that only top-transaction XIDs are included in the snapshot. */ Datum txid_current_snapshot(PG_FUNCTION_ARGS) { TxidSnapshot *snap; - unsigned nxip, i, size; + uint32 nxip, i, size; TxidEpoch state; Snapshot cur; - cur = SerializableSnapshot; + cur = ActiveSnapshot; if (cur == NULL) - elog(ERROR, "get_current_snapshot: SerializableSnapshot == NULL"); + elog(ERROR, "txid_current_snapshot: ActiveSnapshot == NULL"); load_xid_epoch(&state); @@ -372,7 +398,7 @@ txid_current_snapshot(PG_FUNCTION_ARGS) for (i = 0; i < nxip; i++) snap->xip[i] = convert_xid(cur->xip[i], &state); - /* we want them guaranteed ascending order */ + /* we want them guaranteed to be in ascending order */ sort_snapshot(snap); PG_RETURN_POINTER(snap); @@ -386,8 +412,8 @@ txid_current_snapshot(PG_FUNCTION_ARGS) Datum txid_snapshot_in(PG_FUNCTION_ARGS) { - TxidSnapshot *snap; char *str = PG_GETARG_CSTRING(0); + TxidSnapshot *snap; snap = parse_snapshot(str); @@ -402,11 +428,9 @@ txid_snapshot_in(PG_FUNCTION_ARGS) Datum txid_snapshot_out(PG_FUNCTION_ARGS) { - TxidSnapshot *snap; + TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(0); StringInfoData str; - int i; - - snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(0); + uint32 i; initStringInfo(&str); @@ -415,16 +439,92 @@ txid_snapshot_out(PG_FUNCTION_ARGS) for (i = 0; i < snap->nxip; i++) { - appendStringInfo(&str, "%s" TXID_FMT, - ((i > 0) ? "," : ""), - snap->xip[i]); + if (i > 0) + appendStringInfoChar(&str, ','); + appendStringInfo(&str, TXID_FMT, snap->xip[i]); } - PG_FREE_IF_COPY(snap, 0); - PG_RETURN_CSTRING(str.data); } +/* + * txid_snapshot_recv(internal) returns txid_snapshot + * + * binary input function for type txid_snapshot + * + * format: int4 nxip, int8 xmin, int8 xmax, int8 xip + */ +Datum +txid_snapshot_recv(PG_FUNCTION_ARGS) +{ + StringInfo buf = (StringInfo) PG_GETARG_POINTER(0); + TxidSnapshot *snap; + txid last = 0; + int nxip; + int i; + int avail; + int expect; + txid xmin, xmax; + + /* + * load nxip and check for nonsense. + * + * (nxip > avail) check is against int overflows in 'expect'. + */ + nxip = pq_getmsgint(buf, 4); + avail = buf->len - buf->cursor; + expect = 8 + 8 + nxip * 8; + if (nxip < 0 || nxip > avail || expect > avail) + goto bad_format; + + xmin = pq_getmsgint64(buf); + xmax = pq_getmsgint64(buf); + if (xmin == 0 || xmax == 0 || xmin > xmax || xmax > MAX_TXID) + goto bad_format; + + snap = palloc(TXID_SNAPSHOT_SIZE(nxip)); + snap->xmin = xmin; + snap->xmax = xmax; + snap->nxip = nxip; + SET_VARSIZE(snap, TXID_SNAPSHOT_SIZE(nxip)); + + for (i = 0; i < nxip; i++) + { + txid cur = pq_getmsgint64(buf); + if (cur <= last || cur < xmin || cur >= xmax) + goto bad_format; + snap->xip[i] = cur; + last = cur; + } + PG_RETURN_POINTER(snap); + +bad_format: + elog(ERROR, "invalid snapshot data"); + return (Datum)NULL; +} + +/* + * txid_snapshot_send(txid_snapshot) returns bytea + * + * binary output function for type txid_snapshot + * + * format: int4 nxip, int8 xmin, int8 xmax, int8 xip + */ +Datum +txid_snapshot_send(PG_FUNCTION_ARGS) +{ + TxidSnapshot *snap = (TxidSnapshot *)PG_GETARG_VARLENA_P(0); + StringInfoData buf; + uint32 i; + + pq_begintypsend(&buf); + pq_sendint(&buf, snap->nxip, 4); + pq_sendint64(&buf, snap->xmin); + pq_sendint64(&buf, snap->xmax); + for (i = 0; i < snap->nxip; i++) + pq_sendint64(&buf, snap->xip[i]); + PG_RETURN_BYTEA_P(pq_endtypsend(&buf)); +} /* * txid_visible_in_snapshot(int8, txid_snapshot) returns bool @@ -436,12 +536,8 @@ txid_visible_in_snapshot(PG_FUNCTION_ARGS) { txid value = PG_GETARG_INT64(0); TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(1); - int res; - res = is_visible_txid(value, snap) ? true : false; - - PG_FREE_IF_COPY(snap, 1); - PG_RETURN_BOOL(res); + PG_RETURN_BOOL(is_visible_txid(value, snap)); } /* @@ -453,9 +549,8 @@ Datum txid_snapshot_xmin(PG_FUNCTION_ARGS) { TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(0); - txid res = snap->xmin; - PG_FREE_IF_COPY(snap, 0); - PG_RETURN_INT64(res); + + PG_RETURN_INT64(snap->xmin); } /* @@ -467,9 +562,8 @@ Datum txid_snapshot_xmax(PG_FUNCTION_ARGS) { TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(0); - txid res = snap->xmax; - PG_FREE_IF_COPY(snap, 0); - PG_RETURN_INT64(res); + + PG_RETURN_INT64(snap->xmax); } /* @@ -486,15 +580,13 @@ txid_snapshot_xip(PG_FUNCTION_ARGS) /* on first call initialize snap_state and get copy of snapshot */ if (SRF_IS_FIRSTCALL()) { - TxidSnapshot *arg; + TxidSnapshot *arg = (TxidSnapshot *) PG_GETARG_VARLENA_P(0); fctx = SRF_FIRSTCALL_INIT(); /* make a copy of user snapshot */ - arg = (TxidSnapshot *) PG_GETARG_VARLENA_P(0); snap = MemoryContextAlloc(fctx->multi_call_memory_ctx, VARSIZE(arg)); memcpy(snap, arg, VARSIZE(arg)); - PG_FREE_IF_COPY(arg, 0); fctx->user_fctx = snap; } @@ -509,4 +601,3 @@ txid_snapshot_xip(PG_FUNCTION_ARGS) SRF_RETURN_DONE(fctx); } } - diff --git a/contrib/txid/txid.sql.in b/contrib/txid/txid.sql.in index 3c36dc2745..ce5cded738 100644 --- a/contrib/txid/txid.sql.in +++ b/contrib/txid/txid.sql.in @@ -5,21 +5,38 @@ -- -- Copyright (c) 2003-2007, PostgreSQL Global Development Group -- Author: Jan Wieck, Afilias USA INC. --- -- 64-bit txids: Marko Kreen, Skype Technologies +-- +-- $PostgreSQL: pgsql/contrib/txid/txid.sql.in,v 1.2 2007/10/11 19:54:17 tgl Exp $ +-- -- ---------- +-- Adjust this setting to control where the objects get created. +SET search_path = public; + +BEGIN; + -- -- A special transaction snapshot data type for faster visibility checks -- -CREATE OR REPLACE FUNCTION txid_snapshot_in(cstring) +CREATE TYPE txid_snapshot; + +CREATE FUNCTION txid_snapshot_in(cstring) RETURNS txid_snapshot AS 'MODULE_PATHNAME' LANGUAGE C IMMUTABLE STRICT; -CREATE OR REPLACE FUNCTION txid_snapshot_out(txid_snapshot) +CREATE FUNCTION txid_snapshot_out(txid_snapshot) RETURNS cstring AS 'MODULE_PATHNAME' LANGUAGE C IMMUTABLE STRICT; +CREATE FUNCTION txid_snapshot_recv(internal) + RETURNS txid_snapshot + AS 'MODULE_PATHNAME' LANGUAGE C + IMMUTABLE STRICT; +CREATE FUNCTION txid_snapshot_send(txid_snapshot) + RETURNS bytea + AS 'MODULE_PATHNAME' LANGUAGE C + IMMUTABLE STRICT; -- -- The data type itself @@ -27,42 +44,44 @@ CREATE OR REPLACE FUNCTION txid_snapshot_out(txid_snapshot) CREATE TYPE txid_snapshot ( INPUT = txid_snapshot_in, OUTPUT = txid_snapshot_out, + RECEIVE = txid_snapshot_recv, + SEND = txid_snapshot_send, INTERNALLENGTH = variable, STORAGE = extended, ALIGNMENT = double ); -CREATE OR REPLACE FUNCTION txid_current() +-- +-- Functions for txid +-- +CREATE FUNCTION txid_current() RETURNS bigint AS 'MODULE_PATHNAME', 'txid_current' LANGUAGE C STABLE; -CREATE OR REPLACE FUNCTION txid_current_snapshot() +CREATE FUNCTION txid_current_snapshot() RETURNS txid_snapshot AS 'MODULE_PATHNAME', 'txid_current_snapshot' LANGUAGE C STABLE; -CREATE OR REPLACE FUNCTION txid_snapshot_xmin(txid_snapshot) +CREATE FUNCTION txid_snapshot_xmin(txid_snapshot) RETURNS bigint AS 'MODULE_PATHNAME', 'txid_snapshot_xmin' LANGUAGE C IMMUTABLE STRICT; -CREATE OR REPLACE FUNCTION txid_snapshot_xmax(txid_snapshot) +CREATE FUNCTION txid_snapshot_xmax(txid_snapshot) RETURNS bigint AS 'MODULE_PATHNAME', 'txid_snapshot_xmax' LANGUAGE C IMMUTABLE STRICT; -CREATE OR REPLACE FUNCTION txid_snapshot_xip(txid_snapshot) +CREATE FUNCTION txid_snapshot_xip(txid_snapshot) RETURNS setof bigint AS 'MODULE_PATHNAME', 'txid_snapshot_xip' LANGUAGE C IMMUTABLE STRICT; - --- --- Special comparision functions for visibility checks --- -CREATE OR REPLACE FUNCTION txid_visible_in_snapshot(bigint, txid_snapshot) +CREATE FUNCTION txid_visible_in_snapshot(bigint, txid_snapshot) RETURNS boolean AS 'MODULE_PATHNAME', 'txid_visible_in_snapshot' LANGUAGE C IMMUTABLE STRICT; +COMMIT; diff --git a/contrib/txid/uninstall_txid.sql b/contrib/txid/uninstall_txid.sql index a28da29845..ac126d60e0 100644 --- a/contrib/txid/uninstall_txid.sql +++ b/contrib/txid/uninstall_txid.sql @@ -1,4 +1,4 @@ - +SET search_path = public; DROP FUNCTION txid_current(); DROP FUNCTION txid_current_snapshot(); @@ -6,10 +6,6 @@ DROP FUNCTION txid_snapshot_xmin(txid_snapshot); DROP FUNCTION txid_snapshot_xmax(txid_snapshot); DROP FUNCTION txid_snapshot_xip(txid_snapshot); DROP FUNCTION txid_visible_in_snapshot(bigint, txid_snapshot); -DROP FUNCTION txid_not_visible_in_snapshot(bigint, txid_snapshot); - -DROP TYPE txid_snapshot cascade; --- need cascade to drop those: --- DROP FUNCTION txid_snapshot_in(cstring); --- DROP FUNCTION txid_snapshot_out(txid_snapshot); +DROP TYPE txid_snapshot CASCADE; +-- need cascade to drop the I/O functions -- 2.40.0