1 /*-------------------------------------------------------------------------
4 * Export internal transaction IDs to user level.
6 * Note that only top-level transaction IDs are ever converted to TXID.
7 * This is important because TXIDs frequently persist beyond the global
8 * xmin horizon, or may even be shipped to other machines, so we cannot
9 * rely on being able to correlate subtransaction IDs with their parents
10 * via functions such as SubTransGetTopmostTransaction().
13 * Copyright (c) 2003-2015, PostgreSQL Global Development Group
14 * Author: Jan Wieck, Afilias USA INC.
15 * 64-bit txids: Marko Kreen, Skype Technologies
17 * src/backend/utils/adt/txid.c
19 *-------------------------------------------------------------------------
24 #include "access/transam.h"
25 #include "access/xact.h"
26 #include "access/xlog.h"
28 #include "miscadmin.h"
29 #include "libpq/pqformat.h"
30 #include "postmaster/postmaster.h"
31 #include "utils/builtins.h"
32 #include "utils/memutils.h"
33 #include "utils/snapmgr.h"
36 /* txid will be signed int8 in database, so must limit to 63 bits */
/* Upper bound accepted for any txid: PG_INT64_MAX viewed as a uint64. */
37 #define MAX_TXID ((uint64) PG_INT64_MAX)
39 /* Use unsigned variant internally */
42 /* sprintf format code for uint64 */
43 #define TXID_FMT UINT64_FORMAT
46 * If defined, use bsearch() function for searching for txids in snapshots
47 * that have more than the specified number of values.
/*
 * is_visible_txid() compares snap->nxip against this value and uses
 * bsearch() instead of its element-by-element loop when nxip is larger.
 */
49 #define USE_BSEARCH_IF_NXIP_GREATER 30
53 * Snapshot containing 8byte txids.
58 * 4-byte length hdr, should not be touched directly.
60 * Explicit embedding is ok as we want always correct alignment anyway.
64 uint32 nxip; /* number of txids in xip array */
67 /* in-progress txids, xmin <= xip[i] < xmax: */
68 txid xip[FLEXIBLE_ARRAY_MEMBER];
/*
 * Size in bytes of a TxidSnapshot carrying nxip entries: everything that
 * precedes xip[] plus nxip elements of type txid.
 */
71 #define TXID_SNAPSHOT_SIZE(nxip) \
72 (offsetof(TxidSnapshot, xip) + sizeof(txid) * (nxip))
/*
 * Largest entry count for which TXID_SNAPSHOT_SIZE() does not exceed
 * MaxAllocSize.
 */
73 #define TXID_SNAPSHOT_MAX_NXIP \
74 ((MaxAllocSize - offsetof(TxidSnapshot, xip)) / sizeof(txid))
77 * Epoch values from xact.c
81 TransactionId last_xid; /* filled by load_xid_epoch(); reference point used by convert_xid() */
87 * Fetch epoch data from xact.c.
/*
 * load_xid_epoch
 *		Populate *state (last_xid and epoch) with one GetNextXidAndEpoch()
 *		call, so that later convert_xid() calls all use the same pair.
 *
 * state: output; both fields are overwritten.
 */
90 load_xid_epoch(TxidEpoch *state)
92 GetNextXidAndEpoch(&state->last_xid, &state->epoch);
96 * do a TransactionId -> txid conversion for an XID near the given epoch
/*
 * convert_xid
 *		Widen a 32-bit TransactionId into a 64-bit txid.
 *
 * xid:   the TransactionId to convert.
 * state: epoch/last_xid pair, as filled in by load_xid_epoch().
 *
 * For a normal xid the result is (epoch << 32) | xid, i.e. the epoch in the
 * upper 32 bits and the raw xid in the lower 32 bits.
 *
 * NOTE(review): the two wrap-around branches presumably move the local
 * epoch by one in opposite directions before the final return -- confirm
 * against the statements following each condition.
 */
99 convert_xid(TransactionId xid, const TxidEpoch *state)
103 /* return special xid's as-is */
104 if (!TransactionIdIsNormal(xid))
107 /* xid can be on either side when near wrap-around */
108 epoch = (uint64) state->epoch;
/* numerically above last_xid, yet logically older than it */
109 if (xid > state->last_xid &&
110 TransactionIdPrecedes(xid, state->last_xid))
/* numerically below last_xid, yet logically newer than it */
112 else if (xid < state->last_xid &&
113 TransactionIdFollows(xid, state->last_xid))
116 return (epoch << 32) | xid;
120 * txid comparator for qsort/bsearch
/*
 * cmp_txid
 *		Comparison callback handed to qsort() in sort_snapshot() and to
 *		bsearch() in is_visible_txid().
 *
 * aa, bb: pointers to the two txid values being compared; each is
 *		dereferenced into a local before use.
 *
 * NOTE(review): presumably returns negative/zero/positive in the usual
 * qsort() convention -- confirm against the comparison statements.
 */
