]> granicus.if.org Git - postgresql/blob - src/backend/utils/adt/txid.c
Define integer limits independently from the system definitions.
[postgresql] / src / backend / utils / adt / txid.c
1 /*-------------------------------------------------------------------------
2  * txid.c
3  *
4  *      Export internal transaction IDs to user level.
5  *
6  * Note that only top-level transaction IDs are ever converted to TXID.
7  * This is important because TXIDs frequently persist beyond the global
8  * xmin horizon, or may even be shipped to other machines, so we cannot
9  * rely on being able to correlate subtransaction IDs with their parents
10  * via functions such as SubTransGetTopmostTransaction().
11  *
12  *
13  *      Copyright (c) 2003-2015, PostgreSQL Global Development Group
14  *      Author: Jan Wieck, Afilias USA INC.
15  *      64-bit txids: Marko Kreen, Skype Technologies
16  *
17  *      src/backend/utils/adt/txid.c
18  *
19  *-------------------------------------------------------------------------
20  */
21
22 #include "postgres.h"
23
24 #include "access/transam.h"
25 #include "access/xact.h"
26 #include "access/xlog.h"
27 #include "funcapi.h"
28 #include "miscadmin.h"
29 #include "libpq/pqformat.h"
30 #include "postmaster/postmaster.h"
31 #include "utils/builtins.h"
32 #include "utils/memutils.h"
33 #include "utils/snapmgr.h"
34
35
36 /* txid will be signed int8 in database, so must limit to 63 bits */
37 #define MAX_TXID   ((uint64) PG_INT64_MAX)
38
39 /* Use unsigned variant internally */
40 typedef uint64 txid;
41
42 /* sprintf format code for uint64 */
43 #define TXID_FMT UINT64_FORMAT
44
45 /*
46  * If defined, use bsearch() function for searching for txids in snapshots
47  * that have more than the specified number of values.
48  */
49 #define USE_BSEARCH_IF_NXIP_GREATER 30
50
51
52 /*
53  * Snapshot containing 8byte txids.
54  */
55 typedef struct
56 {
57         /*
58          * 4-byte length hdr, should not be touched directly.
59          *
60          * Explicit embedding is ok as we want always correct alignment anyway.
61          */
62         int32           __varsz;
63
64         uint32          nxip;                   /* number of txids in xip array */
65         txid            xmin;
66         txid            xmax;
67         /* in-progress txids, xmin <= xip[i] < xmax: */
68         txid            xip[FLEXIBLE_ARRAY_MEMBER];
69 } TxidSnapshot;
70
71 #define TXID_SNAPSHOT_SIZE(nxip) \
72         (offsetof(TxidSnapshot, xip) + sizeof(txid) * (nxip))
73 #define TXID_SNAPSHOT_MAX_NXIP \
74         ((MaxAllocSize - offsetof(TxidSnapshot, xip)) / sizeof(txid))
75
76 /*
77  * Epoch values from xact.c
78  */
79 typedef struct
80 {
81         TransactionId last_xid;
82         uint32          epoch;
83 } TxidEpoch;
84
85
86 /*
87  * Fetch epoch data from xact.c.
88  */
89 static void
90 load_xid_epoch(TxidEpoch *state)
91 {
92         GetNextXidAndEpoch(&state->last_xid, &state->epoch);
93 }
94
95 /*
96  * do a TransactionId -> txid conversion for an XID near the given epoch
97  */
98 static txid
99 convert_xid(TransactionId xid, const TxidEpoch *state)
100 {
101         uint64          epoch;
102
103         /* return special xid's as-is */
104         if (!TransactionIdIsNormal(xid))
105                 return (txid) xid;
106
107         /* xid can be on either side when near wrap-around */
108         epoch = (uint64) state->epoch;
109         if (xid > state->last_xid &&
110                 TransactionIdPrecedes(xid, state->last_xid))
111                 epoch--;
112         else if (xid < state->last_xid &&
113                          TransactionIdFollows(xid, state->last_xid))
114                 epoch++;
115
116         return (epoch << 32) | xid;
117 }
118
119 /*
120  * txid comparator for qsort/bsearch
121  */
122 static int
123 cmp_txid(const void *aa, const void *bb)
124 {
125         txid            a = *(const txid *) aa;
126         txid            b = *(const txid *) bb;
127
128         if (a < b)
129                 return -1;
130         if (a > b)
131                 return 1;
132         return 0;
133 }
134
135 /*
136  * Sort a snapshot's txids, so we can use bsearch() later.  Also remove
137  * any duplicates.
138  *
139  * For consistency of on-disk representation, we always sort even if bsearch
140  * will not be used.
141  */
142 static void
143 sort_snapshot(TxidSnapshot *snap)
144 {
145         txid    last = 0;
146         int             nxip, idx1, idx2;
147
148         if (snap->nxip > 1)
149         {
150                 qsort(snap->xip, snap->nxip, sizeof(txid), cmp_txid);
151
152                 /* remove duplicates */
153                 nxip = snap->nxip;
154                 idx1 = idx2 = 0;
155                 while (idx1 < nxip)
156                 {
157                         if (snap->xip[idx1] != last)
158                                 last = snap->xip[idx2++] = snap->xip[idx1];
159                         else
160                                 snap->nxip--;
161                         idx1++;
162                 }
163         }
164 }
165
166 /*
167  * check txid visibility.
168  */
169 static bool
170 is_visible_txid(txid value, const TxidSnapshot *snap)
171 {
172         if (value < snap->xmin)
173                 return true;
174         else if (value >= snap->xmax)
175                 return false;
176 #ifdef USE_BSEARCH_IF_NXIP_GREATER
177         else if (snap->nxip > USE_BSEARCH_IF_NXIP_GREATER)
178         {
179                 void       *res;
180
181                 res = bsearch(&value, snap->xip, snap->nxip, sizeof(txid), cmp_txid);
182                 /* if found, transaction is still in progress */
183                 return (res) ? false : true;
184         }
185 #endif
186         else
187         {
188                 uint32          i;
189
190                 for (i = 0; i < snap->nxip; i++)
191                 {
192                         if (value == snap->xip[i])
193                                 return false;
194                 }
195                 return true;
196         }
197 }
198
199 /*
200  * helper functions to use StringInfo for TxidSnapshot creation.
201  */
202
203 static StringInfo
204 buf_init(txid xmin, txid xmax)
205 {
206         TxidSnapshot snap;
207         StringInfo      buf;
208
209         snap.xmin = xmin;
210         snap.xmax = xmax;
211         snap.nxip = 0;
212
213         buf = makeStringInfo();
214         appendBinaryStringInfo(buf, (char *) &snap, TXID_SNAPSHOT_SIZE(0));
215         return buf;
216 }
217
218 static void
219 buf_add_txid(StringInfo buf, txid xid)
220 {
221         TxidSnapshot *snap = (TxidSnapshot *) buf->data;
222
223         /* do this before possible realloc */
224         snap->nxip++;
225
226         appendBinaryStringInfo(buf, (char *) &xid, sizeof(xid));
227 }
228
229 static TxidSnapshot *
230 buf_finalize(StringInfo buf)
231 {
232         TxidSnapshot *snap = (TxidSnapshot *) buf->data;
233
234         SET_VARSIZE(snap, buf->len);
235
236         /* buf is not needed anymore */
237         buf->data = NULL;
238         pfree(buf);
239
240         return snap;
241 }
242
243 /*
244  * simple number parser.
245  *
246  * We return 0 on error, which is invalid value for txid.
247  */
248 static txid
249 str2txid(const char *s, const char **endp)
250 {
251         txid            val = 0;
252         txid            cutoff = MAX_TXID / 10;
253         txid            cutlim = MAX_TXID % 10;
254
255         for (; *s; s++)
256         {
257                 unsigned        d;
258
259                 if (*s < '0' || *s > '9')
260                         break;
261                 d = *s - '0';
262
263                 /*
264                  * check for overflow
265                  */
266                 if (val > cutoff || (val == cutoff && d > cutlim))
267                 {
268                         val = 0;
269                         break;
270                 }
271
272                 val = val * 10 + d;
273         }
274         if (endp)
275                 *endp = s;
276         return val;
277 }
278
279 /*
280  * parse snapshot from cstring
281  */
282 static TxidSnapshot *
283 parse_snapshot(const char *str)
284 {
285         txid            xmin;
286         txid            xmax;
287         txid            last_val = 0,
288                                 val;
289         const char *str_start = str;
290         const char *endp;
291         StringInfo      buf;
292
293         xmin = str2txid(str, &endp);
294         if (*endp != ':')
295                 goto bad_format;
296         str = endp + 1;
297
298         xmax = str2txid(str, &endp);
299         if (*endp != ':')
300                 goto bad_format;
301         str = endp + 1;
302
303         /* it should look sane */
304         if (xmin == 0 || xmax == 0 || xmin > xmax)
305                 goto bad_format;
306
307         /* allocate buffer */
308         buf = buf_init(xmin, xmax);
309
310         /* loop over values */
311         while (*str != '\0')
312         {
313                 /* read next value */
314                 val = str2txid(str, &endp);
315                 str = endp;
316
317                 /* require the input to be in order */
318                 if (val < xmin || val >= xmax || val < last_val)
319                         goto bad_format;
320
321                 /* skip duplicates */
322                 if (val != last_val)
323                         buf_add_txid(buf, val);
324                 last_val = val;
325
326                 if (*str == ',')
327                         str++;
328                 else if (*str != '\0')
329                         goto bad_format;
330         }
331
332         return buf_finalize(buf);
333
334 bad_format:
335         elog(ERROR, "invalid input for txid_snapshot: \"%s\"", str_start);
336         return NULL;
337 }
338
339 /*
340  * Public functions.
341  *
342  * txid_current() and txid_current_snapshot() are the only ones that
343  * communicate with core xid machinery.  All the others work on data
344  * returned by them.
345  */
346
347 /*
348  * txid_current() returns int8
349  *
350  *      Return the current toplevel transaction ID as TXID
351  *      If the current transaction does not have one, one is assigned.
352  */
353 Datum
354 txid_current(PG_FUNCTION_ARGS)
355 {
356         txid            val;
357         TxidEpoch       state;
358
359         /*
360          * Must prevent during recovery because if an xid is not assigned we try
361          * to assign one, which would fail. Programs already rely on this function
362          * to always return a valid current xid, so we should not change this to
363          * return NULL or similar invalid xid.
364          */
365         PreventCommandDuringRecovery("txid_current()");
366
367         load_xid_epoch(&state);
368
369         val = convert_xid(GetTopTransactionId(), &state);
370
371         PG_RETURN_INT64(val);
372 }
373
374 /*
375  * txid_current_snapshot() returns txid_snapshot
376  *
377  *              Return current snapshot in TXID format
378  *
379  * Note that only top-transaction XIDs are included in the snapshot.
380  */
381 Datum
382 txid_current_snapshot(PG_FUNCTION_ARGS)
383 {
384         TxidSnapshot *snap;
385         uint32          nxip,
386                                 i;
387         TxidEpoch       state;
388         Snapshot        cur;
389
390         cur = GetActiveSnapshot();
391         if (cur == NULL)
392                 elog(ERROR, "no active snapshot set");
393
394         load_xid_epoch(&state);
395
396         /*
397          * Compile-time limits on the procarray (MAX_BACKENDS processes plus
398          * MAX_BACKENDS prepared transactions) guarantee nxip won't be too large.
399          */
400         StaticAssertStmt(MAX_BACKENDS * 2 <= TXID_SNAPSHOT_MAX_NXIP,
401                                          "possible overflow in txid_current_snapshot()");
402
403         /* allocate */
404         nxip = cur->xcnt;
405         snap = palloc(TXID_SNAPSHOT_SIZE(nxip));
406
407         /* fill */
408         snap->xmin = convert_xid(cur->xmin, &state);
409         snap->xmax = convert_xid(cur->xmax, &state);
410         snap->nxip = nxip;
411         for (i = 0; i < nxip; i++)
412                 snap->xip[i] = convert_xid(cur->xip[i], &state);
413
414         /*
415          * We want them guaranteed to be in ascending order.  This also removes
416          * any duplicate xids.  Normally, an XID can only be assigned to one
417          * backend, but when preparing a transaction for two-phase commit, there
418          * is a transient state when both the original backend and the dummy
419          * PGPROC entry reserved for the prepared transaction hold the same XID.
420          */
421         sort_snapshot(snap);
422
423         /* set size after sorting, because it may have removed duplicate xips */
424         SET_VARSIZE(snap, TXID_SNAPSHOT_SIZE(snap->nxip));
425
426         PG_RETURN_POINTER(snap);
427 }
428
429 /*
430  * txid_snapshot_in(cstring) returns txid_snapshot
431  *
432  *              input function for type txid_snapshot
433  */
434 Datum
435 txid_snapshot_in(PG_FUNCTION_ARGS)
436 {
437         char       *str = PG_GETARG_CSTRING(0);
438         TxidSnapshot *snap;
439
440         snap = parse_snapshot(str);
441
442         PG_RETURN_POINTER(snap);
443 }
444
445 /*
446  * txid_snapshot_out(txid_snapshot) returns cstring
447  *
448  *              output function for type txid_snapshot
449  */
450 Datum
451 txid_snapshot_out(PG_FUNCTION_ARGS)
452 {
453         TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(0);
454         StringInfoData str;
455         uint32          i;
456
457         initStringInfo(&str);
458
459         appendStringInfo(&str, TXID_FMT ":", snap->xmin);
460         appendStringInfo(&str, TXID_FMT ":", snap->xmax);
461
462         for (i = 0; i < snap->nxip; i++)
463         {
464                 if (i > 0)
465                         appendStringInfoChar(&str, ',');
466                 appendStringInfo(&str, TXID_FMT, snap->xip[i]);
467         }
468
469         PG_RETURN_CSTRING(str.data);
470 }
471
472 /*
473  * txid_snapshot_recv(internal) returns txid_snapshot
474  *
475  *              binary input function for type txid_snapshot
476  *
477  *              format: int4 nxip, int8 xmin, int8 xmax, int8 xip
478  */
479 Datum
480 txid_snapshot_recv(PG_FUNCTION_ARGS)
481 {
482         StringInfo      buf = (StringInfo) PG_GETARG_POINTER(0);
483         TxidSnapshot *snap;
484         txid            last = 0;
485         int                     nxip;
486         int                     i;
487         txid            xmin,
488                                 xmax;
489
490         /* load and validate nxip */
491         nxip = pq_getmsgint(buf, 4);
492         if (nxip < 0 || nxip > TXID_SNAPSHOT_MAX_NXIP)
493                 goto bad_format;
494
495         xmin = pq_getmsgint64(buf);
496         xmax = pq_getmsgint64(buf);
497         if (xmin == 0 || xmax == 0 || xmin > xmax || xmax > MAX_TXID)
498                 goto bad_format;
499
500         snap = palloc(TXID_SNAPSHOT_SIZE(nxip));
501         snap->xmin = xmin;
502         snap->xmax = xmax;
503
504         for (i = 0; i < nxip; i++)
505         {
506                 txid            cur = pq_getmsgint64(buf);
507
508                 if (cur < last || cur < xmin || cur >= xmax)
509                         goto bad_format;
510
511                 /* skip duplicate xips */
512                 if (cur == last)
513                 {
514                         i--;
515                         nxip--;
516                         continue;
517                 }
518
519                 snap->xip[i] = cur;
520                 last = cur;
521         }
522         snap->nxip = nxip;
523         SET_VARSIZE(snap, TXID_SNAPSHOT_SIZE(nxip));
524         PG_RETURN_POINTER(snap);
525
526 bad_format:
527         elog(ERROR, "invalid snapshot data");
528         return (Datum) NULL;
529 }
530
531 /*
532  * txid_snapshot_send(txid_snapshot) returns bytea
533  *
534  *              binary output function for type txid_snapshot
535  *
536  *              format: int4 nxip, int8 xmin, int8 xmax, int8 xip
537  */
538 Datum
539 txid_snapshot_send(PG_FUNCTION_ARGS)
540 {
541         TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(0);
542         StringInfoData buf;
543         uint32          i;
544
545         pq_begintypsend(&buf);
546         pq_sendint(&buf, snap->nxip, 4);
547         pq_sendint64(&buf, snap->xmin);
548         pq_sendint64(&buf, snap->xmax);
549         for (i = 0; i < snap->nxip; i++)
550                 pq_sendint64(&buf, snap->xip[i]);
551         PG_RETURN_BYTEA_P(pq_endtypsend(&buf));
552 }
553
554 /*
555  * txid_visible_in_snapshot(int8, txid_snapshot) returns bool
556  *
557  *              is txid visible in snapshot ?
558  */
559 Datum
560 txid_visible_in_snapshot(PG_FUNCTION_ARGS)
561 {
562         txid            value = PG_GETARG_INT64(0);
563         TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(1);
564
565         PG_RETURN_BOOL(is_visible_txid(value, snap));
566 }
567
568 /*
569  * txid_snapshot_xmin(txid_snapshot) returns int8
570  *
571  *              return snapshot's xmin
572  */
573 Datum
574 txid_snapshot_xmin(PG_FUNCTION_ARGS)
575 {
576         TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(0);
577
578         PG_RETURN_INT64(snap->xmin);
579 }
580
581 /*
582  * txid_snapshot_xmax(txid_snapshot) returns int8
583  *
584  *              return snapshot's xmax
585  */
586 Datum
587 txid_snapshot_xmax(PG_FUNCTION_ARGS)
588 {
589         TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(0);
590
591         PG_RETURN_INT64(snap->xmax);
592 }
593
594 /*
595  * txid_snapshot_xip(txid_snapshot) returns setof int8
596  *
597  *              return in-progress TXIDs in snapshot.
598  */
599 Datum
600 txid_snapshot_xip(PG_FUNCTION_ARGS)
601 {
602         FuncCallContext *fctx;
603         TxidSnapshot *snap;
604         txid            value;
605
606         /* on first call initialize snap_state and get copy of snapshot */
607         if (SRF_IS_FIRSTCALL())
608         {
609                 TxidSnapshot *arg = (TxidSnapshot *) PG_GETARG_VARLENA_P(0);
610
611                 fctx = SRF_FIRSTCALL_INIT();
612
613                 /* make a copy of user snapshot */
614                 snap = MemoryContextAlloc(fctx->multi_call_memory_ctx, VARSIZE(arg));
615                 memcpy(snap, arg, VARSIZE(arg));
616
617                 fctx->user_fctx = snap;
618         }
619
620         /* return values one-by-one */
621         fctx = SRF_PERCALL_SETUP();
622         snap = fctx->user_fctx;
623         if (fctx->call_cntr < snap->nxip)
624         {
625                 value = snap->xip[fctx->call_cntr];
626                 SRF_RETURN_NEXT(fctx, Int64GetDatum(value));
627         }
628         else
629         {
630                 SRF_RETURN_DONE(fctx);
631         }
632 }