Migrate the former contrib/txid module into core. This will make it easier
[PostgreSQL.git] / src / backend / utils / adt / txid.c
blobf63d654f7bf2c4c4af782fcaad9c6818d0c56640
/*-------------------------------------------------------------------------
 * txid.c
 *
 *	Export internal transaction IDs to user level.
 *
 * Note that only top-level transaction IDs are ever converted to TXID.
 * This is important because TXIDs frequently persist beyond the global
 * xmin horizon, or may even be shipped to other machines, so we cannot
 * rely on being able to correlate subtransaction IDs with their parents
 * via functions such as SubTransGetTopmostTransaction().
 *
 *
 *	Copyright (c) 2003-2007, PostgreSQL Global Development Group
 *	Author: Jan Wieck, Afilias USA INC.
 *	64-bit txids: Marko Kreen, Skype Technologies
 *
 *	$PostgreSQL$
 *
 *-------------------------------------------------------------------------
 */

22 #include "postgres.h"
24 #include "access/transam.h"
25 #include "access/xact.h"
26 #include "funcapi.h"
27 #include "libpq/pqformat.h"
28 #include "utils/builtins.h"
31 #ifndef INT64_IS_BUSTED
32 /* txid will be signed int8 in database, so must limit to 63 bits */
33 #define MAX_TXID UINT64CONST(0x7FFFFFFFFFFFFFFF)
34 #else
35 /* we only really have 32 bits to work with :-( */
36 #define MAX_TXID UINT64CONST(0x7FFFFFFF)
37 #endif
39 /* Use unsigned variant internally */
40 typedef uint64 txid;
42 /* sprintf format code for uint64 */
43 #define TXID_FMT UINT64_FORMAT
46 * If defined, use bsearch() function for searching for txids in snapshots
47 * that have more than the specified number of values.
49 #define USE_BSEARCH_IF_NXIP_GREATER 30
53 * Snapshot containing 8byte txids.
55 typedef struct
58 * 4-byte length hdr, should not be touched directly.
60 * Explicit embedding is ok as we want always correct
61 * alignment anyway.
63 int32 __varsz;
65 uint32 nxip; /* number of txids in xip array */
66 txid xmin;
67 txid xmax;
68 txid xip[1]; /* in-progress txids, xmin <= xip[i] < xmax */
69 } TxidSnapshot;
71 #define TXID_SNAPSHOT_SIZE(nxip) \
72 (offsetof(TxidSnapshot, xip) + sizeof(txid) * (nxip))
75 * Epoch values from xact.c
77 typedef struct
79 TransactionId last_xid;
80 uint32 epoch;
81 } TxidEpoch;
85 * Fetch epoch data from xact.c.
87 static void
88 load_xid_epoch(TxidEpoch *state)
90 GetNextXidAndEpoch(&state->last_xid, &state->epoch);
94 * do a TransactionId -> txid conversion for an XID near the given epoch
96 static txid
97 convert_xid(TransactionId xid, const TxidEpoch *state)
99 #ifndef INT64_IS_BUSTED
100 uint64 epoch;
102 /* return special xid's as-is */
103 if (!TransactionIdIsNormal(xid))
104 return (txid) xid;
106 /* xid can be on either side when near wrap-around */
107 epoch = (uint64) state->epoch;
108 if (xid > state->last_xid &&
109 TransactionIdPrecedes(xid, state->last_xid))
110 epoch--;
111 else if (xid < state->last_xid &&
112 TransactionIdFollows(xid, state->last_xid))
113 epoch++;
115 return (epoch << 32) | xid;
116 #else /* INT64_IS_BUSTED */
117 /* we can't do anything with the epoch, so ignore it */
118 return (txid) xid & MAX_TXID;
119 #endif /* INT64_IS_BUSTED */
123 * txid comparator for qsort/bsearch
125 static int
126 cmp_txid(const void *aa, const void *bb)
128 txid a = *(const txid *) aa;
129 txid b = *(const txid *) bb;
131 if (a < b)
132 return -1;
133 if (a > b)
134 return 1;
135 return 0;
139 * sort a snapshot's txids, so we can use bsearch() later.
141 * For consistency of on-disk representation, we always sort even if bsearch
142 * will not be used.
144 static void
145 sort_snapshot(TxidSnapshot *snap)
147 if (snap->nxip > 1)
148 qsort(snap->xip, snap->nxip, sizeof(txid), cmp_txid);
152 * check txid visibility.
154 static bool
155 is_visible_txid(txid value, const TxidSnapshot *snap)
157 if (value < snap->xmin)
158 return true;
159 else if (value >= snap->xmax)
160 return false;
161 #ifdef USE_BSEARCH_IF_NXIP_GREATER
162 else if (snap->nxip > USE_BSEARCH_IF_NXIP_GREATER)
164 void *res;
166 res = bsearch(&value, snap->xip, snap->nxip, sizeof(txid), cmp_txid);
167 /* if found, transaction is still in progress */
168 return (res) ? false : true;
170 #endif
171 else
173 uint32 i;
175 for (i = 0; i < snap->nxip; i++)
177 if (value == snap->xip[i])
178 return false;
180 return true;
185 * helper functions to use StringInfo for TxidSnapshot creation.
188 static StringInfo
189 buf_init(txid xmin, txid xmax)
191 TxidSnapshot snap;
192 StringInfo buf;
194 snap.xmin = xmin;
195 snap.xmax = xmax;
196 snap.nxip = 0;
198 buf = makeStringInfo();
199 appendBinaryStringInfo(buf, (char *)&snap, TXID_SNAPSHOT_SIZE(0));
200 return buf;
203 static void
204 buf_add_txid(StringInfo buf, txid xid)
206 TxidSnapshot *snap = (TxidSnapshot *)buf->data;
208 /* do this before possible realloc */
209 snap->nxip++;
211 appendBinaryStringInfo(buf, (char *)&xid, sizeof(xid));
214 static TxidSnapshot *
215 buf_finalize(StringInfo buf)
217 TxidSnapshot *snap = (TxidSnapshot *)buf->data;
219 SET_VARSIZE(snap, buf->len);
221 /* buf is not needed anymore */
222 buf->data = NULL;
223 pfree(buf);
225 return snap;
229 * simple number parser.
231 * We return 0 on error, which is invalid value for txid.
233 static txid
234 str2txid(const char *s, const char **endp)
236 txid val = 0;
237 txid cutoff = MAX_TXID / 10;
238 txid cutlim = MAX_TXID % 10;
240 for (; *s; s++)
242 unsigned d;
244 if (*s < '0' || *s > '9')
245 break;
246 d = *s - '0';
249 * check for overflow
251 if (val > cutoff || (val == cutoff && d > cutlim))
253 val = 0;
254 break;
257 val = val * 10 + d;
259 if (endp)
260 *endp = s;
261 return val;
265 * parse snapshot from cstring
267 static TxidSnapshot *
268 parse_snapshot(const char *str)
270 txid xmin;
271 txid xmax;
272 txid last_val = 0, val;
273 const char *str_start = str;
274 const char *endp;
275 StringInfo buf;
277 xmin = str2txid(str, &endp);
278 if (*endp != ':')
279 goto bad_format;
280 str = endp + 1;
282 xmax = str2txid(str, &endp);
283 if (*endp != ':')
284 goto bad_format;
285 str = endp + 1;
287 /* it should look sane */
288 if (xmin == 0 || xmax == 0 || xmin > xmax)
289 goto bad_format;
291 /* allocate buffer */
292 buf = buf_init(xmin, xmax);
294 /* loop over values */
295 while (*str != '\0')
297 /* read next value */
298 val = str2txid(str, &endp);
299 str = endp;
301 /* require the input to be in order */
302 if (val < xmin || val >= xmax || val <= last_val)
303 goto bad_format;
305 buf_add_txid(buf, val);
306 last_val = val;
308 if (*str == ',')
309 str++;
310 else if (*str != '\0')
311 goto bad_format;
314 return buf_finalize(buf);
316 bad_format:
317 elog(ERROR, "invalid input for txid_snapshot: \"%s\"", str_start);
318 return NULL;
/*
 * Public functions.
 *
 * txid_current() and txid_current_snapshot() are the only ones that
 * communicate with the core xid machinery.  All the others work on data
 * returned by them.
 */