123 cmp_txid(const void *aa, const void *bb)
125 txid a = *(const txid *) aa;
126 txid b = *(const txid *) bb;
136 * Sort a snapshot's txids, so we can use bsearch() later. Also remove
139 * For consistency of on-disk representation, we always sort even if bsearch
/*
 * sort_snapshot
 *		Sort snap->xip in place with qsort()/cmp_txid, then compact the
 *		array so that each value is kept only once.
 *
 * snap: snapshot to normalise; modified in place.
 *
 * During compaction idx1 is the read position and idx2 the write position.
 * txid_current_snapshot() reads snap->nxip after calling this function to
 * compute the final varlena size, so the count is expected to reflect the
 * compacted array.
 */
143 sort_snapshot(TxidSnapshot *snap)
146 int nxip, idx1, idx2;
150 qsort(snap->xip, snap->nxip, sizeof(txid), cmp_txid);
152 /* remove duplicates: copy an entry forward only if it differs from the last one kept */
157 if (snap->xip[idx1] != last)
158 last = snap->xip[idx2++] = snap->xip[idx1];
167 * check txid visibility.
/*
 * is_visible_txid
 *		Decide whether txid "value" is visible according to "snap".
 *
 * The value is classified in order: below snap->xmin, at or above
 * snap->xmax, or in between. In-between values are looked up in snap->xip,
 * either with bsearch() (when nxip exceeds USE_BSEARCH_IF_NXIP_GREATER) or
 * with an element-by-element loop.
 *
 * In the bsearch() path a hit means the transaction is still in progress,
 * so the function returns false; a miss returns true.
 *
 * NOTE(review): presumably values below xmin are reported visible, values
 * at or above xmax not visible, and the loop path mirrors the bsearch()
 * path -- confirm against the return statements of those branches.
 */
170 is_visible_txid(txid value, const TxidSnapshot *snap)
172 if (value < snap->xmin)
174 else if (value >= snap->xmax)
176 #ifdef USE_BSEARCH_IF_NXIP_GREATER
177 else if (snap->nxip > USE_BSEARCH_IF_NXIP_GREATER)
181 res = bsearch(&value, snap->xip, snap->nxip, sizeof(txid), cmp_txid);
182 /* if found, transaction is still in progress */
183 return (res) ? false : true;
190 for (i = 0; i < snap->nxip; i++)
192 if (value == snap->xip[i])
200 * helper functions to use StringInfo for TxidSnapshot creation.
/*
 * buf_init
 *		Start building a TxidSnapshot inside a freshly made StringInfo.
 *
 * xmin, xmax: bounds for the snapshot being built.
 *
 * The buffer is seeded with the first TXID_SNAPSHOT_SIZE(0) bytes of a local
 * TxidSnapshot, i.e. the part that precedes the xip[] array. parse_snapshot()
 * assigns the result of this call to its StringInfo variable.
 *
 * NOTE(review): presumably the local snapshot's xmin/xmax are set from the
 * arguments and nxip to zero before it is copied -- confirm.
 */
204 buf_init(txid xmin, txid xmax)
213 buf = makeStringInfo();
214 appendBinaryStringInfo(buf, (char *) &snap, TXID_SNAPSHOT_SIZE(0));
/*
 * buf_add_txid
 *		Append the raw bytes of one txid to the snapshot under construction.
 *
 * buf: StringInfo whose data starts with a TxidSnapshot (see buf_init()).
 * xid: value to append.
 *
 * snap aliases buf->data, so anything done through it has to happen before
 * the append below, which may reallocate the buffer.
 *
 * NOTE(review): presumably the header's nxip is incremented through snap
 * at the "before possible realloc" point -- confirm.
 */
219 buf_add_txid(StringInfo buf, txid xid)
221 TxidSnapshot *snap = (TxidSnapshot *) buf->data;
223 /* do this before possible realloc */
226 appendBinaryStringInfo(buf, (char *) &xid, sizeof(xid));
/*
 * buf_finalize
 *		Turn the StringInfo built by buf_init()/buf_add_txid() into a
 *		finished TxidSnapshot.
 *
 * The buffer's data area is reinterpreted as a TxidSnapshot and its varlena
 * length is set to the number of bytes accumulated in buf (buf->len).
 *
 * NOTE(review): presumably only the StringInfo wrapper is released and the
 * data area is what gets returned -- confirm.
 */
