]> granicus.if.org Git - postgresql/blob - src/backend/utils/adt/txid.c
Add C comment that txid_current() assigns an XID if one is not already
[postgresql] / src / backend / utils / adt / txid.c
1 /*-------------------------------------------------------------------------
2  * txid.c
3  *
4  *      Export internal transaction IDs to user level.
5  *
6  * Note that only top-level transaction IDs are ever converted to TXID.
7  * This is important because TXIDs frequently persist beyond the global
8  * xmin horizon, or may even be shipped to other machines, so we cannot
9  * rely on being able to correlate subtransaction IDs with their parents
10  * via functions such as SubTransGetTopmostTransaction().
11  *
12  *
13  *      Copyright (c) 2003-2011, PostgreSQL Global Development Group
14  *      Author: Jan Wieck, Afilias USA INC.
15  *      64-bit txids: Marko Kreen, Skype Technologies
16  *
17  *      src/backend/utils/adt/txid.c
18  *
19  *-------------------------------------------------------------------------
20  */
21
22 #include "postgres.h"
23
24 #include "access/transam.h"
25 #include "access/xact.h"
26 #include "funcapi.h"
27 #include "miscadmin.h"
28 #include "libpq/pqformat.h"
29 #include "utils/builtins.h"
30 #include "utils/snapmgr.h"
31
32
33 /* txid will be signed int8 in database, so must limit to 63 bits */
34 #define MAX_TXID   UINT64CONST(0x7FFFFFFFFFFFFFFF)
35
36 /* Use unsigned variant internally */
37 typedef uint64 txid;
38
39 /* sprintf format code for uint64 */
40 #define TXID_FMT UINT64_FORMAT
41
42 /*
43  * If defined, use bsearch() function for searching for txids in snapshots
44  * that have more than the specified number of values.
45  */
46 #define USE_BSEARCH_IF_NXIP_GREATER 30
47
48
49 /*
50  * Snapshot containing 8byte txids.
51  */
52 typedef struct
53 {
54         /*
55          * 4-byte length hdr, should not be touched directly.
56          *
57          * Explicit embedding is ok as we want always correct alignment anyway.
58          */
59         int32           __varsz;
60
61         uint32          nxip;                   /* number of txids in xip array */
62         txid            xmin;
63         txid            xmax;
64         txid            xip[1];                 /* in-progress txids, xmin <= xip[i] < xmax */
65 } TxidSnapshot;
66
67 #define TXID_SNAPSHOT_SIZE(nxip) \
68         (offsetof(TxidSnapshot, xip) + sizeof(txid) * (nxip))
69
70 /*
71  * Epoch values from xact.c
72  */
73 typedef struct
74 {
75         TransactionId last_xid;
76         uint32          epoch;
77 } TxidEpoch;
78
79
80 /*
81  * Fetch epoch data from xact.c.
82  */
83 static void
84 load_xid_epoch(TxidEpoch *state)
85 {
86         GetNextXidAndEpoch(&state->last_xid, &state->epoch);
87 }
88
89 /*
90  * do a TransactionId -> txid conversion for an XID near the given epoch
91  */
92 static txid
93 convert_xid(TransactionId xid, const TxidEpoch *state)
94 {
95         uint64          epoch;
96
97         /* return special xid's as-is */
98         if (!TransactionIdIsNormal(xid))
99                 return (txid) xid;
100
101         /* xid can be on either side when near wrap-around */
102         epoch = (uint64) state->epoch;
103         if (xid > state->last_xid &&
104                 TransactionIdPrecedes(xid, state->last_xid))
105                 epoch--;
106         else if (xid < state->last_xid &&
107                          TransactionIdFollows(xid, state->last_xid))
108                 epoch++;
109
110         return (epoch << 32) | xid;
111 }
112
113 /*
114  * txid comparator for qsort/bsearch
115  */
116 static int
117 cmp_txid(const void *aa, const void *bb)
118 {
119         txid            a = *(const txid *) aa;
120         txid            b = *(const txid *) bb;
121
122         if (a < b)
123                 return -1;
124         if (a > b)
125                 return 1;
126         return 0;
127 }
128
129 /*
130  * sort a snapshot's txids, so we can use bsearch() later.
131  *
132  * For consistency of on-disk representation, we always sort even if bsearch
133  * will not be used.
134  */
135 static void
136 sort_snapshot(TxidSnapshot *snap)
137 {
138         if (snap->nxip > 1)
139                 qsort(snap->xip, snap->nxip, sizeof(txid), cmp_txid);
140 }
141
142 /*
143  * check txid visibility.
144  */
145 static bool
146 is_visible_txid(txid value, const TxidSnapshot *snap)
147 {
148         if (value < snap->xmin)
149                 return true;
150         else if (value >= snap->xmax)
151                 return false;
152 #ifdef USE_BSEARCH_IF_NXIP_GREATER
153         else if (snap->nxip > USE_BSEARCH_IF_NXIP_GREATER)
154         {
155                 void       *res;
156
157                 res = bsearch(&value, snap->xip, snap->nxip, sizeof(txid), cmp_txid);
158                 /* if found, transaction is still in progress */
159                 return (res) ? false : true;
160         }
161 #endif
162         else
163         {
164                 uint32          i;
165
166                 for (i = 0; i < snap->nxip; i++)
167                 {
168                         if (value == snap->xip[i])
169                                 return false;
170                 }
171                 return true;
172         }
173 }
174
175 /*
176  * helper functions to use StringInfo for TxidSnapshot creation.
177  */
178
179 static StringInfo
180 buf_init(txid xmin, txid xmax)
181 {
182         TxidSnapshot snap;
183         StringInfo      buf;
184
185         snap.xmin = xmin;
186         snap.xmax = xmax;
187         snap.nxip = 0;
188
189         buf = makeStringInfo();
190         appendBinaryStringInfo(buf, (char *) &snap, TXID_SNAPSHOT_SIZE(0));
191         return buf;
192 }
193
194 static void
195 buf_add_txid(StringInfo buf, txid xid)
196 {
197         TxidSnapshot *snap = (TxidSnapshot *) buf->data;
198
199         /* do this before possible realloc */
200         snap->nxip++;
201
202         appendBinaryStringInfo(buf, (char *) &xid, sizeof(xid));
203 }
204
205 static TxidSnapshot *
206 buf_finalize(StringInfo buf)
207 {
208         TxidSnapshot *snap = (TxidSnapshot *) buf->data;
209
210         SET_VARSIZE(snap, buf->len);
211
212         /* buf is not needed anymore */
213         buf->data = NULL;
214         pfree(buf);
215
216         return snap;
217 }
218
219 /*
220  * simple number parser.
221  *
222  * We return 0 on error, which is invalid value for txid.
223  */
224 static txid
225 str2txid(const char *s, const char **endp)
226 {
227         txid            val = 0;
228         txid            cutoff = MAX_TXID / 10;
229         txid            cutlim = MAX_TXID % 10;
230
231         for (; *s; s++)
232         {
233                 unsigned        d;
234
235                 if (*s < '0' || *s > '9')
236                         break;
237                 d = *s - '0';
238
239                 /*
240                  * check for overflow
241                  */
242                 if (val > cutoff || (val == cutoff && d > cutlim))
243                 {
244                         val = 0;
245                         break;
246                 }
247
248                 val = val * 10 + d;
249         }
250         if (endp)
251                 *endp = s;
252         return val;
253 }
254
255 /*
256  * parse snapshot from cstring
257  */
258 static TxidSnapshot *
259 parse_snapshot(const char *str)
260 {
261         txid            xmin;
262         txid            xmax;
263         txid            last_val = 0,
264                                 val;
265         const char *str_start = str;
266         const char *endp;
267         StringInfo      buf;
268
269         xmin = str2txid(str, &endp);
270         if (*endp != ':')
271                 goto bad_format;
272         str = endp + 1;
273
274         xmax = str2txid(str, &endp);
275         if (*endp != ':')
276                 goto bad_format;
277         str = endp + 1;
278
279         /* it should look sane */
280         if (xmin == 0 || xmax == 0 || xmin > xmax)
281                 goto bad_format;
282
283         /* allocate buffer */
284         buf = buf_init(xmin, xmax);
285
286         /* loop over values */
287         while (*str != '\0')
288         {
289                 /* read next value */
290                 val = str2txid(str, &endp);
291                 str = endp;
292
293                 /* require the input to be in order */
294                 if (val < xmin || val >= xmax || val <= last_val)
295                         goto bad_format;
296
297                 buf_add_txid(buf, val);
298                 last_val = val;
299
300                 if (*str == ',')
301                         str++;
302                 else if (*str != '\0')
303                         goto bad_format;
304         }
305
306         return buf_finalize(buf);
307
308 bad_format:
309         elog(ERROR, "invalid input for txid_snapshot: \"%s\"", str_start);
310         return NULL;
311 }
312
313 /*
314  * Public functions.
315  *
316  * txid_current() and txid_current_snapshot() are the only ones that
317  * communicate with core xid machinery.  All the others work on data
318  * returned by them.
319  */
320
321 /*
322  * txid_current() returns int8
323  *
324  *      Return the current toplevel transaction ID as TXID
325  *      If the current transaction does not have one, one is assigned.
326  */
327 Datum
328 txid_current(PG_FUNCTION_ARGS)
329 {
330         txid            val;
331         TxidEpoch       state;
332
333         /*
334          * Must prevent during recovery because if an xid is not assigned we try
335          * to assign one, which would fail. Programs already rely on this function
336          * to always return a valid current xid, so we should not change this to
337          * return NULL or similar invalid xid.
338          */
339         PreventCommandDuringRecovery("txid_current()");
340
341         load_xid_epoch(&state);
342
343         val = convert_xid(GetTopTransactionId(), &state);
344
345         PG_RETURN_INT64(val);
346 }
347
348 /*
349  * txid_current_snapshot() returns txid_snapshot
350  *
351  *              Return current snapshot in TXID format
352  *
353  * Note that only top-transaction XIDs are included in the snapshot.
354  */
355 Datum
356 txid_current_snapshot(PG_FUNCTION_ARGS)
357 {
358         TxidSnapshot *snap;
359         uint32          nxip,
360                                 i,
361                                 size;
362         TxidEpoch       state;
363         Snapshot        cur;
364
365         cur = GetActiveSnapshot();
366         if (cur == NULL)
367                 elog(ERROR, "no active snapshot set");
368
369         load_xid_epoch(&state);
370
371         /* allocate */
372         nxip = cur->xcnt;
373         size = TXID_SNAPSHOT_SIZE(nxip);
374         snap = palloc(size);
375         SET_VARSIZE(snap, size);
376
377         /* fill */
378         snap->xmin = convert_xid(cur->xmin, &state);
379         snap->xmax = convert_xid(cur->xmax, &state);
380         snap->nxip = nxip;
381         for (i = 0; i < nxip; i++)
382                 snap->xip[i] = convert_xid(cur->xip[i], &state);
383
384         /* we want them guaranteed to be in ascending order */
385         sort_snapshot(snap);
386
387         PG_RETURN_POINTER(snap);
388 }
389
390 /*
391  * txid_snapshot_in(cstring) returns txid_snapshot
392  *
393  *              input function for type txid_snapshot
394  */
395 Datum
396 txid_snapshot_in(PG_FUNCTION_ARGS)
397 {
398         char       *str = PG_GETARG_CSTRING(0);
399         TxidSnapshot *snap;
400
401         snap = parse_snapshot(str);
402
403         PG_RETURN_POINTER(snap);
404 }
405
406 /*
407  * txid_snapshot_out(txid_snapshot) returns cstring
408  *
409  *              output function for type txid_snapshot
410  */
411 Datum
412 txid_snapshot_out(PG_FUNCTION_ARGS)
413 {
414         TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(0);
415         StringInfoData str;
416         uint32          i;
417
418         initStringInfo(&str);
419
420         appendStringInfo(&str, TXID_FMT ":", snap->xmin);
421         appendStringInfo(&str, TXID_FMT ":", snap->xmax);
422
423         for (i = 0; i < snap->nxip; i++)
424         {
425                 if (i > 0)
426                         appendStringInfoChar(&str, ',');
427                 appendStringInfo(&str, TXID_FMT, snap->xip[i]);
428         }
429
430         PG_RETURN_CSTRING(str.data);
431 }
432
433 /*
434  * txid_snapshot_recv(internal) returns txid_snapshot
435  *
436  *              binary input function for type txid_snapshot
437  *
438  *              format: int4 nxip, int8 xmin, int8 xmax, int8 xip
439  */
440 Datum
441 txid_snapshot_recv(PG_FUNCTION_ARGS)
442 {
443         StringInfo      buf = (StringInfo) PG_GETARG_POINTER(0);
444         TxidSnapshot *snap;
445         txid            last = 0;
446         int                     nxip;
447         int                     i;
448         int                     avail;
449         int                     expect;
450         txid            xmin,
451                                 xmax;
452
453         /*
454          * load nxip and check for nonsense.
455          *
456          * (nxip > avail) check is against int overflows in 'expect'.
457          */
458         nxip = pq_getmsgint(buf, 4);
459         avail = buf->len - buf->cursor;
460         expect = 8 + 8 + nxip * 8;
461         if (nxip < 0 || nxip > avail || expect > avail)
462                 goto bad_format;
463
464         xmin = pq_getmsgint64(buf);
465         xmax = pq_getmsgint64(buf);
466         if (xmin == 0 || xmax == 0 || xmin > xmax || xmax > MAX_TXID)
467                 goto bad_format;
468
469         snap = palloc(TXID_SNAPSHOT_SIZE(nxip));
470         snap->xmin = xmin;
471         snap->xmax = xmax;
472         snap->nxip = nxip;
473         SET_VARSIZE(snap, TXID_SNAPSHOT_SIZE(nxip));
474
475         for (i = 0; i < nxip; i++)
476         {
477                 txid            cur = pq_getmsgint64(buf);
478
479                 if (cur <= last || cur < xmin || cur >= xmax)
480                         goto bad_format;
481                 snap->xip[i] = cur;
482                 last = cur;
483         }
484         PG_RETURN_POINTER(snap);
485
486 bad_format:
487         elog(ERROR, "invalid snapshot data");
488         return (Datum) NULL;
489 }
490
491 /*
492  * txid_snapshot_send(txid_snapshot) returns bytea
493  *
494  *              binary output function for type txid_snapshot
495  *
496  *              format: int4 nxip, int8 xmin, int8 xmax, int8 xip
497  */
498 Datum
499 txid_snapshot_send(PG_FUNCTION_ARGS)
500 {
501         TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(0);
502         StringInfoData buf;
503         uint32          i;
504
505         pq_begintypsend(&buf);
506         pq_sendint(&buf, snap->nxip, 4);
507         pq_sendint64(&buf, snap->xmin);
508         pq_sendint64(&buf, snap->xmax);
509         for (i = 0; i < snap->nxip; i++)
510                 pq_sendint64(&buf, snap->xip[i]);
511         PG_RETURN_BYTEA_P(pq_endtypsend(&buf));
512 }
513
514 /*
515  * txid_visible_in_snapshot(int8, txid_snapshot) returns bool
516  *
517  *              is txid visible in snapshot ?
518  */
519 Datum
520 txid_visible_in_snapshot(PG_FUNCTION_ARGS)
521 {
522         txid            value = PG_GETARG_INT64(0);
523         TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(1);
524
525         PG_RETURN_BOOL(is_visible_txid(value, snap));
526 }
527
528 /*
529  * txid_snapshot_xmin(txid_snapshot) returns int8
530  *
531  *              return snapshot's xmin
532  */
533 Datum
534 txid_snapshot_xmin(PG_FUNCTION_ARGS)
535 {
536         TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(0);
537
538         PG_RETURN_INT64(snap->xmin);
539 }
540
541 /*
542  * txid_snapshot_xmax(txid_snapshot) returns int8
543  *
544  *              return snapshot's xmax
545  */
546 Datum
547 txid_snapshot_xmax(PG_FUNCTION_ARGS)
548 {
549         TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(0);
550
551         PG_RETURN_INT64(snap->xmax);
552 }
553
554 /*
555  * txid_snapshot_xip(txid_snapshot) returns setof int8
556  *
557  *              return in-progress TXIDs in snapshot.
558  */
559 Datum
560 txid_snapshot_xip(PG_FUNCTION_ARGS)
561 {
562         FuncCallContext *fctx;
563         TxidSnapshot *snap;
564         txid            value;
565
566         /* on first call initialize snap_state and get copy of snapshot */
567         if (SRF_IS_FIRSTCALL())
568         {
569                 TxidSnapshot *arg = (TxidSnapshot *) PG_GETARG_VARLENA_P(0);
570
571                 fctx = SRF_FIRSTCALL_INIT();
572
573                 /* make a copy of user snapshot */
574                 snap = MemoryContextAlloc(fctx->multi_call_memory_ctx, VARSIZE(arg));
575                 memcpy(snap, arg, VARSIZE(arg));
576
577                 fctx->user_fctx = snap;
578         }
579
580         /* return values one-by-one */
581         fctx = SRF_PERCALL_SETUP();
582         snap = fctx->user_fctx;
583         if (fctx->call_cntr < snap->nxip)
584         {
585                 value = snap->xip[fctx->call_cntr];
586                 SRF_RETURN_NEXT(fctx, Int64GetDatum(value));
587         }
588         else
589         {
590                 SRF_RETURN_DONE(fctx);
591         }
592 }