330 * txid_current() returns int8
332 * Return the current toplevel transaction ID as TXID
334 Datum
335 txid_current(PG_FUNCTION_ARGS)
337 txid val;
338 TxidEpoch state;
340 load_xid_epoch(&state);
342 val = convert_xid(GetTopTransactionId(), &state);
344 PG_RETURN_INT64(val);
348 * txid_current_snapshot() returns txid_snapshot
350 * Return current snapshot in TXID format
352 * Note that only top-transaction XIDs are included in the snapshot.
354 Datum
355 txid_current_snapshot(PG_FUNCTION_ARGS)
357 TxidSnapshot *snap;
358 uint32 nxip, i, size;
359 TxidEpoch state;
360 Snapshot cur;
362 cur = ActiveSnapshot;
363 if (cur == NULL)
364 elog(ERROR, "txid_current_snapshot: ActiveSnapshot == NULL");
366 load_xid_epoch(&state);
368 /* allocate */
369 nxip = cur->xcnt;
370 size = TXID_SNAPSHOT_SIZE(nxip);
371 snap = palloc(size);
372 SET_VARSIZE(snap, size);
374 /* fill */
375 snap->xmin = convert_xid(cur->xmin, &state);
376 snap->xmax = convert_xid(cur->xmax, &state);
377 snap->nxip = nxip;
378 for (i = 0; i < nxip; i++)
379 snap->xip[i] = convert_xid(cur->xip[i], &state);
381 /* we want them guaranteed to be in ascending order */
382 sort_snapshot(snap);
384 PG_RETURN_POINTER(snap);
388 * txid_snapshot_in(cstring) returns txid_snapshot
390 * input function for type txid_snapshot
392 Datum
393 txid_snapshot_in(PG_FUNCTION_ARGS)
395 char *str = PG_GETARG_CSTRING(0);
396 TxidSnapshot *snap;
398 snap = parse_snapshot(str);
400 PG_RETURN_POINTER(snap);
404 * txid_snapshot_out(txid_snapshot) returns cstring
406 * output function for type txid_snapshot
408 Datum
409 txid_snapshot_out(PG_FUNCTION_ARGS)
411 TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(0);
412 StringInfoData str;
413 uint32 i;
415 initStringInfo(&str);
417 appendStringInfo(&str, TXID_FMT ":", snap->xmin);
418 appendStringInfo(&str, TXID_FMT ":", snap->xmax);
420 for (i = 0; i < snap->nxip; i++)
422 if (i > 0)
423 appendStringInfoChar(&str, ',');
424 appendStringInfo(&str, TXID_FMT, snap->xip[i]);
427 PG_RETURN_CSTRING(str.data);
431 * txid_snapshot_recv(internal) returns txid_snapshot
433 * binary input function for type txid_snapshot
435 * format: int4 nxip, int8 xmin, int8 xmax, int8 xip
437 Datum
438 txid_snapshot_recv(PG_FUNCTION_ARGS)
440 StringInfo buf = (StringInfo) PG_GETARG_POINTER(0);
441 TxidSnapshot *snap;
442 txid last = 0;
443 int nxip;
444 int i;
445 int avail;
446 int expect;
447 txid xmin, xmax;
450 * load nxip and check for nonsense.
452 * (nxip > avail) check is against int overflows in 'expect'.
454 nxip = pq_getmsgint(buf, 4);
455 avail = buf->len - buf->cursor;
456 expect = 8 + 8 + nxip * 8;
457 if (nxip < 0 || nxip > avail || expect > avail)
458 goto bad_format;
460 xmin = pq_getmsgint64(buf);
461 xmax = pq_getmsgint64(buf);
462 if (xmin == 0 || xmax == 0 || xmin > xmax || xmax > MAX_TXID)
463 goto bad_format;
465 snap = palloc(TXID_SNAPSHOT_SIZE(nxip));
466 snap->xmin = xmin;
467 snap->xmax = xmax;
468 snap->nxip = nxip;
469 SET_VARSIZE(snap, TXID_SNAPSHOT_SIZE(nxip));
471 for (i = 0; i < nxip; i++)
473 txid cur = pq_getmsgint64(buf);
474 if (cur <= last || cur < xmin || cur >= xmax)
475 goto bad_format;
476 snap->xip[i] = cur;
477 last = cur;
479 PG_RETURN_POINTER(snap);
481 bad_format:
482 elog(ERROR, "invalid snapshot data");
483 return (Datum)NULL;
487 * txid_snapshot_send(txid_snapshot) returns bytea
489 * binary output function for type txid_snapshot
491 * format: int4 nxip, int8 xmin, int8 xmax, int8 xip
493 Datum
494 txid_snapshot_send(PG_FUNCTION_ARGS)
496 TxidSnapshot *snap = (TxidSnapshot *)PG_GETARG_VARLENA_P(0);
497 StringInfoData buf;
498 uint32 i;
500 pq_begintypsend(&buf);
501 pq_sendint(&buf, snap->nxip, 4);
502 pq_sendint64(&buf, snap->xmin);
503 pq_sendint64(&buf, snap->xmax);
504 for (i = 0; i < snap->nxip; i++)
505 pq_sendint64(&buf, snap->xip[i]);
506 PG_RETURN_BYTEA_P(pq_endtypsend(&buf));
510 * txid_visible_in_snapshot(int8, txid_snapshot) returns bool
512 * is txid visible in snapshot ?
514 Datum
515 txid_visible_in_snapshot(PG_FUNCTION_ARGS)
517 txid value = PG_GETARG_INT64(0);
518 TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(1);
520 PG_RETURN_BOOL(is_visible_txid(value, snap));
524 * txid_snapshot_xmin(txid_snapshot) returns int8
526 * return snapshot's xmin
528 Datum
529 txid_snapshot_xmin(PG_FUNCTION_ARGS)
531 TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(0);
533 PG_RETURN_INT64(snap->xmin);
537 * txid_snapshot_xmax(txid_snapshot) returns int8
539 * return snapshot's xmax
541 Datum
542 txid_snapshot_xmax(PG_FUNCTION_ARGS)
544 TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(0);
546 PG_RETURN_INT64(snap->xmax);
550 * txid_snapshot_xip(txid_snapshot) returns setof int8
552 * return in-progress TXIDs in snapshot.
554 Datum
555 txid_snapshot_xip(PG_FUNCTION_ARGS)
557 FuncCallContext *fctx;
558 TxidSnapshot *snap;
559 txid value;
561 /* on first call initialize snap_state and get copy of snapshot */
562 if (SRF_IS_FIRSTCALL()) {
563 TxidSnapshot *arg = (TxidSnapshot *) PG_GETARG_VARLENA_P(0);
565 fctx = SRF_FIRSTCALL_INIT();
567 /* make a copy of user snapshot */
568 snap = MemoryContextAlloc(fctx->multi_call_memory_ctx, VARSIZE(arg));
569 memcpy(snap, arg, VARSIZE(arg));
571 fctx->user_fctx = snap;
574 /* return values one-by-one */
575 fctx = SRF_PERCALL_SETUP();
576 snap = fctx->user_fctx;
577 if (fctx->call_cntr < snap->nxip) {
578 value = snap->xip[fctx->call_cntr];
579 SRF_RETURN_NEXT(fctx, Int64GetDatum(value));
580 } else {
581 SRF_RETURN_DONE(fctx);