229 static TxidSnapshot *
230 buf_finalize(StringInfo buf)
232 TxidSnapshot *snap = (TxidSnapshot *) buf->data;
234 SET_VARSIZE(snap, buf->len);
236 /* buf is not needed anymore */
244 * simple number parser.
246 * We return 0 on error, which is invalid value for txid.
/*
 * str2txid
 *		Parse a run of decimal digits starting at s into a txid.
 *
 * s:    input text.
 * endp: output position; parse_snapshot() passes the address of its cursor
 *       variable here. NOTE(review): presumably set to the first character
 *       not consumed -- confirm.
 *
 * cutoff and cutlim implement the overflow guard: appending digit d to val
 * would exceed MAX_TXID exactly when val > cutoff, or val == cutoff and
 * d > cutlim.
 */
249 str2txid(const char *s, const char **endp)
252 txid cutoff = MAX_TXID / 10;
253 txid cutlim = MAX_TXID % 10;
/* a character outside '0'..'9' is not part of the number */
259 if (*s < '0' || *s > '9')
/* next digit would push the value past MAX_TXID */
266 if (val > cutoff || (val == cutoff && d > cutlim))
280 * parse snapshot from cstring
/*
 * parse_snapshot
 *		Build a TxidSnapshot from its text form.
 *
 * str: NUL-terminated input. txid_snapshot_out() writes "xmin:xmax:"
 *      followed by the xip values with ',' between them, and this is the
 *      matching reader.
 *
 * Returns the snapshot produced by buf_finalize(). Input that fails any
 * check ends in elog(ERROR) quoting the original string (str_start).
 *
 * Checks visible here: xmin and xmax are non-zero (str2txid() uses 0 as its
 * error value) with xmin <= xmax; every listed value lies in [xmin, xmax)
 * and is not smaller than the previous one.
 */
282 static TxidSnapshot *
283 parse_snapshot(const char *str)
/* keep the start of the input for the error message */
289 const char *str_start = str;
293 xmin = str2txid(str, &endp);
298 xmax = str2txid(str, &endp);
303 /* it should look sane */
304 if (xmin == 0 || xmax == 0 || xmin > xmax)
307 /* allocate buffer */
308 buf = buf_init(xmin, xmax);
310 /* loop over values */
313 /* read next value */
314 val = str2txid(str, &endp);
317 /* require the input to be in order */
318 if (val < xmin || val >= xmax || val < last_val)
321 /* skip duplicates */
323 buf_add_txid(buf, val);
328 else if (*str != '\0')
332 return buf_finalize(buf);
335 elog(ERROR, "invalid input for txid_snapshot: \"%s\"", str_start);
342 * txid_current() and txid_current_snapshot() are the only ones that
343 * communicate with core xid machinery. All the others work on data
348 * txid_current() returns int8
350 * Return the current toplevel transaction ID as TXID
351 * If the current transaction does not have one, one is assigned.
/*
 * txid_current
 *		SQL-callable; returns the top-level transaction ID of the current
 *		transaction as an int8.
 *
 * Steps: refuse to run during recovery, load the epoch state, then convert
 * GetTopTransactionId() with convert_xid().
 */
354 txid_current(PG_FUNCTION_ARGS)
360 * Must prevent during recovery because if an xid is not assigned we try
361 * to assign one, which would fail. Programs already rely on this function
362 * to always return a valid current xid, so we should not change this to
363 * return NULL or similar invalid xid.
365 PreventCommandDuringRecovery("txid_current()");
367 load_xid_epoch(&state);
369 val = convert_xid(GetTopTransactionId(), &state);
371 PG_RETURN_INT64(val);
375 * txid_current_snapshot() returns txid_snapshot
377 * Return current snapshot in TXID format
379 * Note that only top-transaction XIDs are included in the snapshot.
/*
 * txid_current_snapshot
 *		SQL-callable; converts the active snapshot into a TxidSnapshot.
 *
 * xmin, xmax and every xip entry of the active snapshot go through
 * convert_xid() using one epoch state loaded up front. The varlena size is
 * set from snap->nxip only at the very end, because duplicate removal can
 * shrink the entry count.
 *
 * Raises an error if there is no active snapshot.
 */
