1 /*-------------------------------------------------------------------------
4 * Export internal transaction IDs to user level.
6 * Note that only top-level transaction IDs are ever converted to TXID.
7 * This is important because TXIDs frequently persist beyond the global
8 * xmin horizon, or may even be shipped to other machines, so we cannot
9 * rely on being able to correlate subtransaction IDs with their parents
10 * via functions such as SubTransGetTopmostTransaction().
13 * Copyright (c) 2003-2007, PostgreSQL Global Development Group
14 * Author: Jan Wieck, Afilias USA INC.
15 * 64-bit txids: Marko Kreen, Skype Technologies
19 *-------------------------------------------------------------------------
24 #include "access/transam.h"
25 #include "access/xact.h"
27 #include "libpq/pqformat.h"
28 #include "utils/builtins.h"
31 #ifndef INT64_IS_BUSTED
32 /* txid will be signed int8 in database, so must limit to 63 bits */
33 #define MAX_TXID UINT64CONST(0x7FFFFFFFFFFFFFFF)
35 /* we only really have 32 bits to work with :-( */
36 #define MAX_TXID UINT64CONST(0x7FFFFFFF)
39 /* Use unsigned variant internally */
42 /* sprintf format code for uint64 */
43 #define TXID_FMT UINT64_FORMAT
46 * If defined, use bsearch() function for searching for txids in snapshots
47 * that have more than the specified number of values.
49 #define USE_BSEARCH_IF_NXIP_GREATER 30
53 * Snapshot containing 8byte txids.
58 * 4-byte length hdr, should not be touched directly.
60 * Explicit embedding is ok as we want always correct
65 uint32 nxip
; /* number of txids in xip array */
68 txid xip
[1]; /* in-progress txids, xmin <= xip[i] < xmax */
71 #define TXID_SNAPSHOT_SIZE(nxip) \
72 (offsetof(TxidSnapshot, xip) + sizeof(txid) * (nxip))
75 * Epoch values from xact.c
79 TransactionId last_xid
;
85 * Fetch epoch data from xact.c.
88 load_xid_epoch(TxidEpoch
*state
)
90 GetNextXidAndEpoch(&state
->last_xid
, &state
->epoch
);
94 * do a TransactionId -> txid conversion for an XID near the given epoch
97 convert_xid(TransactionId xid
, const TxidEpoch
*state
)
99 #ifndef INT64_IS_BUSTED
102 /* return special xid's as-is */
103 if (!TransactionIdIsNormal(xid
))
106 /* xid can be on either side when near wrap-around */
107 epoch
= (uint64
) state
->epoch
;
108 if (xid
> state
->last_xid
&&
109 TransactionIdPrecedes(xid
, state
->last_xid
))
111 else if (xid
< state
->last_xid
&&
112 TransactionIdFollows(xid
, state
->last_xid
))
115 return (epoch
<< 32) | xid
;
116 #else /* INT64_IS_BUSTED */
117 /* we can't do anything with the epoch, so ignore it */
118 return (txid
) xid
& MAX_TXID
;
119 #endif /* INT64_IS_BUSTED */
123 * txid comparator for qsort/bsearch
126 cmp_txid(const void *aa
, const void *bb
)
128 txid a
= *(const txid
*) aa
;
129 txid b
= *(const txid
*) bb
;
139 * sort a snapshot's txids, so we can use bsearch() later.
141 * For consistency of on-disk representation, we always sort even if bsearch
145 sort_snapshot(TxidSnapshot
*snap
)
148 qsort(snap
->xip
, snap
->nxip
, sizeof(txid
), cmp_txid
);
152 * check txid visibility.
155 is_visible_txid(txid value
, const TxidSnapshot
*snap
)
157 if (value
< snap
->xmin
)
159 else if (value
>= snap
->xmax
)
161 #ifdef USE_BSEARCH_IF_NXIP_GREATER
162 else if (snap
->nxip
> USE_BSEARCH_IF_NXIP_GREATER
)
166 res
= bsearch(&value
, snap
->xip
, snap
->nxip
, sizeof(txid
), cmp_txid
);
167 /* if found, transaction is still in progress */
168 return (res
) ? false : true;
175 for (i
= 0; i
< snap
->nxip
; i
++)
177 if (value
== snap
->xip
[i
])
185 * helper functions to use StringInfo for TxidSnapshot creation.
189 buf_init(txid xmin
, txid xmax
)
198 buf
= makeStringInfo();
199 appendBinaryStringInfo(buf
, (char *)&snap
, TXID_SNAPSHOT_SIZE(0));
204 buf_add_txid(StringInfo buf
, txid xid
)
206 TxidSnapshot
*snap
= (TxidSnapshot
*)buf
->data
;
208 /* do this before possible realloc */
211 appendBinaryStringInfo(buf
, (char *)&xid
, sizeof(xid
));
214 static TxidSnapshot
*
215 buf_finalize(StringInfo buf
)
217 TxidSnapshot
*snap
= (TxidSnapshot
*)buf
->data
;
219 SET_VARSIZE(snap
, buf
->len
);
221 /* buf is not needed anymore */
229 * simple number parser.
231 * We return 0 on error, which is invalid value for txid.
234 str2txid(const char *s
, const char **endp
)
237 txid cutoff
= MAX_TXID
/ 10;
238 txid cutlim
= MAX_TXID
% 10;
244 if (*s
< '0' || *s
> '9')
251 if (val
> cutoff
|| (val
== cutoff
&& d
> cutlim
))
265 * parse snapshot from cstring
267 static TxidSnapshot
*
268 parse_snapshot(const char *str
)
272 txid last_val
= 0, val
;
273 const char *str_start
= str
;
277 xmin
= str2txid(str
, &endp
);
282 xmax
= str2txid(str
, &endp
);
287 /* it should look sane */
288 if (xmin
== 0 || xmax
== 0 || xmin
> xmax
)
291 /* allocate buffer */
292 buf
= buf_init(xmin
, xmax
);
294 /* loop over values */
297 /* read next value */
298 val
= str2txid(str
, &endp
);
301 /* require the input to be in order */
302 if (val
< xmin
|| val
>= xmax
|| val
<= last_val
)
305 buf_add_txid(buf
, val
);
310 else if (*str
!= '\0')
314 return buf_finalize(buf
);
317 elog(ERROR
, "invalid input for txid_snapshot: \"%s\"", str_start
);
324 * txid_current() and txid_current_snapshot() are the only ones that
325 * communicate with core xid machinery. All the others work on data
330 * txid_current() returns int8
332 * Return the current toplevel transaction ID as TXID
335 txid_current(PG_FUNCTION_ARGS
)
340 load_xid_epoch(&state
);
342 val
= convert_xid(GetTopTransactionId(), &state
);
344 PG_RETURN_INT64(val
);
348 * txid_current_snapshot() returns txid_snapshot
350 * Return current snapshot in TXID format
352 * Note that only top-transaction XIDs are included in the snapshot.
355 txid_current_snapshot(PG_FUNCTION_ARGS
)
358 uint32 nxip
, i
, size
;
362 cur
= ActiveSnapshot
;
364 elog(ERROR
, "txid_current_snapshot: ActiveSnapshot == NULL");
366 load_xid_epoch(&state
);
370 size
= TXID_SNAPSHOT_SIZE(nxip
);
372 SET_VARSIZE(snap
, size
);
375 snap
->xmin
= convert_xid(cur
->xmin
, &state
);
376 snap
->xmax
= convert_xid(cur
->xmax
, &state
);
378 for (i
= 0; i
< nxip
; i
++)
379 snap
->xip
[i
] = convert_xid(cur
->xip
[i
], &state
);
381 /* we want them guaranteed to be in ascending order */
384 PG_RETURN_POINTER(snap
);
388 * txid_snapshot_in(cstring) returns txid_snapshot
390 * input function for type txid_snapshot
393 txid_snapshot_in(PG_FUNCTION_ARGS
)
395 char *str
= PG_GETARG_CSTRING(0);
398 snap
= parse_snapshot(str
);
400 PG_RETURN_POINTER(snap
);
404 * txid_snapshot_out(txid_snapshot) returns cstring
406 * output function for type txid_snapshot
409 txid_snapshot_out(PG_FUNCTION_ARGS
)
411 TxidSnapshot
*snap
= (TxidSnapshot
*) PG_GETARG_VARLENA_P(0);
415 initStringInfo(&str
);
417 appendStringInfo(&str
, TXID_FMT
":", snap
->xmin
);
418 appendStringInfo(&str
, TXID_FMT
":", snap
->xmax
);
420 for (i
= 0; i
< snap
->nxip
; i
++)
423 appendStringInfoChar(&str
, ',');
424 appendStringInfo(&str
, TXID_FMT
, snap
->xip
[i
]);
427 PG_RETURN_CSTRING(str
.data
);
431 * txid_snapshot_recv(internal) returns txid_snapshot
433 * binary input function for type txid_snapshot
435 * format: int4 nxip, int8 xmin, int8 xmax, int8 xip
438 txid_snapshot_recv(PG_FUNCTION_ARGS
)
440 StringInfo buf
= (StringInfo
) PG_GETARG_POINTER(0);
450 * load nxip and check for nonsense.
452 * (nxip > avail) check is against int overflows in 'expect'.
454 nxip
= pq_getmsgint(buf
, 4);
455 avail
= buf
->len
- buf
->cursor
;
456 expect
= 8 + 8 + nxip
* 8;
457 if (nxip
< 0 || nxip
> avail
|| expect
> avail
)
460 xmin
= pq_getmsgint64(buf
);
461 xmax
= pq_getmsgint64(buf
);
462 if (xmin
== 0 || xmax
== 0 || xmin
> xmax
|| xmax
> MAX_TXID
)
465 snap
= palloc(TXID_SNAPSHOT_SIZE(nxip
));
469 SET_VARSIZE(snap
, TXID_SNAPSHOT_SIZE(nxip
));
471 for (i
= 0; i
< nxip
; i
++)
473 txid cur
= pq_getmsgint64(buf
);
474 if (cur
<= last
|| cur
< xmin
|| cur
>= xmax
)
479 PG_RETURN_POINTER(snap
);
482 elog(ERROR
, "invalid snapshot data");
487 * txid_snapshot_send(txid_snapshot) returns bytea
489 * binary output function for type txid_snapshot
491 * format: int4 nxip, int8 xmin, int8 xmax, int8 xip
494 txid_snapshot_send(PG_FUNCTION_ARGS
)
496 TxidSnapshot
*snap
= (TxidSnapshot
*)PG_GETARG_VARLENA_P(0);
500 pq_begintypsend(&buf
);
501 pq_sendint(&buf
, snap
->nxip
, 4);
502 pq_sendint64(&buf
, snap
->xmin
);
503 pq_sendint64(&buf
, snap
->xmax
);
504 for (i
= 0; i
< snap
->nxip
; i
++)
505 pq_sendint64(&buf
, snap
->xip
[i
]);
506 PG_RETURN_BYTEA_P(pq_endtypsend(&buf
));
510 * txid_visible_in_snapshot(int8, txid_snapshot) returns bool
512 * is txid visible in snapshot ?
515 txid_visible_in_snapshot(PG_FUNCTION_ARGS
)
517 txid value
= PG_GETARG_INT64(0);
518 TxidSnapshot
*snap
= (TxidSnapshot
*) PG_GETARG_VARLENA_P(1);
520 PG_RETURN_BOOL(is_visible_txid(value
, snap
));
524 * txid_snapshot_xmin(txid_snapshot) returns int8
526 * return snapshot's xmin
529 txid_snapshot_xmin(PG_FUNCTION_ARGS
)
531 TxidSnapshot
*snap
= (TxidSnapshot
*) PG_GETARG_VARLENA_P(0);
533 PG_RETURN_INT64(snap
->xmin
);
537 * txid_snapshot_xmax(txid_snapshot) returns int8
539 * return snapshot's xmax
542 txid_snapshot_xmax(PG_FUNCTION_ARGS
)
544 TxidSnapshot
*snap
= (TxidSnapshot
*) PG_GETARG_VARLENA_P(0);
546 PG_RETURN_INT64(snap
->xmax
);
550 * txid_snapshot_xip(txid_snapshot) returns setof int8
552 * return in-progress TXIDs in snapshot.
555 txid_snapshot_xip(PG_FUNCTION_ARGS
)
557 FuncCallContext
*fctx
;
561 /* on first call initialize snap_state and get copy of snapshot */
562 if (SRF_IS_FIRSTCALL()) {
563 TxidSnapshot
*arg
= (TxidSnapshot
*) PG_GETARG_VARLENA_P(0);
565 fctx
= SRF_FIRSTCALL_INIT();
567 /* make a copy of user snapshot */
568 snap
= MemoryContextAlloc(fctx
->multi_call_memory_ctx
, VARSIZE(arg
));
569 memcpy(snap
, arg
, VARSIZE(arg
));
571 fctx
->user_fctx
= snap
;
574 /* return values one-by-one */
575 fctx
= SRF_PERCALL_SETUP();
576 snap
= fctx
->user_fctx
;
577 if (fctx
->call_cntr
< snap
->nxip
) {
578 value
= snap
->xip
[fctx
->call_cntr
];
579 SRF_RETURN_NEXT(fctx
, Int64GetDatum(value
));
581 SRF_RETURN_DONE(fctx
);