]> granicus.if.org Git - postgresql/blob - src/backend/utils/adt/txid.c
Re-run pgindent with updated list of typedefs. (Updated README should
[postgresql] / src / backend / utils / adt / txid.c
1 /*-------------------------------------------------------------------------
2  * txid.c
3  *
4  *      Export internal transaction IDs to user level.
5  *
6  * Note that only top-level transaction IDs are ever converted to TXID.
7  * This is important because TXIDs frequently persist beyond the global
8  * xmin horizon, or may even be shipped to other machines, so we cannot
9  * rely on being able to correlate subtransaction IDs with their parents
10  * via functions such as SubTransGetTopmostTransaction().
11  *
12  *
13  *      Copyright (c) 2003-2007, PostgreSQL Global Development Group
14  *      Author: Jan Wieck, Afilias USA INC.
15  *      64-bit txids: Marko Kreen, Skype Technologies
16  *
17  *      $PostgreSQL: pgsql/src/backend/utils/adt/txid.c,v 1.3 2007/11/15 22:25:16 momjian Exp $
18  *
19  *-------------------------------------------------------------------------
20  */
21
22 #include "postgres.h"
23
24 #include "access/transam.h"
25 #include "access/xact.h"
26 #include "funcapi.h"
27 #include "libpq/pqformat.h"
28 #include "utils/builtins.h"
29
30
31 #ifndef INT64_IS_BUSTED
32 /* txid will be signed int8 in database, so must limit to 63 bits */
33 #define MAX_TXID   UINT64CONST(0x7FFFFFFFFFFFFFFF)
34 #else
35 /* we only really have 32 bits to work with :-( */
36 #define MAX_TXID   UINT64CONST(0x7FFFFFFF)
37 #endif
38
39 /* Use unsigned variant internally */
40 typedef uint64 txid;
41
42 /* sprintf format code for uint64 */
43 #define TXID_FMT UINT64_FORMAT
44
45 /*
46  * If defined, use bsearch() function for searching for txids in snapshots
47  * that have more than the specified number of values.
48  */
49 #define USE_BSEARCH_IF_NXIP_GREATER 30
50
51
52 /*
53  * Snapshot containing 8byte txids.
54  */
55 typedef struct
56 {
57         /*
58          * 4-byte length hdr, should not be touched directly.
59          *
60          * Explicit embedding is ok as we want always correct alignment anyway.
61          */
62         int32           __varsz;
63
64         uint32          nxip;                   /* number of txids in xip array */
65         txid            xmin;
66         txid            xmax;
67         txid            xip[1];                 /* in-progress txids, xmin <= xip[i] < xmax */
68 } TxidSnapshot;
69
70 #define TXID_SNAPSHOT_SIZE(nxip) \
71         (offsetof(TxidSnapshot, xip) + sizeof(txid) * (nxip))
72
73 /*
74  * Epoch values from xact.c
75  */
76 typedef struct
77 {
78         TransactionId last_xid;
79         uint32          epoch;
80 } TxidEpoch;
81
82
83 /*
84  * Fetch epoch data from xact.c.
85  */
86 static void
87 load_xid_epoch(TxidEpoch *state)
88 {
89         GetNextXidAndEpoch(&state->last_xid, &state->epoch);
90 }
91
92 /*
93  * do a TransactionId -> txid conversion for an XID near the given epoch
94  */
95 static txid
96 convert_xid(TransactionId xid, const TxidEpoch *state)
97 {
98 #ifndef INT64_IS_BUSTED
99         uint64          epoch;
100
101         /* return special xid's as-is */
102         if (!TransactionIdIsNormal(xid))
103                 return (txid) xid;
104
105         /* xid can be on either side when near wrap-around */
106         epoch = (uint64) state->epoch;
107         if (xid > state->last_xid &&
108                 TransactionIdPrecedes(xid, state->last_xid))
109                 epoch--;
110         else if (xid < state->last_xid &&
111                          TransactionIdFollows(xid, state->last_xid))
112                 epoch++;
113
114         return (epoch << 32) | xid;
115 #else                                                   /* INT64_IS_BUSTED */
116         /* we can't do anything with the epoch, so ignore it */
117         return (txid) xid & MAX_TXID;
118 #endif   /* INT64_IS_BUSTED */
119 }
120
121 /*
122  * txid comparator for qsort/bsearch
123  */
124 static int
125 cmp_txid(const void *aa, const void *bb)
126 {
127         txid            a = *(const txid *) aa;
128         txid            b = *(const txid *) bb;
129
130         if (a < b)
131                 return -1;
132         if (a > b)
133                 return 1;
134         return 0;
135 }
136
137 /*
138  * sort a snapshot's txids, so we can use bsearch() later.
139  *
140  * For consistency of on-disk representation, we always sort even if bsearch
141  * will not be used.
142  */
143 static void
144 sort_snapshot(TxidSnapshot *snap)
145 {
146         if (snap->nxip > 1)
147                 qsort(snap->xip, snap->nxip, sizeof(txid), cmp_txid);
148 }
149
150 /*
151  * check txid visibility.
152  */
153 static bool
154 is_visible_txid(txid value, const TxidSnapshot *snap)
155 {
156         if (value < snap->xmin)
157                 return true;
158         else if (value >= snap->xmax)
159                 return false;
160 #ifdef USE_BSEARCH_IF_NXIP_GREATER
161         else if (snap->nxip > USE_BSEARCH_IF_NXIP_GREATER)
162         {
163                 void       *res;
164
165                 res = bsearch(&value, snap->xip, snap->nxip, sizeof(txid), cmp_txid);
166                 /* if found, transaction is still in progress */
167                 return (res) ? false : true;
168         }
169 #endif
170         else
171         {
172                 uint32          i;
173
174                 for (i = 0; i < snap->nxip; i++)
175                 {
176                         if (value == snap->xip[i])
177                                 return false;
178                 }
179                 return true;
180         }
181 }
182
183 /*
184  * helper functions to use StringInfo for TxidSnapshot creation.
185  */
186
187 static StringInfo
188 buf_init(txid xmin, txid xmax)
189 {
190         TxidSnapshot snap;
191         StringInfo      buf;
192
193         snap.xmin = xmin;
194         snap.xmax = xmax;
195         snap.nxip = 0;
196
197         buf = makeStringInfo();
198         appendBinaryStringInfo(buf, (char *) &snap, TXID_SNAPSHOT_SIZE(0));
199         return buf;
200 }
201
202 static void
203 buf_add_txid(StringInfo buf, txid xid)
204 {
205         TxidSnapshot *snap = (TxidSnapshot *) buf->data;
206
207         /* do this before possible realloc */
208         snap->nxip++;
209
210         appendBinaryStringInfo(buf, (char *) &xid, sizeof(xid));
211 }
212
213 static TxidSnapshot *
214 buf_finalize(StringInfo buf)
215 {
216         TxidSnapshot *snap = (TxidSnapshot *) buf->data;
217
218         SET_VARSIZE(snap, buf->len);
219
220         /* buf is not needed anymore */
221         buf->data = NULL;
222         pfree(buf);
223
224         return snap;
225 }
226
227 /*
228  * simple number parser.
229  *
230  * We return 0 on error, which is invalid value for txid.
231  */
232 static txid
233 str2txid(const char *s, const char **endp)
234 {
235         txid            val = 0;
236         txid            cutoff = MAX_TXID / 10;
237         txid            cutlim = MAX_TXID % 10;
238
239         for (; *s; s++)
240         {
241                 unsigned        d;
242
243                 if (*s < '0' || *s > '9')
244                         break;
245                 d = *s - '0';
246
247                 /*
248                  * check for overflow
249                  */
250                 if (val > cutoff || (val == cutoff && d > cutlim))
251                 {
252                         val = 0;
253                         break;
254                 }
255
256                 val = val * 10 + d;
257         }
258         if (endp)
259                 *endp = s;
260         return val;
261 }
262
263 /*
264  * parse snapshot from cstring
265  */
266 static TxidSnapshot *
267 parse_snapshot(const char *str)
268 {
269         txid            xmin;
270         txid            xmax;
271         txid            last_val = 0,
272                                 val;
273         const char *str_start = str;
274         const char *endp;
275         StringInfo      buf;
276
277         xmin = str2txid(str, &endp);
278         if (*endp != ':')
279                 goto bad_format;
280         str = endp + 1;
281
282         xmax = str2txid(str, &endp);
283         if (*endp != ':')
284                 goto bad_format;
285         str = endp + 1;
286
287         /* it should look sane */
288         if (xmin == 0 || xmax == 0 || xmin > xmax)
289                 goto bad_format;
290
291         /* allocate buffer */
292         buf = buf_init(xmin, xmax);
293
294         /* loop over values */
295         while (*str != '\0')
296         {
297                 /* read next value */
298                 val = str2txid(str, &endp);
299                 str = endp;
300
301                 /* require the input to be in order */
302                 if (val < xmin || val >= xmax || val <= last_val)
303                         goto bad_format;
304
305                 buf_add_txid(buf, val);
306                 last_val = val;
307
308                 if (*str == ',')
309                         str++;
310                 else if (*str != '\0')
311                         goto bad_format;
312         }
313
314         return buf_finalize(buf);
315
316 bad_format:
317         elog(ERROR, "invalid input for txid_snapshot: \"%s\"", str_start);
318         return NULL;
319 }
320
321 /*
322  * Public functions.
323  *
324  * txid_current() and txid_current_snapshot() are the only ones that
325  * communicate with core xid machinery.  All the others work on data
326  * returned by them.
327  */
328
329 /*
330  * txid_current() returns int8
331  *
332  *              Return the current toplevel transaction ID as TXID
333  */
334 Datum
335 txid_current(PG_FUNCTION_ARGS)
336 {
337         txid            val;
338         TxidEpoch       state;
339
340         load_xid_epoch(&state);
341
342         val = convert_xid(GetTopTransactionId(), &state);
343
344         PG_RETURN_INT64(val);
345 }
346
347 /*
348  * txid_current_snapshot() returns txid_snapshot
349  *
350  *              Return current snapshot in TXID format
351  *
352  * Note that only top-transaction XIDs are included in the snapshot.
353  */
354 Datum
355 txid_current_snapshot(PG_FUNCTION_ARGS)
356 {
357         TxidSnapshot *snap;
358         uint32          nxip,
359                                 i,
360                                 size;
361         TxidEpoch       state;
362         Snapshot        cur;
363
364         cur = ActiveSnapshot;
365         if (cur == NULL)
366                 elog(ERROR, "txid_current_snapshot: ActiveSnapshot == NULL");
367
368         load_xid_epoch(&state);
369
370         /* allocate */
371         nxip = cur->xcnt;
372         size = TXID_SNAPSHOT_SIZE(nxip);
373         snap = palloc(size);
374         SET_VARSIZE(snap, size);
375
376         /* fill */
377         snap->xmin = convert_xid(cur->xmin, &state);
378         snap->xmax = convert_xid(cur->xmax, &state);
379         snap->nxip = nxip;
380         for (i = 0; i < nxip; i++)
381                 snap->xip[i] = convert_xid(cur->xip[i], &state);
382
383         /* we want them guaranteed to be in ascending order */
384         sort_snapshot(snap);
385
386         PG_RETURN_POINTER(snap);
387 }
388
389 /*
390  * txid_snapshot_in(cstring) returns txid_snapshot
391  *
392  *              input function for type txid_snapshot
393  */
394 Datum
395 txid_snapshot_in(PG_FUNCTION_ARGS)
396 {
397         char       *str = PG_GETARG_CSTRING(0);
398         TxidSnapshot *snap;
399
400         snap = parse_snapshot(str);
401
402         PG_RETURN_POINTER(snap);
403 }
404
405 /*
406  * txid_snapshot_out(txid_snapshot) returns cstring
407  *
408  *              output function for type txid_snapshot
409  */
410 Datum
411 txid_snapshot_out(PG_FUNCTION_ARGS)
412 {
413         TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(0);
414         StringInfoData str;
415         uint32          i;
416
417         initStringInfo(&str);
418
419         appendStringInfo(&str, TXID_FMT ":", snap->xmin);
420         appendStringInfo(&str, TXID_FMT ":", snap->xmax);
421
422         for (i = 0; i < snap->nxip; i++)
423         {
424                 if (i > 0)
425                         appendStringInfoChar(&str, ',');
426                 appendStringInfo(&str, TXID_FMT, snap->xip[i]);
427         }
428
429         PG_RETURN_CSTRING(str.data);
430 }
431
432 /*
433  * txid_snapshot_recv(internal) returns txid_snapshot
434  *
435  *              binary input function for type txid_snapshot
436  *
437  *              format: int4 nxip, int8 xmin, int8 xmax, int8 xip
438  */
439 Datum
440 txid_snapshot_recv(PG_FUNCTION_ARGS)
441 {
442         StringInfo      buf = (StringInfo) PG_GETARG_POINTER(0);
443         TxidSnapshot *snap;
444         txid            last = 0;
445         int                     nxip;
446         int                     i;
447         int                     avail;
448         int                     expect;
449         txid            xmin,
450                                 xmax;
451
452         /*
453          * load nxip and check for nonsense.
454          *
455          * (nxip > avail) check is against int overflows in 'expect'.
456          */
457         nxip = pq_getmsgint(buf, 4);
458         avail = buf->len - buf->cursor;
459         expect = 8 + 8 + nxip * 8;
460         if (nxip < 0 || nxip > avail || expect > avail)
461                 goto bad_format;
462
463         xmin = pq_getmsgint64(buf);
464         xmax = pq_getmsgint64(buf);
465         if (xmin == 0 || xmax == 0 || xmin > xmax || xmax > MAX_TXID)
466                 goto bad_format;
467
468         snap = palloc(TXID_SNAPSHOT_SIZE(nxip));
469         snap->xmin = xmin;
470         snap->xmax = xmax;
471         snap->nxip = nxip;
472         SET_VARSIZE(snap, TXID_SNAPSHOT_SIZE(nxip));
473
474         for (i = 0; i < nxip; i++)
475         {
476                 txid            cur = pq_getmsgint64(buf);
477
478                 if (cur <= last || cur < xmin || cur >= xmax)
479                         goto bad_format;
480                 snap->xip[i] = cur;
481                 last = cur;
482         }
483         PG_RETURN_POINTER(snap);
484
485 bad_format:
486         elog(ERROR, "invalid snapshot data");
487         return (Datum) NULL;
488 }
489
490 /*
491  * txid_snapshot_send(txid_snapshot) returns bytea
492  *
493  *              binary output function for type txid_snapshot
494  *
495  *              format: int4 nxip, int8 xmin, int8 xmax, int8 xip
496  */
497 Datum
498 txid_snapshot_send(PG_FUNCTION_ARGS)
499 {
500         TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(0);
501         StringInfoData buf;
502         uint32          i;
503
504         pq_begintypsend(&buf);
505         pq_sendint(&buf, snap->nxip, 4);
506         pq_sendint64(&buf, snap->xmin);
507         pq_sendint64(&buf, snap->xmax);
508         for (i = 0; i < snap->nxip; i++)
509                 pq_sendint64(&buf, snap->xip[i]);
510         PG_RETURN_BYTEA_P(pq_endtypsend(&buf));
511 }
512
513 /*
514  * txid_visible_in_snapshot(int8, txid_snapshot) returns bool
515  *
516  *              is txid visible in snapshot ?
517  */
518 Datum
519 txid_visible_in_snapshot(PG_FUNCTION_ARGS)
520 {
521         txid            value = PG_GETARG_INT64(0);
522         TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(1);
523
524         PG_RETURN_BOOL(is_visible_txid(value, snap));
525 }
526
527 /*
528  * txid_snapshot_xmin(txid_snapshot) returns int8
529  *
530  *              return snapshot's xmin
531  */
532 Datum
533 txid_snapshot_xmin(PG_FUNCTION_ARGS)
534 {
535         TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(0);
536
537         PG_RETURN_INT64(snap->xmin);
538 }
539
540 /*
541  * txid_snapshot_xmax(txid_snapshot) returns int8
542  *
543  *              return snapshot's xmax
544  */
545 Datum
546 txid_snapshot_xmax(PG_FUNCTION_ARGS)
547 {
548         TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(0);
549
550         PG_RETURN_INT64(snap->xmax);
551 }
552
553 /*
554  * txid_snapshot_xip(txid_snapshot) returns setof int8
555  *
556  *              return in-progress TXIDs in snapshot.
557  */
558 Datum
559 txid_snapshot_xip(PG_FUNCTION_ARGS)
560 {
561         FuncCallContext *fctx;
562         TxidSnapshot *snap;
563         txid            value;
564
565         /* on first call initialize snap_state and get copy of snapshot */
566         if (SRF_IS_FIRSTCALL())
567         {
568                 TxidSnapshot *arg = (TxidSnapshot *) PG_GETARG_VARLENA_P(0);
569
570                 fctx = SRF_FIRSTCALL_INIT();
571
572                 /* make a copy of user snapshot */
573                 snap = MemoryContextAlloc(fctx->multi_call_memory_ctx, VARSIZE(arg));
574                 memcpy(snap, arg, VARSIZE(arg));
575
576                 fctx->user_fctx = snap;
577         }
578
579         /* return values one-by-one */
580         fctx = SRF_PERCALL_SETUP();
581         snap = fctx->user_fctx;
582         if (fctx->call_cntr < snap->nxip)
583         {
584                 value = snap->xip[fctx->call_cntr];
585                 SRF_RETURN_NEXT(fctx, Int64GetDatum(value));
586         }
587         else
588         {
589                 SRF_RETURN_DONE(fctx);
590         }
591 }