382 txid_current_snapshot(PG_FUNCTION_ARGS)
390 cur = GetActiveSnapshot();
392 elog(ERROR, "no active snapshot set");
394 load_xid_epoch(&state);
397 * Compile-time limits on the procarray (MAX_BACKENDS processes plus
398 * MAX_BACKENDS prepared transactions) guarantee nxip won't be too large.
400 StaticAssertStmt(MAX_BACKENDS * 2 <= TXID_SNAPSHOT_MAX_NXIP,
401 "possible overflow in txid_current_snapshot()");
/* allocate room for every in-progress entry before duplicate removal */
405 snap = palloc(TXID_SNAPSHOT_SIZE(nxip));
408 snap->xmin = convert_xid(cur->xmin, &state);
409 snap->xmax = convert_xid(cur->xmax, &state);
411 for (i = 0; i < nxip; i++)
412 snap->xip[i] = convert_xid(cur->xip[i], &state);
415 * We want them guaranteed to be in ascending order. This also removes
416 * any duplicate xids. Normally, an XID can only be assigned to one
417 * backend, but when preparing a transaction for two-phase commit, there
418 * is a transient state when both the original backend and the dummy
419 * PGPROC entry reserved for the prepared transaction hold the same XID.
423 /* set size after sorting, because it may have removed duplicate xips */
424 SET_VARSIZE(snap, TXID_SNAPSHOT_SIZE(snap->nxip));
426 PG_RETURN_POINTER(snap);
430 * txid_snapshot_in(cstring) returns txid_snapshot
432 * input function for type txid_snapshot
/*
 * txid_snapshot_in
 *		Text input function: hands argument 0 (a C string) to
 *		parse_snapshot() and returns the resulting snapshot. All validation
 *		and error reporting happen inside parse_snapshot().
 */
435 txid_snapshot_in(PG_FUNCTION_ARGS)
437 char *str = PG_GETARG_CSTRING(0);
440 snap = parse_snapshot(str);
442 PG_RETURN_POINTER(snap);
446 * txid_snapshot_out(txid_snapshot) returns cstring
448 * output function for type txid_snapshot
/*
 * txid_snapshot_out
 *		Text output function. Writes xmin and xmax, each followed by ':',
 *		then the xip entries in array order, and returns the accumulated
 *		C string.
 *
 * NOTE(review): the ',' is presumably emitted only between entries, not
 * before the first one -- confirm against the condition guarding it.
 */
451 txid_snapshot_out(PG_FUNCTION_ARGS)
453 TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(0);
457 initStringInfo(&str);
459 appendStringInfo(&str, TXID_FMT ":", snap->xmin);
460 appendStringInfo(&str, TXID_FMT ":", snap->xmax);
462 for (i = 0; i < snap->nxip; i++)
465 appendStringInfoChar(&str, ',');
466 appendStringInfo(&str, TXID_FMT, snap->xip[i]);
469 PG_RETURN_CSTRING(str.data);
473 * txid_snapshot_recv(internal) returns txid_snapshot
475 * binary input function for type txid_snapshot
477 * format: int4 nxip, int8 xmin, int8 xmax, int8 xip
/*
 * txid_snapshot_recv
 *		Binary input function: reads a snapshot from the message buffer in
 *		argument 0 and validates it while reading.
 *
 * Checks visible here:
 *	- nxip lies within 0..TXID_SNAPSHOT_MAX_NXIP, which keeps the palloc()
 *	  request of TXID_SNAPSHOT_SIZE(nxip) within MaxAllocSize;
 *	- xmin and xmax are non-zero, xmin <= xmax, and xmax <= MAX_TXID;
 *	- each xip value is in [xmin, xmax) and not smaller than the previous
 *	  one.
 *
 * Invalid data ends in elog(ERROR, "invalid snapshot data").
 *
 * NOTE(review): duplicates are skipped rather than rejected; presumably
 * nxip is reduced accordingly before the final SET_VARSIZE -- confirm.
 */
