/*-------------------------------------------------------------------------
 *
 *	Export internal transaction IDs to user level.
 *
 * Note that only top-level transaction IDs are ever converted to TXID.
 * This is important because TXIDs frequently persist beyond the global
 * xmin horizon, or may even be shipped to other machines, so we cannot
 * rely on being able to correlate subtransaction IDs with their parents
 * via functions such as SubTransGetTopmostTransaction().
 *
 *	Copyright (c) 2003-2007, PostgreSQL Global Development Group
 *	Author: Jan Wieck, Afilias USA INC.
 *	64-bit txids: Marko Kreen, Skype Technologies
 *
 *	$PostgreSQL: pgsql/src/backend/utils/adt/txid.c,v 1.3 2007/11/15 22:25:16 momjian Exp $
 *
 *-------------------------------------------------------------------------
 */
24 #include "access/transam.h"
25 #include "access/xact.h"
27 #include "libpq/pqformat.h"
28 #include "utils/builtins.h"
31 #ifndef INT64_IS_BUSTED
32 /* txid will be signed int8 in database, so must limit to 63 bits */
33 #define MAX_TXID UINT64CONST(0x7FFFFFFFFFFFFFFF)
35 /* we only really have 32 bits to work with :-( */
36 #define MAX_TXID UINT64CONST(0x7FFFFFFF)
39 /* Use unsigned variant internally */
42 /* sprintf format code for uint64 */
43 #define TXID_FMT UINT64_FORMAT
46 * If defined, use bsearch() function for searching for txids in snapshots
47 * that have more than the specified number of values.
49 #define USE_BSEARCH_IF_NXIP_GREATER 30
53 * Snapshot containing 8byte txids.
58 * 4-byte length hdr, should not be touched directly.
60 * Explicit embedding is ok as we want always correct alignment anyway.
64 uint32 nxip; /* number of txids in xip array */
67 txid xip[1]; /* in-progress txids, xmin <= xip[i] < xmax */
70 #define TXID_SNAPSHOT_SIZE(nxip) \
71 (offsetof(TxidSnapshot, xip) + sizeof(txid) * (nxip))
74 * Epoch values from xact.c
78 TransactionId last_xid;
84 * Fetch epoch data from xact.c.
87 load_xid_epoch(TxidEpoch *state)
89 GetNextXidAndEpoch(&state->last_xid, &state->epoch);
93 * do a TransactionId -> txid conversion for an XID near the given epoch
96 convert_xid(TransactionId xid, const TxidEpoch *state)
98 #ifndef INT64_IS_BUSTED
101 /* return special xid's as-is */
102 if (!TransactionIdIsNormal(xid))
105 /* xid can be on either side when near wrap-around */
106 epoch = (uint64) state->epoch;
107 if (xid > state->last_xid &&
108 TransactionIdPrecedes(xid, state->last_xid))
110 else if (xid < state->last_xid &&
111 TransactionIdFollows(xid, state->last_xid))
114 return (epoch << 32) | xid;
115 #else /* INT64_IS_BUSTED */
116 /* we can't do anything with the epoch, so ignore it */
117 return (txid) xid & MAX_TXID;
118 #endif /* INT64_IS_BUSTED */
122 * txid comparator for qsort/bsearch
125 cmp_txid(const void *aa, const void *bb)
127 txid a = *(const txid *) aa;
128 txid b = *(const txid *) bb;
138 * sort a snapshot's txids, so we can use bsearch() later.
140 * For consistency of on-disk representation, we always sort even if bsearch
144 sort_snapshot(TxidSnapshot *snap)
147 qsort(snap->xip, snap->nxip, sizeof(txid), cmp_txid);
151 * check txid visibility.
154 is_visible_txid(txid value, const TxidSnapshot *snap)
156 if (value < snap->xmin)
158 else if (value >= snap->xmax)
160 #ifdef USE_BSEARCH_IF_NXIP_GREATER
161 else if (snap->nxip > USE_BSEARCH_IF_NXIP_GREATER)
165 res = bsearch(&value, snap->xip, snap->nxip, sizeof(txid), cmp_txid);
166 /* if found, transaction is still in progress */
167 return (res) ? false : true;
174 for (i = 0; i < snap->nxip; i++)
176 if (value == snap->xip[i])
184 * helper functions to use StringInfo for TxidSnapshot creation.
188 buf_init(txid xmin, txid xmax)
197 buf = makeStringInfo();
198 appendBinaryStringInfo(buf, (char *) &snap, TXID_SNAPSHOT_SIZE(0));
203 buf_add_txid(StringInfo buf, txid xid)
205 TxidSnapshot *snap = (TxidSnapshot *) buf->data;
207 /* do this before possible realloc */
210 appendBinaryStringInfo(buf, (char *) &xid, sizeof(xid));
213 static TxidSnapshot *
214 buf_finalize(StringInfo buf)
216 TxidSnapshot *snap = (TxidSnapshot *) buf->data;
218 SET_VARSIZE(snap, buf->len);
220 /* buf is not needed anymore */
228 * simple number parser.
230 * We return 0 on error, which is invalid value for txid.
233 str2txid(const char *s, const char **endp)
236 txid cutoff = MAX_TXID / 10;
237 txid cutlim = MAX_TXID % 10;
243 if (*s < '0' || *s > '9')
250 if (val > cutoff || (val == cutoff && d > cutlim))
264 * parse snapshot from cstring
266 static TxidSnapshot *
267 parse_snapshot(const char *str)
273 const char *str_start = str;
277 xmin = str2txid(str, &endp);
282 xmax = str2txid(str, &endp);
287 /* it should look sane */
288 if (xmin == 0 || xmax == 0 || xmin > xmax)
291 /* allocate buffer */
292 buf = buf_init(xmin, xmax);
294 /* loop over values */
297 /* read next value */
298 val = str2txid(str, &endp);
301 /* require the input to be in order */
302 if (val < xmin || val >= xmax || val <= last_val)
305 buf_add_txid(buf, val);
310 else if (*str != '\0')
314 return buf_finalize(buf);
317 elog(ERROR, "invalid input for txid_snapshot: \"%s\"", str_start);
/*
 * Public functions.
 *
 * txid_current() and txid_current_snapshot() are the only ones that
 * communicate with core xid machinery.  All the others work on data
 * returned by them.
 */
