]> granicus.if.org Git - postgresql/blob - src/backend/utils/adt/txid.c
Update copyright for 2009.
[postgresql] / src / backend / utils / adt / txid.c
1 /*-------------------------------------------------------------------------
2  * txid.c
3  *
4  *      Export internal transaction IDs to user level.
5  *
6  * Note that only top-level transaction IDs are ever converted to TXID.
7  * This is important because TXIDs frequently persist beyond the global
8  * xmin horizon, or may even be shipped to other machines, so we cannot
9  * rely on being able to correlate subtransaction IDs with their parents
10  * via functions such as SubTransGetTopmostTransaction().
11  *
12  *
13  *      Copyright (c) 2003-2009, PostgreSQL Global Development Group
14  *      Author: Jan Wieck, Afilias USA INC.
15  *      64-bit txids: Marko Kreen, Skype Technologies
16  *
17  *      $PostgreSQL: pgsql/src/backend/utils/adt/txid.c,v 1.8 2009/01/01 17:23:50 momjian Exp $
18  *
19  *-------------------------------------------------------------------------
20  */
21
22 #include "postgres.h"
23
24 #include "access/transam.h"
25 #include "access/xact.h"
26 #include "funcapi.h"
27 #include "libpq/pqformat.h"
28 #include "utils/builtins.h"
29 #include "utils/snapmgr.h"
30
31
32 #ifndef INT64_IS_BUSTED
33 /* txid will be signed int8 in database, so must limit to 63 bits */
34 #define MAX_TXID   UINT64CONST(0x7FFFFFFFFFFFFFFF)
35 #else
36 /* we only really have 32 bits to work with :-( */
37 #define MAX_TXID   UINT64CONST(0x7FFFFFFF)
38 #endif
39
40 /* Use unsigned variant internally */
41 typedef uint64 txid;
42
43 /* sprintf format code for uint64 */
44 #define TXID_FMT UINT64_FORMAT
45
46 /*
47  * If defined, use bsearch() function for searching for txids in snapshots
48  * that have more than the specified number of values.
49  */
50 #define USE_BSEARCH_IF_NXIP_GREATER 30
51
52
53 /*
54  * Snapshot containing 8byte txids.
55  */
56 typedef struct
57 {
58         /*
59          * 4-byte length hdr, should not be touched directly.
60          *
61          * Explicit embedding is ok as we want always correct alignment anyway.
62          */
63         int32           __varsz;
64
65         uint32          nxip;                   /* number of txids in xip array */
66         txid            xmin;
67         txid            xmax;
68         txid            xip[1];                 /* in-progress txids, xmin <= xip[i] < xmax */
69 } TxidSnapshot;
70
71 #define TXID_SNAPSHOT_SIZE(nxip) \
72         (offsetof(TxidSnapshot, xip) + sizeof(txid) * (nxip))
73
74 /*
75  * Epoch values from xact.c
76  */
77 typedef struct
78 {
79         TransactionId last_xid;
80         uint32          epoch;
81 } TxidEpoch;
82
83
84 /*
85  * Fetch epoch data from xact.c.
86  */
87 static void
88 load_xid_epoch(TxidEpoch *state)
89 {
90         GetNextXidAndEpoch(&state->last_xid, &state->epoch);
91 }
92
93 /*
94  * do a TransactionId -> txid conversion for an XID near the given epoch
95  */
96 static txid
97 convert_xid(TransactionId xid, const TxidEpoch *state)
98 {
99 #ifndef INT64_IS_BUSTED
100         uint64          epoch;
101
102         /* return special xid's as-is */
103         if (!TransactionIdIsNormal(xid))
104                 return (txid) xid;
105
106         /* xid can be on either side when near wrap-around */
107         epoch = (uint64) state->epoch;
108         if (xid > state->last_xid &&
109                 TransactionIdPrecedes(xid, state->last_xid))
110                 epoch--;
111         else if (xid < state->last_xid &&
112                          TransactionIdFollows(xid, state->last_xid))
113                 epoch++;
114
115         return (epoch << 32) | xid;
116 #else                                                   /* INT64_IS_BUSTED */
117         /* we can't do anything with the epoch, so ignore it */
118         return (txid) xid & MAX_TXID;
119 #endif   /* INT64_IS_BUSTED */
120 }
121
122 /*
123  * txid comparator for qsort/bsearch
124  */
125 static int
126 cmp_txid(const void *aa, const void *bb)
127 {
128         txid            a = *(const txid *) aa;
129         txid            b = *(const txid *) bb;
130
131         if (a < b)
132                 return -1;
133         if (a > b)
134                 return 1;
135         return 0;
136 }
137
138 /*
139  * sort a snapshot's txids, so we can use bsearch() later.
140  *
141  * For consistency of on-disk representation, we always sort even if bsearch
142  * will not be used.
143  */
144 static void
145 sort_snapshot(TxidSnapshot *snap)
146 {
147         if (snap->nxip > 1)
148                 qsort(snap->xip, snap->nxip, sizeof(txid), cmp_txid);
149 }
150
151 /*
152  * check txid visibility.
153  */
154 static bool
155 is_visible_txid(txid value, const TxidSnapshot *snap)
156 {
157         if (value < snap->xmin)
158                 return true;
159         else if (value >= snap->xmax)
160                 return false;
161 #ifdef USE_BSEARCH_IF_NXIP_GREATER
162         else if (snap->nxip > USE_BSEARCH_IF_NXIP_GREATER)
163         {
164                 void       *res;
165
166                 res = bsearch(&value, snap->xip, snap->nxip, sizeof(txid), cmp_txid);
167                 /* if found, transaction is still in progress */
168                 return (res) ? false : true;
169         }
170 #endif
171         else
172         {
173                 uint32          i;
174
175                 for (i = 0; i < snap->nxip; i++)
176                 {
177                         if (value == snap->xip[i])
178                                 return false;
179                 }
180                 return true;
181         }
182 }
183
184 /*
185  * helper functions to use StringInfo for TxidSnapshot creation.
186  */
187
188 static StringInfo
189 buf_init(txid xmin, txid xmax)
190 {
191         TxidSnapshot snap;
192         StringInfo      buf;
193
194         snap.xmin = xmin;
195         snap.xmax = xmax;
196         snap.nxip = 0;
197
198         buf = makeStringInfo();
199         appendBinaryStringInfo(buf, (char *) &snap, TXID_SNAPSHOT_SIZE(0));
200         return buf;
201 }
202
203 static void
204 buf_add_txid(StringInfo buf, txid xid)
205 {
206         TxidSnapshot *snap = (TxidSnapshot *) buf->data;
207
208         /* do this before possible realloc */
209         snap->nxip++;
210
211         appendBinaryStringInfo(buf, (char *) &xid, sizeof(xid));
212 }
213
214 static TxidSnapshot *
215 buf_finalize(StringInfo buf)
216 {
217         TxidSnapshot *snap = (TxidSnapshot *) buf->data;
218
219         SET_VARSIZE(snap, buf->len);
220
221         /* buf is not needed anymore */
222         buf->data = NULL;
223         pfree(buf);
224
225         return snap;
226 }
227
228 /*
229  * simple number parser.
230  *
231  * We return 0 on error, which is invalid value for txid.
232  */
233 static txid
234 str2txid(const char *s, const char **endp)
235 {
236         txid            val = 0;
237         txid            cutoff = MAX_TXID / 10;
238         txid            cutlim = MAX_TXID % 10;
239
240         for (; *s; s++)
241         {
242                 unsigned        d;
243
244                 if (*s < '0' || *s > '9')
245                         break;
246                 d = *s - '0';
247
248                 /*
249                  * check for overflow
250                  */
251                 if (val > cutoff || (val == cutoff && d > cutlim))
252                 {
253                         val = 0;
254                         break;
255                 }
256
257                 val = val * 10 + d;
258         }
259         if (endp)
260                 *endp = s;
261         return val;
262 }
263
264 /*
265  * parse snapshot from cstring
266  */
267 static TxidSnapshot *
268 parse_snapshot(const char *str)
269 {
270         txid            xmin;
271         txid            xmax;
272         txid            last_val = 0,
273                                 val;
274         const char *str_start = str;
275         const char *endp;
276         StringInfo      buf;
277
278         xmin = str2txid(str, &endp);
279         if (*endp != ':')
280                 goto bad_format;
281         str = endp + 1;
282
283         xmax = str2txid(str, &endp);
284         if (*endp != ':')
285                 goto bad_format;
286         str = endp + 1;
287
288         /* it should look sane */
289         if (xmin == 0 || xmax == 0 || xmin > xmax)
290                 goto bad_format;
291
292         /* allocate buffer */
293         buf = buf_init(xmin, xmax);
294
295         /* loop over values */
296         while (*str != '\0')
297         {
298                 /* read next value */
299                 val = str2txid(str, &endp);
300                 str = endp;
301
302                 /* require the input to be in order */
303                 if (val < xmin || val >= xmax || val <= last_val)
304                         goto bad_format;
305
306                 buf_add_txid(buf, val);
307                 last_val = val;
308
309                 if (*str == ',')
310                         str++;
311                 else if (*str != '\0')
312                         goto bad_format;
313         }
314
315         return buf_finalize(buf);
316
317 bad_format:
318         elog(ERROR, "invalid input for txid_snapshot: \"%s\"", str_start);
319         return NULL;
320 }
321
322 /*
323  * Public functions.
324  *
325  * txid_current() and txid_current_snapshot() are the only ones that
326  * communicate with core xid machinery.  All the others work on data
327  * returned by them.
328  */
329
330 /*
331  * txid_current() returns int8
332  *
333  *              Return the current toplevel transaction ID as TXID
334  */
335 Datum
336 txid_current(PG_FUNCTION_ARGS)
337 {
338         txid            val;
339         TxidEpoch       state;
340
341         load_xid_epoch(&state);
342
343         val = convert_xid(GetTopTransactionId(), &state);
344
345         PG_RETURN_INT64(val);
346 }
347
348 /*
349  * txid_current_snapshot() returns txid_snapshot
350  *
351  *              Return current snapshot in TXID format
352  *
353  * Note that only top-transaction XIDs are included in the snapshot.
354  */
355 Datum
356 txid_current_snapshot(PG_FUNCTION_ARGS)
357 {
358         TxidSnapshot *snap;
359         uint32          nxip,
360                                 i,
361                                 size;
362         TxidEpoch       state;
363         Snapshot        cur;
364
365         cur = GetActiveSnapshot();
366         if (cur == NULL)
367                 elog(ERROR, "no active snapshot set");
368
369         load_xid_epoch(&state);
370
371         /* allocate */
372         nxip = cur->xcnt;
373         size = TXID_SNAPSHOT_SIZE(nxip);
374         snap = palloc(size);
375         SET_VARSIZE(snap, size);
376
377         /* fill */
378         snap->xmin = convert_xid(cur->xmin, &state);
379         snap->xmax = convert_xid(cur->xmax, &state);
380         snap->nxip = nxip;
381         for (i = 0; i < nxip; i++)
382                 snap->xip[i] = convert_xid(cur->xip[i], &state);
383
384         /* we want them guaranteed to be in ascending order */
385         sort_snapshot(snap);
386
387         PG_RETURN_POINTER(snap);
388 }
389
390 /*
391  * txid_snapshot_in(cstring) returns txid_snapshot
392  *
393  *              input function for type txid_snapshot
394  */
395 Datum
396 txid_snapshot_in(PG_FUNCTION_ARGS)
397 {
398         char       *str = PG_GETARG_CSTRING(0);
399         TxidSnapshot *snap;
400
401         snap = parse_snapshot(str);
402
403         PG_RETURN_POINTER(snap);
404 }
405
406 /*
407  * txid_snapshot_out(txid_snapshot) returns cstring
408  *
409  *              output function for type txid_snapshot
410  */
411 Datum
412 txid_snapshot_out(PG_FUNCTION_ARGS)
413 {
414         TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(0);
415         StringInfoData str;
416         uint32          i;
417
418         initStringInfo(&str);
419
420         appendStringInfo(&str, TXID_FMT ":", snap->xmin);
421         appendStringInfo(&str, TXID_FMT ":", snap->xmax);
422
423         for (i = 0; i < snap->nxip; i++)
424         {
425                 if (i > 0)
426                         appendStringInfoChar(&str, ',');
427                 appendStringInfo(&str, TXID_FMT, snap->xip[i]);
428         }
429
430         PG_RETURN_CSTRING(str.data);
431 }
432
433 /*
434  * txid_snapshot_recv(internal) returns txid_snapshot
435  *
436  *              binary input function for type txid_snapshot
437  *
438  *              format: int4 nxip, int8 xmin, int8 xmax, int8 xip
439  */
440 Datum
441 txid_snapshot_recv(PG_FUNCTION_ARGS)
442 {
443         StringInfo      buf = (StringInfo) PG_GETARG_POINTER(0);
444         TxidSnapshot *snap;
445         txid            last = 0;
446         int                     nxip;
447         int                     i;
448         int                     avail;
449         int                     expect;
450         txid            xmin,
451                                 xmax;
452
453         /*
454          * load nxip and check for nonsense.
455          *
456          * (nxip > avail) check is against int overflows in 'expect'.
457          */
458         nxip = pq_getmsgint(buf, 4);
459         avail = buf->len - buf->cursor;
460         expect = 8 + 8 + nxip * 8;
461         if (nxip < 0 || nxip > avail || expect > avail)
462                 goto bad_format;
463
464         xmin = pq_getmsgint64(buf);
465         xmax = pq_getmsgint64(buf);
466         if (xmin == 0 || xmax == 0 || xmin > xmax || xmax > MAX_TXID)
467                 goto bad_format;
468
469         snap = palloc(TXID_SNAPSHOT_SIZE(nxip));
470         snap->xmin = xmin;
471         snap->xmax = xmax;
472         snap->nxip = nxip;
473         SET_VARSIZE(snap, TXID_SNAPSHOT_SIZE(nxip));
474
475         for (i = 0; i < nxip; i++)
476         {
477                 txid            cur = pq_getmsgint64(buf);
478
479                 if (cur <= last || cur < xmin || cur >= xmax)
480                         goto bad_format;
481                 snap->xip[i] = cur;
482                 last = cur;
483         }
484         PG_RETURN_POINTER(snap);
485
486 bad_format:
487         elog(ERROR, "invalid snapshot data");
488         return (Datum) NULL;
489 }
490
491 /*
492  * txid_snapshot_send(txid_snapshot) returns bytea
493  *
494  *              binary output function for type txid_snapshot
495  *
496  *              format: int4 nxip, int8 xmin, int8 xmax, int8 xip
497  */
498 Datum
499 txid_snapshot_send(PG_FUNCTION_ARGS)
500 {
501         TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(0);
502         StringInfoData buf;
503         uint32          i;
504
505         pq_begintypsend(&buf);
506         pq_sendint(&buf, snap->nxip, 4);
507         pq_sendint64(&buf, snap->xmin);
508         pq_sendint64(&buf, snap->xmax);
509         for (i = 0; i < snap->nxip; i++)
510                 pq_sendint64(&buf, snap->xip[i]);
511         PG_RETURN_BYTEA_P(pq_endtypsend(&buf));
512 }
513
514 /*
515  * txid_visible_in_snapshot(int8, txid_snapshot) returns bool
516  *
517  *              is txid visible in snapshot ?
518  */
519 Datum
520 txid_visible_in_snapshot(PG_FUNCTION_ARGS)
521 {
522         txid            value = PG_GETARG_INT64(0);
523         TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(1);
524
525         PG_RETURN_BOOL(is_visible_txid(value, snap));
526 }
527
528 /*
529  * txid_snapshot_xmin(txid_snapshot) returns int8
530  *
531  *              return snapshot's xmin
532  */
533 Datum
534 txid_snapshot_xmin(PG_FUNCTION_ARGS)
535 {
536         TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(0);
537
538         PG_RETURN_INT64(snap->xmin);
539 }
540
541 /*
542  * txid_snapshot_xmax(txid_snapshot) returns int8
543  *
544  *              return snapshot's xmax
545  */
546 Datum
547 txid_snapshot_xmax(PG_FUNCTION_ARGS)
548 {
549         TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(0);
550
551         PG_RETURN_INT64(snap->xmax);
552 }
553
554 /*
555  * txid_snapshot_xip(txid_snapshot) returns setof int8
556  *
557  *              return in-progress TXIDs in snapshot.
558  */
559 Datum
560 txid_snapshot_xip(PG_FUNCTION_ARGS)
561 {
562         FuncCallContext *fctx;
563         TxidSnapshot *snap;
564         txid            value;
565
566         /* on first call initialize snap_state and get copy of snapshot */
567         if (SRF_IS_FIRSTCALL())
568         {
569                 TxidSnapshot *arg = (TxidSnapshot *) PG_GETARG_VARLENA_P(0);
570
571                 fctx = SRF_FIRSTCALL_INIT();
572
573                 /* make a copy of user snapshot */
574                 snap = MemoryContextAlloc(fctx->multi_call_memory_ctx, VARSIZE(arg));
575                 memcpy(snap, arg, VARSIZE(arg));
576
577                 fctx->user_fctx = snap;
578         }
579
580         /* return values one-by-one */
581         fctx = SRF_PERCALL_SETUP();
582         snap = fctx->user_fctx;
583         if (fctx->call_cntr < snap->nxip)
584         {
585                 value = snap->xip[fctx->call_cntr];
586                 SRF_RETURN_NEXT(fctx, Int64GetDatum(value));
587         }
588         else
589         {
590                 SRF_RETURN_DONE(fctx);
591         }
592 }