480 txid_snapshot_recv(PG_FUNCTION_ARGS)
482 StringInfo buf = (StringInfo) PG_GETARG_POINTER(0);
490 /* load and validate nxip */
491 nxip = pq_getmsgint(buf, 4);
492 if (nxip < 0 || nxip > TXID_SNAPSHOT_MAX_NXIP)
495 xmin = pq_getmsgint64(buf);
496 xmax = pq_getmsgint64(buf);
497 if (xmin == 0 || xmax == 0 || xmin > xmax || xmax > MAX_TXID)
500 snap = palloc(TXID_SNAPSHOT_SIZE(nxip));
504 for (i = 0; i < nxip; i++)
506 txid cur = pq_getmsgint64(buf);
/* entries must be non-decreasing and inside [xmin, xmax) */
508 if (cur < last || cur < xmin || cur >= xmax)
511 /* skip duplicate xips */
523 SET_VARSIZE(snap, TXID_SNAPSHOT_SIZE(nxip));
524 PG_RETURN_POINTER(snap);
527 elog(ERROR, "invalid snapshot data");
532 * txid_snapshot_send(txid_snapshot) returns bytea
534 * binary output function for type txid_snapshot
536 * format: int4 nxip, int8 xmin, int8 xmax, int8 xip
/*
 * txid_snapshot_send
 *		Binary output function. Emits, in this order: nxip as a 4-byte
 *		integer, xmin and xmax as 8-byte integers, then each of the nxip
 *		xip entries as an 8-byte integer. This is the same field order that
 *		txid_snapshot_recv() reads.
 */
539 txid_snapshot_send(PG_FUNCTION_ARGS)
541 TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(0);
545 pq_begintypsend(&buf);
546 pq_sendint(&buf, snap->nxip, 4);
547 pq_sendint64(&buf, snap->xmin);
548 pq_sendint64(&buf, snap->xmax);
549 for (i = 0; i < snap->nxip; i++)
550 pq_sendint64(&buf, snap->xip[i]);
551 PG_RETURN_BYTEA_P(pq_endtypsend(&buf));
555 * txid_visible_in_snapshot(int8, txid_snapshot) returns bool
557 * is txid visible in snapshot ?
/*
 * txid_visible_in_snapshot
 *		SQL-callable wrapper around is_visible_txid(): argument 0 is the
 *		txid to test (fetched as int8 and stored as a txid), argument 1 is
 *		the snapshot to test it against.
 */
560 txid_visible_in_snapshot(PG_FUNCTION_ARGS)
562 txid value = PG_GETARG_INT64(0);
563 TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(1);
565 PG_RETURN_BOOL(is_visible_txid(value, snap));
569 * txid_snapshot_xmin(txid_snapshot) returns int8
571 * return snapshot's xmin
/*
 * txid_snapshot_xmin
 *		Accessor: returns the xmin field of the snapshot passed as
 *		argument 0, as an int8.
 */
574 txid_snapshot_xmin(PG_FUNCTION_ARGS)
576 TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(0);
578 PG_RETURN_INT64(snap->xmin);
582 * txid_snapshot_xmax(txid_snapshot) returns int8
584 * return snapshot's xmax
/*
 * txid_snapshot_xmax
 *		Accessor: returns the xmax field of the snapshot passed as
 *		argument 0, as an int8.
 */
587 txid_snapshot_xmax(PG_FUNCTION_ARGS)
589 TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(0);
591 PG_RETURN_INT64(snap->xmax);
595 * txid_snapshot_xip(txid_snapshot) returns setof int8
597 * return in-progress TXIDs in snapshot.
/*
 * txid_snapshot_xip
 *		Set-returning function: yields the snapshot's xip entries one per
 *		call, in array order.
 *
 * On the first call the argument snapshot is copied (VARSIZE(arg) bytes)
 * into memory allocated from fctx->multi_call_memory_ctx and the copy is
 * stored in fctx->user_fctx for the following calls. Each call then returns
 * xip[call_cntr] until call_cntr reaches nxip, after which the function
 * reports completion.
 */
600 txid_snapshot_xip(PG_FUNCTION_ARGS)
602 FuncCallContext *fctx;
606 /* on first call initialize snap_state and get copy of snapshot */
607 if (SRF_IS_FIRSTCALL())
609 TxidSnapshot *arg = (TxidSnapshot *) PG_GETARG_VARLENA_P(0);
611 fctx = SRF_FIRSTCALL_INIT();
613 /* make a copy of user snapshot */
614 snap = MemoryContextAlloc(fctx->multi_call_memory_ctx, VARSIZE(arg));
615 memcpy(snap, arg, VARSIZE(arg));
617 fctx->user_fctx = snap;
620 /* return values one-by-one */
621 fctx = SRF_PERCALL_SETUP();
622 snap = fctx->user_fctx;
/* call_cntr doubles as the index of the next entry to return */
623 if (fctx->call_cntr < snap->nxip)
625 value = snap->xip[fctx->call_cntr];
626 SRF_RETURN_NEXT(fctx, Int64GetDatum(value));
630 SRF_RETURN_DONE(fctx);