330 * txid_current() returns int8
332 * Return the current toplevel transaction ID as TXID
335 txid_current(PG_FUNCTION_ARGS)
340 load_xid_epoch(&state);
342 val = convert_xid(GetTopTransactionId(), &state);
344 PG_RETURN_INT64(val);
348 * txid_current_snapshot() returns txid_snapshot
350 * Return current snapshot in TXID format
352 * Note that only top-transaction XIDs are included in the snapshot.
355 txid_current_snapshot(PG_FUNCTION_ARGS)
364 cur = ActiveSnapshot;
366 elog(ERROR, "txid_current_snapshot: ActiveSnapshot == NULL");
368 load_xid_epoch(&state);
372 size = TXID_SNAPSHOT_SIZE(nxip);
374 SET_VARSIZE(snap, size);
377 snap->xmin = convert_xid(cur->xmin, &state);
378 snap->xmax = convert_xid(cur->xmax, &state);
380 for (i = 0; i < nxip; i++)
381 snap->xip[i] = convert_xid(cur->xip[i], &state);
383 /* we want them guaranteed to be in ascending order */
386 PG_RETURN_POINTER(snap);
390 * txid_snapshot_in(cstring) returns txid_snapshot
392 * input function for type txid_snapshot
395 txid_snapshot_in(PG_FUNCTION_ARGS)
397 char *str = PG_GETARG_CSTRING(0);
400 snap = parse_snapshot(str);
402 PG_RETURN_POINTER(snap);
406 * txid_snapshot_out(txid_snapshot) returns cstring
408 * output function for type txid_snapshot
411 txid_snapshot_out(PG_FUNCTION_ARGS)
413 TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(0);
417 initStringInfo(&str);
419 appendStringInfo(&str, TXID_FMT ":", snap->xmin);
420 appendStringInfo(&str, TXID_FMT ":", snap->xmax);
422 for (i = 0; i < snap->nxip; i++)
425 appendStringInfoChar(&str, ',');
426 appendStringInfo(&str, TXID_FMT, snap->xip[i]);
429 PG_RETURN_CSTRING(str.data);
433 * txid_snapshot_recv(internal) returns txid_snapshot
435 * binary input function for type txid_snapshot
437 * format: int4 nxip, int8 xmin, int8 xmax, int8 xip
440 txid_snapshot_recv(PG_FUNCTION_ARGS)
442 StringInfo buf = (StringInfo) PG_GETARG_POINTER(0);
453 * load nxip and check for nonsense.
455 * (nxip > avail) check is against int overflows in 'expect'.
457 nxip = pq_getmsgint(buf, 4);
458 avail = buf->len - buf->cursor;
459 expect = 8 + 8 + nxip * 8;
460 if (nxip < 0 || nxip > avail || expect > avail)
463 xmin = pq_getmsgint64(buf);
464 xmax = pq_getmsgint64(buf);
465 if (xmin == 0 || xmax == 0 || xmin > xmax || xmax > MAX_TXID)
468 snap = palloc(TXID_SNAPSHOT_SIZE(nxip));
472 SET_VARSIZE(snap, TXID_SNAPSHOT_SIZE(nxip));
474 for (i = 0; i < nxip; i++)
476 txid cur = pq_getmsgint64(buf);
478 if (cur <= last || cur < xmin || cur >= xmax)
483 PG_RETURN_POINTER(snap);
486 elog(ERROR, "invalid snapshot data");
491 * txid_snapshot_send(txid_snapshot) returns bytea
493 * binary output function for type txid_snapshot
495 * format: int4 nxip, int8 xmin, int8 xmax, int8 xip
498 txid_snapshot_send(PG_FUNCTION_ARGS)
500 TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(0);
504 pq_begintypsend(&buf);
505 pq_sendint(&buf, snap->nxip, 4);
506 pq_sendint64(&buf, snap->xmin);
507 pq_sendint64(&buf, snap->xmax);
508 for (i = 0; i < snap->nxip; i++)
509 pq_sendint64(&buf, snap->xip[i]);
510 PG_RETURN_BYTEA_P(pq_endtypsend(&buf));
514 * txid_visible_in_snapshot(int8, txid_snapshot) returns bool
516 * is txid visible in snapshot ?
519 txid_visible_in_snapshot(PG_FUNCTION_ARGS)
521 txid value = PG_GETARG_INT64(0);
522 TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(1);
524 PG_RETURN_BOOL(is_visible_txid(value, snap));
528 * txid_snapshot_xmin(txid_snapshot) returns int8
530 * return snapshot's xmin
533 txid_snapshot_xmin(PG_FUNCTION_ARGS)
535 TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(0);
537 PG_RETURN_INT64(snap->xmin);
541 * txid_snapshot_xmax(txid_snapshot) returns int8
543 * return snapshot's xmax
546 txid_snapshot_xmax(PG_FUNCTION_ARGS)
548 TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(0);
550 PG_RETURN_INT64(snap->xmax);
554 * txid_snapshot_xip(txid_snapshot) returns setof int8
556 * return in-progress TXIDs in snapshot.
559 txid_snapshot_xip(PG_FUNCTION_ARGS)
561 FuncCallContext *fctx;
565 /* on first call initialize snap_state and get copy of snapshot */
566 if (SRF_IS_FIRSTCALL())
568 TxidSnapshot *arg = (TxidSnapshot *) PG_GETARG_VARLENA_P(0);
570 fctx = SRF_FIRSTCALL_INIT();
572 /* make a copy of user snapshot */
573 snap = MemoryContextAlloc(fctx->multi_call_memory_ctx, VARSIZE(arg));
574 memcpy(snap, arg, VARSIZE(arg));
576 fctx->user_fctx = snap;
579 /* return values one-by-one */
580 fctx = SRF_PERCALL_SETUP();
581 snap = fctx->user_fctx;
582 if (fctx->call_cntr < snap->nxip)
584 value = snap->xip[fctx->call_cntr];
585 SRF_RETURN_NEXT(fctx, Int64GetDatum(value));
589 SRF_RETURN_DONE(fctx);