1 /*-------------------------------------------------------------------------
4 * Two-phase commit support functions.
6 * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
10 * src/backend/access/transam/twophase.c
13 * Each global transaction is associated with a global transaction
14 * identifier (GID). The client assigns a GID to a postgres
15 * transaction with the PREPARE TRANSACTION command.
17 * We keep all active global transactions in a shared memory array.
18 * When the PREPARE TRANSACTION command is issued, the GID is
19 * reserved for the transaction in the array. This is done before
20 * a WAL entry is made, because the reservation checks for duplicate
21 * GIDs and aborts the transaction if there already is a global
22 * transaction in prepared state with the same GID.
24 * A global transaction (gxact) also has dummy PGPROC; this is what keeps
25 * the XID considered running by TransactionIdIsInProgress. It is also
26 * convenient as a PGPROC to hook the gxact's locks to.
28 * Information to recover prepared transactions in case of crash is
29 * now stored in WAL for the common case. In some cases there will be
30 * an extended period between preparing a GXACT and commit/abort, in
31 * which case we need to separately record prepared transaction data
32 * in permanent storage. This includes locking information, pending
33 * notifications etc. All that state information is written to the
34 * per-transaction state file in the pg_twophase directory.
35 * All prepared transactions will be written prior to shutdown.
 * The life cycle of state data is as follows:
 *
 *	* On PREPARE TRANSACTION backend writes state data only to the WAL and
 *	  stores a pointer to the start of the WAL record in
 *	  gxact->prepare_start_lsn.
 *	* If COMMIT occurs before a checkpoint, the backend reads the data from
 *	  WAL using prepare_start_lsn.
 *	* On checkpoint, state data is copied to files in the pg_twophase
 *	  directory and fsynced.
 *	* If COMMIT happens after a checkpoint, the backend reads the state data
 *	  from those files.
49 * During replay and replication, TwoPhaseState also holds information
50 * about active prepared transactions that haven't been moved to disk yet.
52 * Replay of twophase records happens by the following rules:
54 * * At the beginning of recovery, pg_twophase is scanned once, filling
55 * TwoPhaseState with entries marked with gxact->inredo and
56 * gxact->ondisk. Two-phase file data older than the XID horizon of
57 * the redo position are discarded.
58 * * On PREPARE redo, the transaction is added to TwoPhaseState->prepXacts.
59 * gxact->inredo is set to true for such entries.
60 * * On Checkpoint we iterate through TwoPhaseState->prepXacts entries
61 * that have gxact->inredo set and are behind the redo_horizon. We
62 * save them to disk and then switch gxact->ondisk to true.
63 * * On COMMIT/ABORT we delete the entry from TwoPhaseState->prepXacts.
64 * If gxact->ondisk is true, the corresponding entry from the disk
65 * is additionally deleted.
66 * * RecoverPreparedTransactions(), StandbyRecoverPreparedTransactions()
67 * and PrescanPreparedTransactions() have been modified to go through
68 * gxact->inredo entries that have not made it to disk.
70 *-------------------------------------------------------------------------
79 #include "access/commit_ts.h"
80 #include "access/htup_details.h"
81 #include "access/subtrans.h"
82 #include "access/transam.h"
83 #include "access/twophase.h"
84 #include "access/twophase_rmgr.h"
85 #include "access/xact.h"
86 #include "access/xlog.h"
87 #include "access/xloginsert.h"
88 #include "access/xlogreader.h"
89 #include "access/xlogutils.h"
90 #include "catalog/pg_type.h"
91 #include "catalog/storage.h"
93 #include "miscadmin.h"
96 #include "replication/origin.h"
97 #include "replication/syncrep.h"
98 #include "replication/walsender.h"
99 #include "storage/fd.h"
100 #include "storage/ipc.h"
101 #include "storage/md.h"
102 #include "storage/predicate.h"
103 #include "storage/proc.h"
104 #include "storage/procarray.h"
105 #include "storage/sinvaladt.h"
106 #include "storage/smgr.h"
107 #include "utils/builtins.h"
108 #include "utils/memutils.h"
109 #include "utils/timestamp.h"
/*
 * Directory where Two-phase commit files reside within PGDATA
 */
#define TWOPHASE_DIR "pg_twophase"

/*
 * GUC variable, can't be changed after startup.  Zero disables prepared
 * transactions entirely (MarkAsPreparing errors out immediately).
 */
int			max_prepared_xacts = 0;
120 * This struct describes one global transaction that is in prepared state
121 * or attempting to become prepared.
123 * The lifecycle of a global transaction is:
125 * 1. After checking that the requested GID is not in use, set up an entry in
126 * the TwoPhaseState->prepXacts array with the correct GID and valid = false,
127 * and mark it as locked by my backend.
129 * 2. After successfully completing prepare, set valid = true and enter the
130 * referenced PGPROC into the global ProcArray.
132 * 3. To begin COMMIT PREPARED or ROLLBACK PREPARED, check that the entry is
133 * valid and not locked, then mark the entry as locked by storing my current
134 * backend ID into locking_backend. This prevents concurrent attempts to
135 * commit or rollback the same prepared xact.
137 * 4. On completion of COMMIT PREPARED or ROLLBACK PREPARED, remove the entry
138 * from the ProcArray and the TwoPhaseState->prepXacts array and return it to
141 * Note that if the preparing transaction fails between steps 1 and 2, the
142 * entry must be removed so that the GID and the GlobalTransaction struct
143 * can be reused. See AtAbort_Twophase().
145 * typedef struct GlobalTransactionData *GlobalTransaction appears in
149 typedef struct GlobalTransactionData
151 GlobalTransaction next
; /* list link for free list */
152 int pgprocno
; /* ID of associated dummy PGPROC */
153 BackendId dummyBackendId
; /* similar to backend id for backends */
154 TimestampTz prepared_at
; /* time of preparation */
157 * Note that we need to keep track of two LSNs for each GXACT. We keep
158 * track of the start LSN because this is the address we must use to read
159 * state data back from WAL when committing a prepared GXACT. We keep
160 * track of the end LSN because that is the LSN we need to wait for prior
163 XLogRecPtr prepare_start_lsn
; /* XLOG offset of prepare record start */
164 XLogRecPtr prepare_end_lsn
; /* XLOG offset of prepare record end */
165 TransactionId xid
; /* The GXACT id */
167 Oid owner
; /* ID of user that executed the xact */
168 BackendId locking_backend
; /* backend currently working on the xact */
169 bool valid
; /* true if PGPROC entry is in proc array */
170 bool ondisk
; /* true if prepare state file is on disk */
171 bool inredo
; /* true if entry was added via xlog_redo */
172 char gid
[GIDSIZE
]; /* The GID assigned to the prepared xact */
173 } GlobalTransactionData
;
176 * Two Phase Commit shared state. Access to this struct is protected
177 * by TwoPhaseStateLock.
179 typedef struct TwoPhaseStateData
181 /* Head of linked list of free GlobalTransactionData structs */
182 GlobalTransaction freeGXacts
;
184 /* Number of valid prepXacts entries. */
187 /* There are max_prepared_xacts items in this array */
188 GlobalTransaction prepXacts
[FLEXIBLE_ARRAY_MEMBER
];
191 static TwoPhaseStateData
*TwoPhaseState
;
194 * Global transaction entry currently locked by us, if any. Note that any
195 * access to the entry pointed to by this variable must be protected by
196 * TwoPhaseStateLock, though obviously the pointer itself doesn't need to be
197 * (since it's just local memory).
199 static GlobalTransaction MyLockedGxact
= NULL
;
201 static bool twophaseExitRegistered
= false;
203 static void RecordTransactionCommitPrepared(TransactionId xid
,
205 TransactionId
*children
,
207 RelFileLocator
*rels
,
209 xl_xact_stats_item
*stats
,
211 SharedInvalidationMessage
*invalmsgs
,
214 static void RecordTransactionAbortPrepared(TransactionId xid
,
216 TransactionId
*children
,
218 RelFileLocator
*rels
,
220 xl_xact_stats_item
*stats
,
222 static void ProcessRecords(char *bufptr
, TransactionId xid
,
223 const TwoPhaseCallback callbacks
[]);
224 static void RemoveGXact(GlobalTransaction gxact
);
226 static void XlogReadTwoPhaseData(XLogRecPtr lsn
, char **buf
, int *len
);
227 static char *ProcessTwoPhaseBuffer(TransactionId xid
,
228 XLogRecPtr prepare_start_lsn
,
229 bool fromdisk
, bool setParent
, bool setNextXid
);
230 static void MarkAsPreparingGuts(GlobalTransaction gxact
, TransactionId xid
,
231 const char *gid
, TimestampTz prepared_at
, Oid owner
,
233 static void RemoveTwoPhaseFile(TransactionId xid
, bool giveWarning
);
234 static void RecreateTwoPhaseFile(TransactionId xid
, void *content
, int len
);
237 * Initialization of shared memory
240 TwoPhaseShmemSize(void)
244 /* Need the fixed struct, the array of pointers, and the GTD structs */
245 size
= offsetof(TwoPhaseStateData
, prepXacts
);
246 size
= add_size(size
, mul_size(max_prepared_xacts
,
247 sizeof(GlobalTransaction
)));
248 size
= MAXALIGN(size
);
249 size
= add_size(size
, mul_size(max_prepared_xacts
,
250 sizeof(GlobalTransactionData
)));
256 TwoPhaseShmemInit(void)
260 TwoPhaseState
= ShmemInitStruct("Prepared Transaction Table",
263 if (!IsUnderPostmaster
)
265 GlobalTransaction gxacts
;
269 TwoPhaseState
->freeGXacts
= NULL
;
270 TwoPhaseState
->numPrepXacts
= 0;
273 * Initialize the linked list of free GlobalTransactionData structs
275 gxacts
= (GlobalTransaction
)
276 ((char *) TwoPhaseState
+
277 MAXALIGN(offsetof(TwoPhaseStateData
, prepXacts
) +
278 sizeof(GlobalTransaction
) * max_prepared_xacts
));
279 for (i
= 0; i
< max_prepared_xacts
; i
++)
281 /* insert into linked list */
282 gxacts
[i
].next
= TwoPhaseState
->freeGXacts
;
283 TwoPhaseState
->freeGXacts
= &gxacts
[i
];
285 /* associate it with a PGPROC assigned by InitProcGlobal */
286 gxacts
[i
].pgprocno
= PreparedXactProcs
[i
].pgprocno
;
289 * Assign a unique ID for each dummy proc, so that the range of
290 * dummy backend IDs immediately follows the range of normal
291 * backend IDs. We don't dare to assign a real backend ID to dummy
292 * procs, because prepared transactions don't take part in cache
293 * invalidation like a real backend ID would imply, but having a
294 * unique ID for them is nevertheless handy. This arrangement
295 * allows you to allocate an array of size (MaxBackends +
296 * max_prepared_xacts + 1), and have a slot for every backend and
297 * prepared transaction. Currently multixact.c uses that
300 gxacts
[i
].dummyBackendId
= MaxBackends
+ 1 + i
;
308 * Exit hook to unlock the global transaction entry we're working on.
311 AtProcExit_Twophase(int code
, Datum arg
)
313 /* same logic as abort */
318 * Abort hook to unlock the global transaction entry we're working on.
321 AtAbort_Twophase(void)
323 if (MyLockedGxact
== NULL
)
327 * What to do with the locked global transaction entry? If we were in the
328 * process of preparing the transaction, but haven't written the WAL
329 * record and state file yet, the transaction must not be considered as
330 * prepared. Likewise, if we are in the process of finishing an
331 * already-prepared transaction, and fail after having already written the
332 * 2nd phase commit or rollback record to the WAL, the transaction should
333 * not be considered as prepared anymore. In those cases, just remove the
334 * entry from shared memory.
336 * Otherwise, the entry must be left in place so that the transaction can
337 * be finished later, so just unlock it.
339 * If we abort during prepare, after having written the WAL record, we
340 * might not have transferred all locks and other state to the prepared
341 * transaction yet. Likewise, if we abort during commit or rollback,
342 * after having written the WAL record, we might not have released all the
343 * resources held by the transaction yet. In those cases, the in-memory
344 * state can be wrong, but it's too late to back out.
346 LWLockAcquire(TwoPhaseStateLock
, LW_EXCLUSIVE
);
347 if (!MyLockedGxact
->valid
)
348 RemoveGXact(MyLockedGxact
);
350 MyLockedGxact
->locking_backend
= InvalidBackendId
;
351 LWLockRelease(TwoPhaseStateLock
);
353 MyLockedGxact
= NULL
;
357 * This is called after we have finished transferring state to the prepared
361 PostPrepare_Twophase(void)
363 LWLockAcquire(TwoPhaseStateLock
, LW_EXCLUSIVE
);
364 MyLockedGxact
->locking_backend
= InvalidBackendId
;
365 LWLockRelease(TwoPhaseStateLock
);
367 MyLockedGxact
= NULL
;
373 * Reserve the GID for the given transaction.
376 MarkAsPreparing(TransactionId xid
, const char *gid
,
377 TimestampTz prepared_at
, Oid owner
, Oid databaseid
)
379 GlobalTransaction gxact
;
382 if (strlen(gid
) >= GIDSIZE
)
384 (errcode(ERRCODE_INVALID_PARAMETER_VALUE
),
385 errmsg("transaction identifier \"%s\" is too long",
388 /* fail immediately if feature is disabled */
389 if (max_prepared_xacts
== 0)
391 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE
),
392 errmsg("prepared transactions are disabled"),
393 errhint("Set max_prepared_transactions to a nonzero value.")));
395 /* on first call, register the exit hook */
396 if (!twophaseExitRegistered
)
398 before_shmem_exit(AtProcExit_Twophase
, 0);
399 twophaseExitRegistered
= true;
402 LWLockAcquire(TwoPhaseStateLock
, LW_EXCLUSIVE
);
404 /* Check for conflicting GID */
405 for (i
= 0; i
< TwoPhaseState
->numPrepXacts
; i
++)
407 gxact
= TwoPhaseState
->prepXacts
[i
];
408 if (strcmp(gxact
->gid
, gid
) == 0)
411 (errcode(ERRCODE_DUPLICATE_OBJECT
),
412 errmsg("transaction identifier \"%s\" is already in use",
417 /* Get a free gxact from the freelist */
418 if (TwoPhaseState
->freeGXacts
== NULL
)
420 (errcode(ERRCODE_OUT_OF_MEMORY
),
421 errmsg("maximum number of prepared transactions reached"),
422 errhint("Increase max_prepared_transactions (currently %d).",
423 max_prepared_xacts
)));
424 gxact
= TwoPhaseState
->freeGXacts
;
425 TwoPhaseState
->freeGXacts
= gxact
->next
;
427 MarkAsPreparingGuts(gxact
, xid
, gid
, prepared_at
, owner
, databaseid
);
429 gxact
->ondisk
= false;
431 /* And insert it into the active array */
432 Assert(TwoPhaseState
->numPrepXacts
< max_prepared_xacts
);
433 TwoPhaseState
->prepXacts
[TwoPhaseState
->numPrepXacts
++] = gxact
;
435 LWLockRelease(TwoPhaseStateLock
);
441 * MarkAsPreparingGuts
443 * This uses a gxact struct and puts it into the active array.
444 * NOTE: this is also used when reloading a gxact after a crash; so avoid
445 * assuming that we can use very much backend context.
447 * Note: This function should be called with appropriate locks held.
450 MarkAsPreparingGuts(GlobalTransaction gxact
, TransactionId xid
, const char *gid
,
451 TimestampTz prepared_at
, Oid owner
, Oid databaseid
)
456 Assert(LWLockHeldByMeInMode(TwoPhaseStateLock
, LW_EXCLUSIVE
));
458 Assert(gxact
!= NULL
);
459 proc
= &ProcGlobal
->allProcs
[gxact
->pgprocno
];
461 /* Initialize the PGPROC entry */
462 MemSet(proc
, 0, sizeof(PGPROC
));
463 proc
->pgprocno
= gxact
->pgprocno
;
464 dlist_node_init(&proc
->links
);
465 proc
->waitStatus
= PROC_WAIT_STATUS_OK
;
466 if (LocalTransactionIdIsValid(MyProc
->lxid
))
468 /* clone VXID, for TwoPhaseGetXidByVirtualXID() to find */
469 proc
->lxid
= MyProc
->lxid
;
470 proc
->backendId
= MyBackendId
;
474 Assert(AmStartupProcess() || !IsPostmasterEnvironment
);
475 /* GetLockConflicts() uses this to specify a wait on the XID */
477 proc
->backendId
= InvalidBackendId
;
480 Assert(proc
->xmin
== InvalidTransactionId
);
481 proc
->delayChkptFlags
= 0;
482 proc
->statusFlags
= 0;
484 proc
->databaseId
= databaseid
;
485 proc
->roleId
= owner
;
486 proc
->tempNamespaceId
= InvalidOid
;
487 proc
->isBackgroundWorker
= false;
488 proc
->lwWaiting
= LW_WS_NOT_WAITING
;
489 proc
->lwWaitMode
= 0;
490 proc
->waitLock
= NULL
;
491 proc
->waitProcLock
= NULL
;
492 pg_atomic_init_u64(&proc
->waitStart
, 0);
493 for (i
= 0; i
< NUM_LOCK_PARTITIONS
; i
++)
494 dlist_init(&proc
->myProcLocks
[i
]);
495 /* subxid data must be filled later by GXactLoadSubxactData */
496 proc
->subxidStatus
.overflowed
= false;
497 proc
->subxidStatus
.count
= 0;
499 gxact
->prepared_at
= prepared_at
;
501 gxact
->owner
= owner
;
502 gxact
->locking_backend
= MyBackendId
;
503 gxact
->valid
= false;
504 gxact
->inredo
= false;
505 strcpy(gxact
->gid
, gid
);
508 * Remember that we have this GlobalTransaction entry locked for us. If we
509 * abort after this, we must release it.
511 MyLockedGxact
= gxact
;
515 * GXactLoadSubxactData
517 * If the transaction being persisted had any subtransactions, this must
518 * be called before MarkAsPrepared() to load information into the dummy
522 GXactLoadSubxactData(GlobalTransaction gxact
, int nsubxacts
,
523 TransactionId
*children
)
525 PGPROC
*proc
= &ProcGlobal
->allProcs
[gxact
->pgprocno
];
527 /* We need no extra lock since the GXACT isn't valid yet */
528 if (nsubxacts
> PGPROC_MAX_CACHED_SUBXIDS
)
530 proc
->subxidStatus
.overflowed
= true;
531 nsubxacts
= PGPROC_MAX_CACHED_SUBXIDS
;
535 memcpy(proc
->subxids
.xids
, children
,
536 nsubxacts
* sizeof(TransactionId
));
537 proc
->subxidStatus
.count
= nsubxacts
;
543 * Mark the GXACT as fully valid, and enter it into the global ProcArray.
545 * lock_held indicates whether caller already holds TwoPhaseStateLock.
548 MarkAsPrepared(GlobalTransaction gxact
, bool lock_held
)
550 /* Lock here may be overkill, but I'm not convinced of that ... */
552 LWLockAcquire(TwoPhaseStateLock
, LW_EXCLUSIVE
);
553 Assert(!gxact
->valid
);
556 LWLockRelease(TwoPhaseStateLock
);
559 * Put it into the global ProcArray so TransactionIdIsInProgress considers
560 * the XID as still running.
562 ProcArrayAdd(&ProcGlobal
->allProcs
[gxact
->pgprocno
]);
567 * Locate the prepared transaction and mark it busy for COMMIT or PREPARE.
569 static GlobalTransaction
570 LockGXact(const char *gid
, Oid user
)
574 /* on first call, register the exit hook */
575 if (!twophaseExitRegistered
)
577 before_shmem_exit(AtProcExit_Twophase
, 0);
578 twophaseExitRegistered
= true;
581 LWLockAcquire(TwoPhaseStateLock
, LW_EXCLUSIVE
);
583 for (i
= 0; i
< TwoPhaseState
->numPrepXacts
; i
++)
585 GlobalTransaction gxact
= TwoPhaseState
->prepXacts
[i
];
586 PGPROC
*proc
= &ProcGlobal
->allProcs
[gxact
->pgprocno
];
588 /* Ignore not-yet-valid GIDs */
591 if (strcmp(gxact
->gid
, gid
) != 0)
594 /* Found it, but has someone else got it locked? */
595 if (gxact
->locking_backend
!= InvalidBackendId
)
597 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE
),
598 errmsg("prepared transaction with identifier \"%s\" is busy",
601 if (user
!= gxact
->owner
&& !superuser_arg(user
))
603 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE
),
604 errmsg("permission denied to finish prepared transaction"),
605 errhint("Must be superuser or the user that prepared the transaction.")));
608 * Note: it probably would be possible to allow committing from
609 * another database; but at the moment NOTIFY is known not to work and
610 * there may be some other issues as well. Hence disallow until
611 * someone gets motivated to make it work.
613 if (MyDatabaseId
!= proc
->databaseId
)
615 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED
),
616 errmsg("prepared transaction belongs to another database"),
617 errhint("Connect to the database where the transaction was prepared to finish it.")));
619 /* OK for me to lock it */
620 gxact
->locking_backend
= MyBackendId
;
621 MyLockedGxact
= gxact
;
623 LWLockRelease(TwoPhaseStateLock
);
628 LWLockRelease(TwoPhaseStateLock
);
631 (errcode(ERRCODE_UNDEFINED_OBJECT
),
632 errmsg("prepared transaction with identifier \"%s\" does not exist",
641 * Remove the prepared transaction from the shared memory array.
643 * NB: caller should have already removed it from ProcArray
646 RemoveGXact(GlobalTransaction gxact
)
650 Assert(LWLockHeldByMeInMode(TwoPhaseStateLock
, LW_EXCLUSIVE
));
652 for (i
= 0; i
< TwoPhaseState
->numPrepXacts
; i
++)
654 if (gxact
== TwoPhaseState
->prepXacts
[i
])
656 /* remove from the active array */
657 TwoPhaseState
->numPrepXacts
--;
658 TwoPhaseState
->prepXacts
[i
] = TwoPhaseState
->prepXacts
[TwoPhaseState
->numPrepXacts
];
660 /* and put it back in the freelist */
661 gxact
->next
= TwoPhaseState
->freeGXacts
;
662 TwoPhaseState
->freeGXacts
= gxact
;
668 elog(ERROR
, "failed to find %p in GlobalTransaction array", gxact
);
672 * Returns an array of all prepared transactions for the user-level
673 * function pg_prepared_xact.
675 * The returned array and all its elements are copies of internal data
676 * structures, to minimize the time we need to hold the TwoPhaseStateLock.
678 * WARNING -- we return even those transactions that are not fully prepared
679 * yet. The caller should filter them out if he doesn't want them.
681 * The returned array is palloc'd.
684 GetPreparedTransactionList(GlobalTransaction
*gxacts
)
686 GlobalTransaction array
;
690 LWLockAcquire(TwoPhaseStateLock
, LW_SHARED
);
692 if (TwoPhaseState
->numPrepXacts
== 0)
694 LWLockRelease(TwoPhaseStateLock
);
700 num
= TwoPhaseState
->numPrepXacts
;
701 array
= (GlobalTransaction
) palloc(sizeof(GlobalTransactionData
) * num
);
703 for (i
= 0; i
< num
; i
++)
704 memcpy(array
+ i
, TwoPhaseState
->prepXacts
[i
],
705 sizeof(GlobalTransactionData
));
707 LWLockRelease(TwoPhaseStateLock
);
713 /* Working status for pg_prepared_xact */
716 GlobalTransaction array
;
723 * Produce a view with one row per prepared transaction.
725 * This function is here so we don't have to export the
726 * GlobalTransactionData struct definition.
729 pg_prepared_xact(PG_FUNCTION_ARGS
)
731 FuncCallContext
*funcctx
;
732 Working_State
*status
;
734 if (SRF_IS_FIRSTCALL())
737 MemoryContext oldcontext
;
739 /* create a function context for cross-call persistence */
740 funcctx
= SRF_FIRSTCALL_INIT();
743 * Switch to memory context appropriate for multiple function calls
745 oldcontext
= MemoryContextSwitchTo(funcctx
->multi_call_memory_ctx
);
747 /* build tupdesc for result tuples */
748 /* this had better match pg_prepared_xacts view in system_views.sql */
749 tupdesc
= CreateTemplateTupleDesc(5);
750 TupleDescInitEntry(tupdesc
, (AttrNumber
) 1, "transaction",
752 TupleDescInitEntry(tupdesc
, (AttrNumber
) 2, "gid",
754 TupleDescInitEntry(tupdesc
, (AttrNumber
) 3, "prepared",
755 TIMESTAMPTZOID
, -1, 0);
756 TupleDescInitEntry(tupdesc
, (AttrNumber
) 4, "ownerid",
758 TupleDescInitEntry(tupdesc
, (AttrNumber
) 5, "dbid",
761 funcctx
->tuple_desc
= BlessTupleDesc(tupdesc
);
764 * Collect all the 2PC status information that we will format and send
765 * out as a result set.
767 status
= (Working_State
*) palloc(sizeof(Working_State
));
768 funcctx
->user_fctx
= (void *) status
;
770 status
->ngxacts
= GetPreparedTransactionList(&status
->array
);
773 MemoryContextSwitchTo(oldcontext
);
776 funcctx
= SRF_PERCALL_SETUP();
777 status
= (Working_State
*) funcctx
->user_fctx
;
779 while (status
->array
!= NULL
&& status
->currIdx
< status
->ngxacts
)
781 GlobalTransaction gxact
= &status
->array
[status
->currIdx
++];
782 PGPROC
*proc
= &ProcGlobal
->allProcs
[gxact
->pgprocno
];
783 Datum values
[5] = {0};
792 * Form tuple with appropriate data.
795 values
[0] = TransactionIdGetDatum(proc
->xid
);
796 values
[1] = CStringGetTextDatum(gxact
->gid
);
797 values
[2] = TimestampTzGetDatum(gxact
->prepared_at
);
798 values
[3] = ObjectIdGetDatum(gxact
->owner
);
799 values
[4] = ObjectIdGetDatum(proc
->databaseId
);
801 tuple
= heap_form_tuple(funcctx
->tuple_desc
, values
, nulls
);
802 result
= HeapTupleGetDatum(tuple
);
803 SRF_RETURN_NEXT(funcctx
, result
);
806 SRF_RETURN_DONE(funcctx
);
811 * Get the GlobalTransaction struct for a prepared transaction
814 * If lock_held is set to true, TwoPhaseStateLock will not be taken, so the
815 * caller had better hold it.
817 static GlobalTransaction
818 TwoPhaseGetGXact(TransactionId xid
, bool lock_held
)
820 GlobalTransaction result
= NULL
;
823 static TransactionId cached_xid
= InvalidTransactionId
;
824 static GlobalTransaction cached_gxact
= NULL
;
826 Assert(!lock_held
|| LWLockHeldByMe(TwoPhaseStateLock
));
829 * During a recovery, COMMIT PREPARED, or ABORT PREPARED, we'll be called
830 * repeatedly for the same XID. We can save work with a simple cache.
832 if (xid
== cached_xid
)
836 LWLockAcquire(TwoPhaseStateLock
, LW_SHARED
);
838 for (i
= 0; i
< TwoPhaseState
->numPrepXacts
; i
++)
840 GlobalTransaction gxact
= TwoPhaseState
->prepXacts
[i
];
842 if (gxact
->xid
== xid
)
850 LWLockRelease(TwoPhaseStateLock
);
852 if (result
== NULL
) /* should not happen */
853 elog(ERROR
, "failed to find GlobalTransaction for xid %u", xid
);
856 cached_gxact
= result
;
862 * TwoPhaseGetXidByVirtualXID
863 * Lookup VXID among xacts prepared since last startup.
865 * (This won't find recovered xacts.) If more than one matches, return any
866 * and set "have_more" to true. To witness multiple matches, a single
867 * BackendId must consume 2^32 LXIDs, with no intervening database restart.
870 TwoPhaseGetXidByVirtualXID(VirtualTransactionId vxid
,
874 TransactionId result
= InvalidTransactionId
;
876 Assert(VirtualTransactionIdIsValid(vxid
));
877 LWLockAcquire(TwoPhaseStateLock
, LW_SHARED
);
879 for (i
= 0; i
< TwoPhaseState
->numPrepXacts
; i
++)
881 GlobalTransaction gxact
= TwoPhaseState
->prepXacts
[i
];
883 VirtualTransactionId proc_vxid
;
887 proc
= &ProcGlobal
->allProcs
[gxact
->pgprocno
];
888 GET_VXID_FROM_PGPROC(proc_vxid
, *proc
);
889 if (VirtualTransactionIdEquals(vxid
, proc_vxid
))
891 /* Startup process sets proc->backendId to InvalidBackendId. */
892 Assert(!gxact
->inredo
);
894 if (result
!= InvalidTransactionId
)
903 LWLockRelease(TwoPhaseStateLock
);
909 * TwoPhaseGetDummyBackendId
910 * Get the dummy backend ID for prepared transaction specified by XID
912 * Dummy backend IDs are similar to real backend IDs of real backends.
913 * They start at MaxBackends + 1, and are unique across all currently active
914 * real backends and prepared transactions. If lock_held is set to true,
915 * TwoPhaseStateLock will not be taken, so the caller had better hold it.
918 TwoPhaseGetDummyBackendId(TransactionId xid
, bool lock_held
)
920 GlobalTransaction gxact
= TwoPhaseGetGXact(xid
, lock_held
);
922 return gxact
->dummyBackendId
;
926 * TwoPhaseGetDummyProc
927 * Get the PGPROC that represents a prepared transaction specified by XID
929 * If lock_held is set to true, TwoPhaseStateLock will not be taken, so the
930 * caller had better hold it.
933 TwoPhaseGetDummyProc(TransactionId xid
, bool lock_held
)
935 GlobalTransaction gxact
= TwoPhaseGetGXact(xid
, lock_held
);
937 return &ProcGlobal
->allProcs
[gxact
->pgprocno
];
/************************************************************************/
/* State file support													*/
/************************************************************************/

/*
 * Format the path of the state file for "xid" into "path", which must have
 * room for MAXPGPATH bytes.  The file name is the XID as 8 uppercase hex
 * digits inside TWOPHASE_DIR.  Arguments are parenthesized so that
 * expression arguments expand safely.
 */
#define TwoPhaseFilePath(path, xid) \
	snprintf((path), MAXPGPATH, TWOPHASE_DIR "/%08X", (xid))
/*
 * 2PC state file format:
 *
 *	1. TwoPhaseFileHeader
 *	2. GID (NUL-terminated string, hdr.gidlen bytes)
 *	3. TransactionId[] (subtransactions)
 *	4. RelFileLocator[] (files to be deleted at commit)
 *	5. RelFileLocator[] (files to be deleted at abort)
 *	6. xl_xact_stats_item[] (stats items collected for commit)
 *	7. xl_xact_stats_item[] (stats items collected for abort)
 *	8. SharedInvalidationMessage[] (inval messages to be sent at commit)
 *	9. TwoPhaseRecordOnDisk
 *	10. ...
 *	11. TwoPhaseRecordOnDisk (end sentinel, rmid == TWOPHASE_RM_END_ID)
 *	12. checksum (CRC-32C)
 *
 * Each segment except the final checksum is MAXALIGN'd.
 */
964 * Header for a 2PC state file
966 #define TWOPHASE_MAGIC 0x57F94534 /* format identifier */
968 typedef xl_xact_prepare TwoPhaseFileHeader
;
971 * Header for each record in a state file
973 * NOTE: len counts only the rmgr data, not the TwoPhaseRecordOnDisk header.
974 * The rmgr data will be stored starting on a MAXALIGN boundary.
976 typedef struct TwoPhaseRecordOnDisk
978 uint32 len
; /* length of rmgr data */
979 TwoPhaseRmgrId rmid
; /* resource manager for this record */
980 uint16 info
; /* flag bits for use by rmgr */
981 } TwoPhaseRecordOnDisk
;
984 * During prepare, the state file is assembled in memory before writing it
985 * to WAL and the actual state file. We use a chain of StateFileChunk blocks
988 typedef struct StateFileChunk
992 struct StateFileChunk
*next
;
997 StateFileChunk
*head
; /* first data block in the chain */
998 StateFileChunk
*tail
; /* last block in chain */
1000 uint32 bytes_free
; /* free bytes left in tail block */
1001 uint32 total_len
; /* total data bytes in chain */
1006 * Append a block of data to records data structure.
1008 * NB: each block is padded to a MAXALIGN multiple. This must be
1009 * accounted for when the file is later read!
1011 * The data is copied, so the caller is free to modify it afterwards.
1014 save_state_data(const void *data
, uint32 len
)
1016 uint32 padlen
= MAXALIGN(len
);
1018 if (padlen
> records
.bytes_free
)
1020 records
.tail
->next
= palloc0(sizeof(StateFileChunk
));
1021 records
.tail
= records
.tail
->next
;
1022 records
.tail
->len
= 0;
1023 records
.tail
->next
= NULL
;
1024 records
.num_chunks
++;
1026 records
.bytes_free
= Max(padlen
, 512);
1027 records
.tail
->data
= palloc(records
.bytes_free
);
1030 memcpy(((char *) records
.tail
->data
) + records
.tail
->len
, data
, len
);
1031 records
.tail
->len
+= padlen
;
1032 records
.bytes_free
-= padlen
;
1033 records
.total_len
+= padlen
;
1037 * Start preparing a state file.
1039 * Initializes data structure and inserts the 2PC file header record.
1042 StartPrepare(GlobalTransaction gxact
)
1044 PGPROC
*proc
= &ProcGlobal
->allProcs
[gxact
->pgprocno
];
1045 TransactionId xid
= gxact
->xid
;
1046 TwoPhaseFileHeader hdr
;
1047 TransactionId
*children
;
1048 RelFileLocator
*commitrels
;
1049 RelFileLocator
*abortrels
;
1050 xl_xact_stats_item
*abortstats
= NULL
;
1051 xl_xact_stats_item
*commitstats
= NULL
;
1052 SharedInvalidationMessage
*invalmsgs
;
1054 /* Initialize linked list */
1055 records
.head
= palloc0(sizeof(StateFileChunk
));
1056 records
.head
->len
= 0;
1057 records
.head
->next
= NULL
;
1059 records
.bytes_free
= Max(sizeof(TwoPhaseFileHeader
), 512);
1060 records
.head
->data
= palloc(records
.bytes_free
);
1062 records
.tail
= records
.head
;
1063 records
.num_chunks
= 1;
1065 records
.total_len
= 0;
1068 hdr
.magic
= TWOPHASE_MAGIC
;
1069 hdr
.total_len
= 0; /* EndPrepare will fill this in */
1071 hdr
.database
= proc
->databaseId
;
1072 hdr
.prepared_at
= gxact
->prepared_at
;
1073 hdr
.owner
= gxact
->owner
;
1074 hdr
.nsubxacts
= xactGetCommittedChildren(&children
);
1075 hdr
.ncommitrels
= smgrGetPendingDeletes(true, &commitrels
);
1076 hdr
.nabortrels
= smgrGetPendingDeletes(false, &abortrels
);
1078 pgstat_get_transactional_drops(true, &commitstats
);
1080 pgstat_get_transactional_drops(false, &abortstats
);
1081 hdr
.ninvalmsgs
= xactGetCommittedInvalidationMessages(&invalmsgs
,
1082 &hdr
.initfileinval
);
1083 hdr
.gidlen
= strlen(gxact
->gid
) + 1; /* Include '\0' */
1084 /* EndPrepare will fill the origin data, if necessary */
1085 hdr
.origin_lsn
= InvalidXLogRecPtr
;
1086 hdr
.origin_timestamp
= 0;
1088 save_state_data(&hdr
, sizeof(TwoPhaseFileHeader
));
1089 save_state_data(gxact
->gid
, hdr
.gidlen
);
1092 * Add the additional info about subxacts, deletable files and cache
1093 * invalidation messages.
1095 if (hdr
.nsubxacts
> 0)
1097 save_state_data(children
, hdr
.nsubxacts
* sizeof(TransactionId
));
1098 /* While we have the child-xact data, stuff it in the gxact too */
1099 GXactLoadSubxactData(gxact
, hdr
.nsubxacts
, children
);
1101 if (hdr
.ncommitrels
> 0)
1103 save_state_data(commitrels
, hdr
.ncommitrels
* sizeof(RelFileLocator
));
1106 if (hdr
.nabortrels
> 0)
1108 save_state_data(abortrels
, hdr
.nabortrels
* sizeof(RelFileLocator
));
1111 if (hdr
.ncommitstats
> 0)
1113 save_state_data(commitstats
,
1114 hdr
.ncommitstats
* sizeof(xl_xact_stats_item
));
1117 if (hdr
.nabortstats
> 0)
1119 save_state_data(abortstats
,
1120 hdr
.nabortstats
* sizeof(xl_xact_stats_item
));
1123 if (hdr
.ninvalmsgs
> 0)
1125 save_state_data(invalmsgs
,
1126 hdr
.ninvalmsgs
* sizeof(SharedInvalidationMessage
));
1132 * Finish preparing state data and writing it to WAL.
1135 EndPrepare(GlobalTransaction gxact
)
1137 TwoPhaseFileHeader
*hdr
;
1138 StateFileChunk
*record
;
1141 /* Add the end sentinel to the list of 2PC records */
1142 RegisterTwoPhaseRecord(TWOPHASE_RM_END_ID
, 0,
1145 /* Go back and fill in total_len in the file header record */
1146 hdr
= (TwoPhaseFileHeader
*) records
.head
->data
;
1147 Assert(hdr
->magic
== TWOPHASE_MAGIC
);
1148 hdr
->total_len
= records
.total_len
+ sizeof(pg_crc32c
);
1150 replorigin
= (replorigin_session_origin
!= InvalidRepOriginId
&&
1151 replorigin_session_origin
!= DoNotReplicateId
);
1155 hdr
->origin_lsn
= replorigin_session_origin_lsn
;
1156 hdr
->origin_timestamp
= replorigin_session_origin_timestamp
;
1160 * If the data size exceeds MaxAllocSize, we won't be able to read it in
1161 * ReadTwoPhaseFile. Check for that now, rather than fail in the case
1162 * where we write data to file and then re-read at commit time.
1164 if (hdr
->total_len
> MaxAllocSize
)
1166 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED
),
1167 errmsg("two-phase state file maximum length exceeded")));
1170 * Now writing 2PC state data to WAL. We let the WAL's CRC protection
1171 * cover us, so no need to calculate a separate CRC.
1173 * We have to set DELAY_CHKPT_START here, too; otherwise a checkpoint
1174 * starting immediately after the WAL record is inserted could complete
1175 * without fsync'ing our state file. (This is essentially the same kind
1176 * of race condition as the COMMIT-to-clog-write case that
1177 * RecordTransactionCommit uses DELAY_CHKPT_START for; see notes there.)
1179 * We save the PREPARE record's location in the gxact for later use by
1180 * CheckPointTwoPhase.
1182 XLogEnsureRecordSpace(0, records
.num_chunks
);
1184 START_CRIT_SECTION();
1186 Assert((MyProc
->delayChkptFlags
& DELAY_CHKPT_START
) == 0);
1187 MyProc
->delayChkptFlags
|= DELAY_CHKPT_START
;
1190 for (record
= records
.head
; record
!= NULL
; record
= record
->next
)
1191 XLogRegisterData(record
->data
, record
->len
);
1193 XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN
);
1195 gxact
->prepare_end_lsn
= XLogInsert(RM_XACT_ID
, XLOG_XACT_PREPARE
);
1199 /* Move LSNs forward for this replication origin */
1200 replorigin_session_advance(replorigin_session_origin_lsn
,
1201 gxact
->prepare_end_lsn
);
1204 XLogFlush(gxact
->prepare_end_lsn
);
1206 /* If we crash now, we have prepared: WAL replay will fix things */
1208 /* Store record's start location to read that later on Commit */
1209 gxact
->prepare_start_lsn
= ProcLastRecPtr
;
1212 * Mark the prepared transaction as valid. As soon as xact.c marks MyProc
1213 * as not running our XID (which it will do immediately after this
1214 * function returns), others can commit/rollback the xact.
1216 * NB: a side effect of this is to make a dummy ProcArray entry for the
1217 * prepared XID. This must happen before we clear the XID from MyProc /
1218 * ProcGlobal->xids[], else there is a window where the XID is not running
1219 * according to TransactionIdIsInProgress, and onlookers would be entitled
1220 * to assume the xact crashed. Instead we have a window where the same
1221 * XID appears twice in ProcArray, which is OK.
1223 MarkAsPrepared(gxact
, false);
1226 * Now we can mark ourselves as out of the commit critical section: a
1227 * checkpoint starting after this will certainly see the gxact as a
1228 * candidate for fsyncing.
1230 MyProc
->delayChkptFlags
&= ~DELAY_CHKPT_START
;
1233 * Remember that we have this GlobalTransaction entry locked for us. If
1234 * we crash after this point, it's too late to abort, but we must unlock
1235 * it so that the prepared transaction can be committed or rolled back.
1237 MyLockedGxact
= gxact
;
1242 * Wait for synchronous replication, if required.
1244 * Note that at this stage we have marked the prepare, but still show as
1245 * running in the procarray (twice!) and continue to hold locks.
1247 SyncRepWaitForLSN(gxact
->prepare_end_lsn
, false);
1249 records
.tail
= records
.head
= NULL
;
1250 records
.num_chunks
= 0;
1254 * Register a 2PC record to be written to state file.
1257 RegisterTwoPhaseRecord(TwoPhaseRmgrId rmid
, uint16 info
,
1258 const void *data
, uint32 len
)
1260 TwoPhaseRecordOnDisk record
;
1265 save_state_data(&record
, sizeof(TwoPhaseRecordOnDisk
));
1267 save_state_data(data
, len
);
1272 * Read and validate the state file for xid.
1274 * If it looks OK (has a valid magic number and CRC), return the palloc'd
1275 * contents of the file, issuing an error when finding corrupted data. If
1276 * missing_ok is true, which indicates that missing files can be safely
1277 * ignored, then return NULL. This state can be reached when doing recovery.
1280 ReadTwoPhaseFile(TransactionId xid
, bool missing_ok
)
1282 char path
[MAXPGPATH
];
1284 TwoPhaseFileHeader
*hdr
;
1292 TwoPhaseFilePath(path
, xid
);
1294 fd
= OpenTransientFile(path
, O_RDONLY
| PG_BINARY
);
1297 if (missing_ok
&& errno
== ENOENT
)
1301 (errcode_for_file_access(),
1302 errmsg("could not open file \"%s\": %m", path
)));
1306 * Check file length. We can determine a lower bound pretty easily. We
1307 * set an upper bound to avoid palloc() failure on a corrupt file, though
1308 * we can't guarantee that we won't get an out of memory error anyway,
1309 * even on a valid file.
1311 if (fstat(fd
, &stat
))
1313 (errcode_for_file_access(),
1314 errmsg("could not stat file \"%s\": %m", path
)));
1316 if (stat
.st_size
< (MAXALIGN(sizeof(TwoPhaseFileHeader
)) +
1317 MAXALIGN(sizeof(TwoPhaseRecordOnDisk
)) +
1318 sizeof(pg_crc32c
)) ||
1319 stat
.st_size
> MaxAllocSize
)
1321 (errcode(ERRCODE_DATA_CORRUPTED
),
1322 errmsg_plural("incorrect size of file \"%s\": %lld byte",
1323 "incorrect size of file \"%s\": %lld bytes",
1324 (long long int) stat
.st_size
, path
,
1325 (long long int) stat
.st_size
)));
1327 crc_offset
= stat
.st_size
- sizeof(pg_crc32c
);
1328 if (crc_offset
!= MAXALIGN(crc_offset
))
1330 (errcode(ERRCODE_DATA_CORRUPTED
),
1331 errmsg("incorrect alignment of CRC offset for file \"%s\"",
1335 * OK, slurp in the file.
1337 buf
= (char *) palloc(stat
.st_size
);
1339 pgstat_report_wait_start(WAIT_EVENT_TWOPHASE_FILE_READ
);
1340 r
= read(fd
, buf
, stat
.st_size
);
1341 if (r
!= stat
.st_size
)
1345 (errcode_for_file_access(),
1346 errmsg("could not read file \"%s\": %m", path
)));
1349 (errmsg("could not read file \"%s\": read %d of %lld",
1350 path
, r
, (long long int) stat
.st_size
)));
1353 pgstat_report_wait_end();
1355 if (CloseTransientFile(fd
) != 0)
1357 (errcode_for_file_access(),
1358 errmsg("could not close file \"%s\": %m", path
)));
1360 hdr
= (TwoPhaseFileHeader
*) buf
;
1361 if (hdr
->magic
!= TWOPHASE_MAGIC
)
1363 (errcode(ERRCODE_DATA_CORRUPTED
),
1364 errmsg("invalid magic number stored in file \"%s\"",
1367 if (hdr
->total_len
!= stat
.st_size
)
1369 (errcode(ERRCODE_DATA_CORRUPTED
),
1370 errmsg("invalid size stored in file \"%s\"",
1373 INIT_CRC32C(calc_crc
);
1374 COMP_CRC32C(calc_crc
, buf
, crc_offset
);
1375 FIN_CRC32C(calc_crc
);
1377 file_crc
= *((pg_crc32c
*) (buf
+ crc_offset
));
1379 if (!EQ_CRC32C(calc_crc
, file_crc
))
1381 (errcode(ERRCODE_DATA_CORRUPTED
),
1382 errmsg("calculated CRC checksum does not match value stored in file \"%s\"",
1390 * Reads 2PC data from xlog. During checkpoint this data will be moved to
1391 * twophase files and ReadTwoPhaseFile should be used instead.
1393 * Note clearly that this function can access WAL during normal operation,
1394 * similarly to the way WALSender or Logical Decoding would do.
1397 XlogReadTwoPhaseData(XLogRecPtr lsn
, char **buf
, int *len
)
1400 XLogReaderState
*xlogreader
;
1403 xlogreader
= XLogReaderAllocate(wal_segment_size
, NULL
,
1404 XL_ROUTINE(.page_read
= &read_local_xlog_page
,
1405 .segment_open
= &wal_segment_open
,
1406 .segment_close
= &wal_segment_close
),
1410 (errcode(ERRCODE_OUT_OF_MEMORY
),
1411 errmsg("out of memory"),
1412 errdetail("Failed while allocating a WAL reading processor.")));
1414 XLogBeginRead(xlogreader
, lsn
);
1415 record
= XLogReadRecord(xlogreader
, &errormsg
);
1421 (errcode_for_file_access(),
1422 errmsg("could not read two-phase state from WAL at %X/%X: %s",
1423 LSN_FORMAT_ARGS(lsn
), errormsg
)));
1426 (errcode_for_file_access(),
1427 errmsg("could not read two-phase state from WAL at %X/%X",
1428 LSN_FORMAT_ARGS(lsn
))));
1431 if (XLogRecGetRmid(xlogreader
) != RM_XACT_ID
||
1432 (XLogRecGetInfo(xlogreader
) & XLOG_XACT_OPMASK
) != XLOG_XACT_PREPARE
)
1434 (errcode_for_file_access(),
1435 errmsg("expected two-phase state data is not present in WAL at %X/%X",
1436 LSN_FORMAT_ARGS(lsn
))));
1439 *len
= XLogRecGetDataLen(xlogreader
);
1441 *buf
= palloc(sizeof(char) * XLogRecGetDataLen(xlogreader
));
1442 memcpy(*buf
, XLogRecGetData(xlogreader
), sizeof(char) * XLogRecGetDataLen(xlogreader
));
1444 XLogReaderFree(xlogreader
);
1449 * Confirms an xid is prepared, during recovery
1452 StandbyTransactionIdIsPrepared(TransactionId xid
)
1455 TwoPhaseFileHeader
*hdr
;
1458 Assert(TransactionIdIsValid(xid
));
1460 if (max_prepared_xacts
<= 0)
1461 return false; /* nothing to do */
1463 /* Read and validate file */
1464 buf
= ReadTwoPhaseFile(xid
, true);
1468 /* Check header also */
1469 hdr
= (TwoPhaseFileHeader
*) buf
;
1470 result
= TransactionIdEquals(hdr
->xid
, xid
);
1477 * FinishPreparedTransaction: execute COMMIT PREPARED or ROLLBACK PREPARED
1480 FinishPreparedTransaction(const char *gid
, bool isCommit
)
1482 GlobalTransaction gxact
;
1487 TwoPhaseFileHeader
*hdr
;
1488 TransactionId latestXid
;
1489 TransactionId
*children
;
1490 RelFileLocator
*commitrels
;
1491 RelFileLocator
*abortrels
;
1492 RelFileLocator
*delrels
;
1494 xl_xact_stats_item
*commitstats
;
1495 xl_xact_stats_item
*abortstats
;
1496 SharedInvalidationMessage
*invalmsgs
;
1499 * Validate the GID, and lock the GXACT to ensure that two backends do not
1500 * try to commit the same GID at once.
1502 gxact
= LockGXact(gid
, GetUserId());
1503 proc
= &ProcGlobal
->allProcs
[gxact
->pgprocno
];
1507 * Read and validate 2PC state data. State data will typically be stored
1508 * in WAL files if the LSN is after the last checkpoint record, or moved
1509 * to disk if for some reason they have lived for a long time.
1512 buf
= ReadTwoPhaseFile(xid
, false);
1514 XlogReadTwoPhaseData(gxact
->prepare_start_lsn
, &buf
, NULL
);
1518 * Disassemble the header area
1520 hdr
= (TwoPhaseFileHeader
*) buf
;
1521 Assert(TransactionIdEquals(hdr
->xid
, xid
));
1522 bufptr
= buf
+ MAXALIGN(sizeof(TwoPhaseFileHeader
));
1523 bufptr
+= MAXALIGN(hdr
->gidlen
);
1524 children
= (TransactionId
*) bufptr
;
1525 bufptr
+= MAXALIGN(hdr
->nsubxacts
* sizeof(TransactionId
));
1526 commitrels
= (RelFileLocator
*) bufptr
;
1527 bufptr
+= MAXALIGN(hdr
->ncommitrels
* sizeof(RelFileLocator
));
1528 abortrels
= (RelFileLocator
*) bufptr
;
1529 bufptr
+= MAXALIGN(hdr
->nabortrels
* sizeof(RelFileLocator
));
1530 commitstats
= (xl_xact_stats_item
*) bufptr
;
1531 bufptr
+= MAXALIGN(hdr
->ncommitstats
* sizeof(xl_xact_stats_item
));
1532 abortstats
= (xl_xact_stats_item
*) bufptr
;
1533 bufptr
+= MAXALIGN(hdr
->nabortstats
* sizeof(xl_xact_stats_item
));
1534 invalmsgs
= (SharedInvalidationMessage
*) bufptr
;
1535 bufptr
+= MAXALIGN(hdr
->ninvalmsgs
* sizeof(SharedInvalidationMessage
));
1537 /* compute latestXid among all children */
1538 latestXid
= TransactionIdLatest(xid
, hdr
->nsubxacts
, children
);
1540 /* Prevent cancel/die interrupt while cleaning up */
1544 * The order of operations here is critical: make the XLOG entry for
1545 * commit or abort, then mark the transaction committed or aborted in
1546 * pg_xact, then remove its PGPROC from the global ProcArray (which means
1547 * TransactionIdIsInProgress will stop saying the prepared xact is in
1548 * progress), then run the post-commit or post-abort callbacks. The
1549 * callbacks will release the locks the transaction held.
1552 RecordTransactionCommitPrepared(xid
,
1553 hdr
->nsubxacts
, children
,
1554 hdr
->ncommitrels
, commitrels
,
1557 hdr
->ninvalmsgs
, invalmsgs
,
1558 hdr
->initfileinval
, gid
);
1560 RecordTransactionAbortPrepared(xid
,
1561 hdr
->nsubxacts
, children
,
1562 hdr
->nabortrels
, abortrels
,
1567 ProcArrayRemove(proc
, latestXid
);
1570 * In case we fail while running the callbacks, mark the gxact invalid so
1571 * no one else will try to commit/rollback, and so it will be recycled if
1572 * we fail after this point. It is still locked by our backend so it
1573 * won't go away yet.
1575 * (We assume it's safe to do this without taking TwoPhaseStateLock.)
1577 gxact
->valid
= false;
1580 * We have to remove any files that were supposed to be dropped. For
1581 * consistency with the regular xact.c code paths, must do this before
1582 * releasing locks, so do it before running the callbacks.
1584 * NB: this code knows that we couldn't be dropping any temp rels ...
1588 delrels
= commitrels
;
1589 ndelrels
= hdr
->ncommitrels
;
1593 delrels
= abortrels
;
1594 ndelrels
= hdr
->nabortrels
;
1597 /* Make sure files supposed to be dropped are dropped */
1598 DropRelationFiles(delrels
, ndelrels
, false);
1601 pgstat_execute_transactional_drops(hdr
->ncommitstats
, commitstats
, false);
1603 pgstat_execute_transactional_drops(hdr
->nabortstats
, abortstats
, false);
1606 * Handle cache invalidation messages.
1608 * Relcache init file invalidation requires processing both before and
1609 * after we send the SI messages, only when committing. See
1614 if (hdr
->initfileinval
)
1615 RelationCacheInitFilePreInvalidate();
1616 SendSharedInvalidMessages(invalmsgs
, hdr
->ninvalmsgs
);
1617 if (hdr
->initfileinval
)
1618 RelationCacheInitFilePostInvalidate();
1622 * Acquire the two-phase lock. We want to work on the two-phase callbacks
1623 * while holding it to avoid potential conflicts with other transactions
1624 * attempting to use the same GID, so the lock is released once the shared
1625 * memory state is cleared.
1627 LWLockAcquire(TwoPhaseStateLock
, LW_EXCLUSIVE
);
1629 /* And now do the callbacks */
1631 ProcessRecords(bufptr
, xid
, twophase_postcommit_callbacks
);
1633 ProcessRecords(bufptr
, xid
, twophase_postabort_callbacks
);
1635 PredicateLockTwoPhaseFinish(xid
, isCommit
);
1637 /* Clear shared memory state */
1641 * Release the lock as all callbacks are called and shared memory cleanup
1644 LWLockRelease(TwoPhaseStateLock
);
1646 /* Count the prepared xact as committed or aborted */
1647 AtEOXact_PgStat(isCommit
, false);
1650 * And now we can clean up any files we may have left.
1653 RemoveTwoPhaseFile(xid
, true);
1655 MyLockedGxact
= NULL
;
1657 RESUME_INTERRUPTS();
1663 * Scan 2PC state data in memory and call the indicated callbacks for each 2PC record.
1666 ProcessRecords(char *bufptr
, TransactionId xid
,
1667 const TwoPhaseCallback callbacks
[])
1671 TwoPhaseRecordOnDisk
*record
= (TwoPhaseRecordOnDisk
*) bufptr
;
1673 Assert(record
->rmid
<= TWOPHASE_RM_MAX_ID
);
1674 if (record
->rmid
== TWOPHASE_RM_END_ID
)
1677 bufptr
+= MAXALIGN(sizeof(TwoPhaseRecordOnDisk
));
1679 if (callbacks
[record
->rmid
] != NULL
)
1680 callbacks
[record
->rmid
] (xid
, record
->info
,
1681 (void *) bufptr
, record
->len
);
1683 bufptr
+= MAXALIGN(record
->len
);
1688 * Remove the 2PC file for the specified XID.
1690 * If giveWarning is false, do not complain about file-not-present;
1691 * this is an expected case during WAL replay.
1694 RemoveTwoPhaseFile(TransactionId xid
, bool giveWarning
)
1696 char path
[MAXPGPATH
];
1698 TwoPhaseFilePath(path
, xid
);
1700 if (errno
!= ENOENT
|| giveWarning
)
1702 (errcode_for_file_access(),
1703 errmsg("could not remove file \"%s\": %m", path
)));
1707 * Recreates a state file. This is used in WAL replay and during
1708 * checkpoint creation.
1710 * Note: content and len don't include CRC.
1713 RecreateTwoPhaseFile(TransactionId xid
, void *content
, int len
)
1715 char path
[MAXPGPATH
];
1716 pg_crc32c statefile_crc
;
1720 INIT_CRC32C(statefile_crc
);
1721 COMP_CRC32C(statefile_crc
, content
, len
);
1722 FIN_CRC32C(statefile_crc
);
1724 TwoPhaseFilePath(path
, xid
);
1726 fd
= OpenTransientFile(path
,
1727 O_CREAT
| O_TRUNC
| O_WRONLY
| PG_BINARY
);
1730 (errcode_for_file_access(),
1731 errmsg("could not recreate file \"%s\": %m", path
)));
1733 /* Write content and CRC */
1735 pgstat_report_wait_start(WAIT_EVENT_TWOPHASE_FILE_WRITE
);
1736 if (write(fd
, content
, len
) != len
)
1738 /* if write didn't set errno, assume problem is no disk space */
1742 (errcode_for_file_access(),
1743 errmsg("could not write file \"%s\": %m", path
)));
1745 if (write(fd
, &statefile_crc
, sizeof(pg_crc32c
)) != sizeof(pg_crc32c
))
1747 /* if write didn't set errno, assume problem is no disk space */
1751 (errcode_for_file_access(),
1752 errmsg("could not write file \"%s\": %m", path
)));
1754 pgstat_report_wait_end();
1757 * We must fsync the file because the end-of-replay checkpoint will not do
1758 * so, there being no GXACT in shared memory yet to tell it to.
1760 pgstat_report_wait_start(WAIT_EVENT_TWOPHASE_FILE_SYNC
);
1761 if (pg_fsync(fd
) != 0)
1763 (errcode_for_file_access(),
1764 errmsg("could not fsync file \"%s\": %m", path
)));
1765 pgstat_report_wait_end();
1767 if (CloseTransientFile(fd
) != 0)
1769 (errcode_for_file_access(),
1770 errmsg("could not close file \"%s\": %m", path
)));
1774 * CheckPointTwoPhase -- handle 2PC component of checkpointing.
1776 * We must fsync the state file of any GXACT that is valid or has been
1777 * generated during redo and has a PREPARE LSN <= the checkpoint's redo
1778 * horizon. (If the gxact isn't valid yet, has not been generated in
1779 * redo, or has a later LSN, this checkpoint is not responsible for
1782 * This is deliberately run as late as possible in the checkpoint sequence,
1783 * because GXACTs ordinarily have short lifespans, and so it is quite
1784 * possible that GXACTs that were valid at checkpoint start will no longer
1785 * exist if we wait a little bit. With typical checkpoint settings this
1786 * will be about 3 minutes for an online checkpoint, so as a result we
1787 * expect that there will be no GXACTs that need to be copied to disk.
1789 * If a GXACT remains valid across multiple checkpoints, it will already
1790 * be on disk so we don't bother to repeat that write.
1793 CheckPointTwoPhase(XLogRecPtr redo_horizon
)
1796 int serialized_xacts
= 0;
1798 if (max_prepared_xacts
<= 0)
1799 return; /* nothing to do */
1801 TRACE_POSTGRESQL_TWOPHASE_CHECKPOINT_START();
1804 * We are expecting there to be zero GXACTs that need to be copied to
1805 * disk, so we perform all I/O while holding TwoPhaseStateLock for
1806 * simplicity. This prevents any new xacts from preparing while this
1807 * occurs, which shouldn't be a problem since the presence of long-lived
1808 * prepared xacts indicates the transaction manager isn't active.
1810 * It's also possible to move I/O out of the lock, but on every error we
1811 * should check whether somebody committed our transaction in different
1812 * backend. Let's leave this optimization for future, if somebody will
1813 * spot that this place cause bottleneck.
1815 * Note that it isn't possible for there to be a GXACT with a
1816 * prepare_end_lsn set prior to the last checkpoint yet is marked invalid,
1817 * because of the efforts with delayChkptFlags.
1819 LWLockAcquire(TwoPhaseStateLock
, LW_SHARED
);
1820 for (i
= 0; i
< TwoPhaseState
->numPrepXacts
; i
++)
1823 * Note that we are using gxact not PGPROC so this works in recovery
1826 GlobalTransaction gxact
= TwoPhaseState
->prepXacts
[i
];
1828 if ((gxact
->valid
|| gxact
->inredo
) &&
1830 gxact
->prepare_end_lsn
<= redo_horizon
)
1835 XlogReadTwoPhaseData(gxact
->prepare_start_lsn
, &buf
, &len
);
1836 RecreateTwoPhaseFile(gxact
->xid
, buf
, len
);
1837 gxact
->ondisk
= true;
1838 gxact
->prepare_start_lsn
= InvalidXLogRecPtr
;
1839 gxact
->prepare_end_lsn
= InvalidXLogRecPtr
;
1844 LWLockRelease(TwoPhaseStateLock
);
1847 * Flush unconditionally the parent directory to make any information
1848 * durable on disk. Two-phase files could have been removed and those
1849 * removals need to be made persistent as well as any files newly created
1850 * previously since the last checkpoint.
1852 fsync_fname(TWOPHASE_DIR
, true);
1854 TRACE_POSTGRESQL_TWOPHASE_CHECKPOINT_DONE();
1856 if (log_checkpoints
&& serialized_xacts
> 0)
1858 (errmsg_plural("%u two-phase state file was written "
1859 "for a long-running prepared transaction",
1860 "%u two-phase state files were written "
1861 "for long-running prepared transactions",
1863 serialized_xacts
)));
1867 * restoreTwoPhaseData
1869 * Scan pg_twophase and fill TwoPhaseState depending on the on-disk data.
1870 * This is called once at the beginning of recovery, saving any extra
1871 * lookups in the future. Two-phase files that are newer than the
1872 * minimum XID horizon are discarded on the way.
1875 restoreTwoPhaseData(void)
1878 struct dirent
*clde
;
1880 LWLockAcquire(TwoPhaseStateLock
, LW_EXCLUSIVE
);
1881 cldir
= AllocateDir(TWOPHASE_DIR
);
1882 while ((clde
= ReadDir(cldir
, TWOPHASE_DIR
)) != NULL
)
1884 if (strlen(clde
->d_name
) == 8 &&
1885 strspn(clde
->d_name
, "0123456789ABCDEF") == 8)
1890 xid
= (TransactionId
) strtoul(clde
->d_name
, NULL
, 16);
1892 buf
= ProcessTwoPhaseBuffer(xid
, InvalidXLogRecPtr
,
1893 true, false, false);
1897 PrepareRedoAdd(buf
, InvalidXLogRecPtr
,
1898 InvalidXLogRecPtr
, InvalidRepOriginId
);
1901 LWLockRelease(TwoPhaseStateLock
);
1906 * PrescanPreparedTransactions
1908 * Scan the shared memory entries of TwoPhaseState and determine the range
1909 * of valid XIDs present. This is run during database startup, after we
1910 * have completed reading WAL. ShmemVariableCache->nextXid has been set to
1911 * one more than the highest XID for which evidence exists in WAL.
1913 * We throw away any prepared xacts with main XID beyond nextXid --- if any
1914 * are present, it suggests that the DBA has done a PITR recovery to an
1915 * earlier point in time without cleaning out pg_twophase. We dare not
1916 * try to recover such prepared xacts since they likely depend on database
1917 * state that doesn't exist now.
1919 * However, we will advance nextXid beyond any subxact XIDs belonging to
1920 * valid prepared xacts. We need to do this since subxact commit doesn't
1921 * write a WAL entry, and so there might be no evidence in WAL of those
1924 * On corrupted two-phase files, fail immediately. Keeping around broken
1925 * entries and let replay continue causes harm on the system, and a new
1926 * backup should be rolled in.
1928 * Our other responsibility is to determine and return the oldest valid XID
1929 * among the prepared xacts (if none, return ShmemVariableCache->nextXid).
1930 * This is needed to synchronize pg_subtrans startup properly.
1932 * If xids_p and nxids_p are not NULL, pointer to a palloc'd array of all
1933 * top-level xids is stored in *xids_p. The number of entries in the array
1934 * is returned in *nxids_p.
1937 PrescanPreparedTransactions(TransactionId
**xids_p
, int *nxids_p
)
1939 FullTransactionId nextXid
= ShmemVariableCache
->nextXid
;
1940 TransactionId origNextXid
= XidFromFullTransactionId(nextXid
);
1941 TransactionId result
= origNextXid
;
1942 TransactionId
*xids
= NULL
;
1947 LWLockAcquire(TwoPhaseStateLock
, LW_EXCLUSIVE
);
1948 for (i
= 0; i
< TwoPhaseState
->numPrepXacts
; i
++)
1952 GlobalTransaction gxact
= TwoPhaseState
->prepXacts
[i
];
1954 Assert(gxact
->inredo
);
1958 buf
= ProcessTwoPhaseBuffer(xid
,
1959 gxact
->prepare_start_lsn
,
1960 gxact
->ondisk
, false, true);
1966 * OK, we think this file is valid. Incorporate xid into the
1967 * running-minimum result.
1969 if (TransactionIdPrecedes(xid
, result
))
1974 if (nxids
== allocsize
)
1979 xids
= palloc(allocsize
* sizeof(TransactionId
));
1983 allocsize
= allocsize
* 2;
1984 xids
= repalloc(xids
, allocsize
* sizeof(TransactionId
));
1987 xids
[nxids
++] = xid
;
1992 LWLockRelease(TwoPhaseStateLock
);
2004 * StandbyRecoverPreparedTransactions
2006 * Scan the shared memory entries of TwoPhaseState and setup all the required
2007 * information to allow standby queries to treat prepared transactions as still
2010 * This is never called at the end of recovery - we use
2011 * RecoverPreparedTransactions() at that point.
2013 * The lack of calls to SubTransSetParent() calls here is by design;
2014 * those calls are made by RecoverPreparedTransactions() at the end of recovery
2015 * for those xacts that need this.
2018 StandbyRecoverPreparedTransactions(void)
2022 LWLockAcquire(TwoPhaseStateLock
, LW_EXCLUSIVE
);
2023 for (i
= 0; i
< TwoPhaseState
->numPrepXacts
; i
++)
2027 GlobalTransaction gxact
= TwoPhaseState
->prepXacts
[i
];
2029 Assert(gxact
->inredo
);
2033 buf
= ProcessTwoPhaseBuffer(xid
,
2034 gxact
->prepare_start_lsn
,
2035 gxact
->ondisk
, false, false);
2039 LWLockRelease(TwoPhaseStateLock
);
2043 * RecoverPreparedTransactions
2045 * Scan the shared memory entries of TwoPhaseState and reload the state for
2046 * each prepared transaction (reacquire locks, etc).
2048 * This is run at the end of recovery, but before we allow backends to write
2051 * At the end of recovery the way we take snapshots will change. We now need
2052 * to mark all running transactions with their full SubTransSetParent() info
2053 * to allow normal snapshots to work correctly if snapshots overflow.
2054 * We do this here because by definition prepared transactions are the only
2055 * type of write transaction still running, so this is necessary and
2059 RecoverPreparedTransactions(void)
2063 LWLockAcquire(TwoPhaseStateLock
, LW_EXCLUSIVE
);
2064 for (i
= 0; i
< TwoPhaseState
->numPrepXacts
; i
++)
2068 GlobalTransaction gxact
= TwoPhaseState
->prepXacts
[i
];
2070 TwoPhaseFileHeader
*hdr
;
2071 TransactionId
*subxids
;
2077 * Reconstruct subtrans state for the transaction --- needed because
2078 * pg_subtrans is not preserved over a restart. Note that we are
2079 * linking all the subtransactions directly to the top-level XID;
2080 * there may originally have been a more complex hierarchy, but
2081 * there's no need to restore that exactly. It's possible that
2082 * SubTransSetParent has been set before, if the prepared transaction
2083 * generated xid assignment records.
2085 buf
= ProcessTwoPhaseBuffer(xid
,
2086 gxact
->prepare_start_lsn
,
2087 gxact
->ondisk
, true, false);
2092 (errmsg("recovering prepared transaction %u from shared memory", xid
)));
2094 hdr
= (TwoPhaseFileHeader
*) buf
;
2095 Assert(TransactionIdEquals(hdr
->xid
, xid
));
2096 bufptr
= buf
+ MAXALIGN(sizeof(TwoPhaseFileHeader
));
2097 gid
= (const char *) bufptr
;
2098 bufptr
+= MAXALIGN(hdr
->gidlen
);
2099 subxids
= (TransactionId
*) bufptr
;
2100 bufptr
+= MAXALIGN(hdr
->nsubxacts
* sizeof(TransactionId
));
2101 bufptr
+= MAXALIGN(hdr
->ncommitrels
* sizeof(RelFileLocator
));
2102 bufptr
+= MAXALIGN(hdr
->nabortrels
* sizeof(RelFileLocator
));
2103 bufptr
+= MAXALIGN(hdr
->ncommitstats
* sizeof(xl_xact_stats_item
));
2104 bufptr
+= MAXALIGN(hdr
->nabortstats
* sizeof(xl_xact_stats_item
));
2105 bufptr
+= MAXALIGN(hdr
->ninvalmsgs
* sizeof(SharedInvalidationMessage
));
2108 * Recreate its GXACT and dummy PGPROC. But, check whether it was
2109 * added in redo and already has a shmem entry for it.
2111 MarkAsPreparingGuts(gxact
, xid
, gid
,
2113 hdr
->owner
, hdr
->database
);
2115 /* recovered, so reset the flag for entries generated by redo */
2116 gxact
->inredo
= false;
2118 GXactLoadSubxactData(gxact
, hdr
->nsubxacts
, subxids
);
2119 MarkAsPrepared(gxact
, true);
2121 LWLockRelease(TwoPhaseStateLock
);
2124 * Recover other state (notably locks) using resource managers.
2126 ProcessRecords(bufptr
, xid
, twophase_recover_callbacks
);
2129 * Release locks held by the standby process after we process each
2130 * prepared transaction. As a result, we don't need too many
2131 * additional locks at any one time.
2134 StandbyReleaseLockTree(xid
, hdr
->nsubxacts
, subxids
);
2137 * We're done with recovering this transaction. Clear MyLockedGxact,
2138 * like we do in PrepareTransaction() during normal operation.
2140 PostPrepare_Twophase();
2144 LWLockAcquire(TwoPhaseStateLock
, LW_EXCLUSIVE
);
2147 LWLockRelease(TwoPhaseStateLock
);
2151 * ProcessTwoPhaseBuffer
2153 * Given a transaction id, read it either from disk or read it directly
2154 * via shmem xlog record pointer using the provided "prepare_start_lsn".
2156 * If setParent is true, set up subtransaction parent linkages.
2158 * If setNextXid is true, set ShmemVariableCache->nextXid to the newest
2162 ProcessTwoPhaseBuffer(TransactionId xid
,
2163 XLogRecPtr prepare_start_lsn
,
2165 bool setParent
, bool setNextXid
)
2167 FullTransactionId nextXid
= ShmemVariableCache
->nextXid
;
2168 TransactionId origNextXid
= XidFromFullTransactionId(nextXid
);
2169 TransactionId
*subxids
;
2171 TwoPhaseFileHeader
*hdr
;
2174 Assert(LWLockHeldByMeInMode(TwoPhaseStateLock
, LW_EXCLUSIVE
));
2177 Assert(prepare_start_lsn
!= InvalidXLogRecPtr
);
2179 /* Already processed? */
2180 if (TransactionIdDidCommit(xid
) || TransactionIdDidAbort(xid
))
2185 (errmsg("removing stale two-phase state file for transaction %u",
2187 RemoveTwoPhaseFile(xid
, true);
2192 (errmsg("removing stale two-phase state from memory for transaction %u",
2194 PrepareRedoRemove(xid
, true);
2199 /* Reject XID if too new */
2200 if (TransactionIdFollowsOrEquals(xid
, origNextXid
))
2205 (errmsg("removing future two-phase state file for transaction %u",
2207 RemoveTwoPhaseFile(xid
, true);
2212 (errmsg("removing future two-phase state from memory for transaction %u",
2214 PrepareRedoRemove(xid
, true);
2221 /* Read and validate file */
2222 buf
= ReadTwoPhaseFile(xid
, false);
2226 /* Read xlog data */
2227 XlogReadTwoPhaseData(prepare_start_lsn
, &buf
, NULL
);
2230 /* Deconstruct header */
2231 hdr
= (TwoPhaseFileHeader
*) buf
;
2232 if (!TransactionIdEquals(hdr
->xid
, xid
))
2236 (errcode(ERRCODE_DATA_CORRUPTED
),
2237 errmsg("corrupted two-phase state file for transaction %u",
2241 (errcode(ERRCODE_DATA_CORRUPTED
),
2242 errmsg("corrupted two-phase state in memory for transaction %u",
2247 * Examine subtransaction XIDs ... they should all follow main XID, and
2248 * they may force us to advance nextXid.
2250 subxids
= (TransactionId
*) (buf
+
2251 MAXALIGN(sizeof(TwoPhaseFileHeader
)) +
2252 MAXALIGN(hdr
->gidlen
));
2253 for (i
= 0; i
< hdr
->nsubxacts
; i
++)
2255 TransactionId subxid
= subxids
[i
];
2257 Assert(TransactionIdFollows(subxid
, xid
));
2259 /* update nextXid if needed */
2261 AdvanceNextFullTransactionIdPastXid(subxid
);
2264 SubTransSetParent(subxid
, xid
);
2272 * RecordTransactionCommitPrepared
2274 * This is basically the same as RecordTransactionCommit (q.v. if you change
2275 * this function): in particular, we must set DELAY_CHKPT_START to avoid a
2278 * We know the transaction made at least one XLOG entry (its PREPARE),
2279 * so it is never possible to optimize out the commit record.
2282 RecordTransactionCommitPrepared(TransactionId xid
,
2284 TransactionId
*children
,
2286 RelFileLocator
*rels
,
2288 xl_xact_stats_item
*stats
,
2290 SharedInvalidationMessage
*invalmsgs
,
2295 TimestampTz committs
= GetCurrentTimestamp();
2299 * Are we using the replication origins feature? Or, in other words, are
2300 * we replaying remote actions?
2302 replorigin
= (replorigin_session_origin
!= InvalidRepOriginId
&&
2303 replorigin_session_origin
!= DoNotReplicateId
);
2305 START_CRIT_SECTION();
2307 /* See notes in RecordTransactionCommit */
2308 Assert((MyProc
->delayChkptFlags
& DELAY_CHKPT_START
) == 0);
2309 MyProc
->delayChkptFlags
|= DELAY_CHKPT_START
;
2312 * Emit the XLOG commit record. Note that we mark 2PC commits as
2313 * potentially having AccessExclusiveLocks since we don't know whether or
2316 recptr
= XactLogCommitRecord(committs
,
2317 nchildren
, children
, nrels
, rels
,
2319 ninvalmsgs
, invalmsgs
,
2321 MyXactFlags
| XACT_FLAGS_ACQUIREDACCESSEXCLUSIVELOCK
,
2326 /* Move LSNs forward for this replication origin */
2327 replorigin_session_advance(replorigin_session_origin_lsn
,
2331 * Record commit timestamp. The value comes from plain commit timestamp
2332 * if replorigin is not enabled, or replorigin already set a value for us
2333 * in replorigin_session_origin_timestamp otherwise.
2335 * We don't need to WAL-log anything here, as the commit record written
2336 * above already contains the data.
2338 if (!replorigin
|| replorigin_session_origin_timestamp
== 0)
2339 replorigin_session_origin_timestamp
= committs
;
2341 TransactionTreeSetCommitTsData(xid
, nchildren
, children
,
2342 replorigin_session_origin_timestamp
,
2343 replorigin_session_origin
);
2346 * We don't currently try to sleep before flush here ... nor is there any
2347 * support for async commit of a prepared xact (the very idea is probably
2351 /* Flush XLOG to disk */
2354 /* Mark the transaction committed in pg_xact */
2355 TransactionIdCommitTree(xid
, nchildren
, children
);
2357 /* Checkpoint can proceed now */
2358 MyProc
->delayChkptFlags
&= ~DELAY_CHKPT_START
;
2363 * Wait for synchronous replication, if required.
2365 * Note that at this stage we have marked clog, but still show as running
2366 * in the procarray and continue to hold locks.
2368 SyncRepWaitForLSN(recptr
, true);
2372 * RecordTransactionAbortPrepared
2374 * This is basically the same as RecordTransactionAbort.
2376 * We know the transaction made at least one XLOG entry (its PREPARE),
2377 * so it is never possible to optimize out the abort record.
2380 RecordTransactionAbortPrepared(TransactionId xid
,
2382 TransactionId
*children
,
2384 RelFileLocator
*rels
,
2386 xl_xact_stats_item
*stats
,
2393 * Are we using the replication origins feature? Or, in other words, are
2394 * we replaying remote actions?
2396 replorigin
= (replorigin_session_origin
!= InvalidRepOriginId
&&
2397 replorigin_session_origin
!= DoNotReplicateId
);
2400 * Catch the scenario where we aborted partway through
2401 * RecordTransactionCommitPrepared ...
2403 if (TransactionIdDidCommit(xid
))
2404 elog(PANIC
, "cannot abort transaction %u, it was already committed",
2407 START_CRIT_SECTION();
2410 * Emit the XLOG commit record. Note that we mark 2PC aborts as
2411 * potentially having AccessExclusiveLocks since we don't know whether or
2414 recptr
= XactLogAbortRecord(GetCurrentTimestamp(),
2415 nchildren
, children
,
2418 MyXactFlags
| XACT_FLAGS_ACQUIREDACCESSEXCLUSIVELOCK
,
2422 /* Move LSNs forward for this replication origin */
2423 replorigin_session_advance(replorigin_session_origin_lsn
,
2426 /* Always flush, since we're about to remove the 2PC state file */
2430 * Mark the transaction aborted in clog. This is not absolutely necessary
2431 * but we may as well do it while we are here.
2433 TransactionIdAbortTree(xid
, nchildren
, children
);
2438 * Wait for synchronous replication, if required.
2440 * Note that at this stage we have marked clog, but still show as running
2441 * in the procarray and continue to hold locks.
2443 SyncRepWaitForLSN(recptr
, false);
2449 * Store pointers to the start/end of the WAL record along with the xid in
2450 * a gxact entry in shared memory TwoPhaseState structure. If caller
2451 * specifies InvalidXLogRecPtr as WAL location to fetch the two-phase
2452 * data, the entry is marked as located on disk.
2455 PrepareRedoAdd(char *buf
, XLogRecPtr start_lsn
,
2456 XLogRecPtr end_lsn
, RepOriginId origin_id
)
2458 TwoPhaseFileHeader
*hdr
= (TwoPhaseFileHeader
*) buf
;
2461 GlobalTransaction gxact
;
2463 Assert(LWLockHeldByMeInMode(TwoPhaseStateLock
, LW_EXCLUSIVE
));
2464 Assert(RecoveryInProgress());
2466 bufptr
= buf
+ MAXALIGN(sizeof(TwoPhaseFileHeader
));
2467 gid
= (const char *) bufptr
;
2470 * Reserve the GID for the given transaction in the redo code path.
2472 * This creates a gxact struct and puts it into the active array.
2474 * In redo, this struct is mainly used to track PREPARE/COMMIT entries in
2475 * shared memory. Hence, we only fill up the bare minimum contents here.
2476 * The gxact also gets marked with gxact->inredo set to true to indicate
2477 * that it got added in the redo phase
2480 /* Get a free gxact from the freelist */
2481 if (TwoPhaseState
->freeGXacts
== NULL
)
2483 (errcode(ERRCODE_OUT_OF_MEMORY
),
2484 errmsg("maximum number of prepared transactions reached"),
2485 errhint("Increase max_prepared_transactions (currently %d).",
2486 max_prepared_xacts
)));
2487 gxact
= TwoPhaseState
->freeGXacts
;
2488 TwoPhaseState
->freeGXacts
= gxact
->next
;
2490 gxact
->prepared_at
= hdr
->prepared_at
;
2491 gxact
->prepare_start_lsn
= start_lsn
;
2492 gxact
->prepare_end_lsn
= end_lsn
;
2493 gxact
->xid
= hdr
->xid
;
2494 gxact
->owner
= hdr
->owner
;
2495 gxact
->locking_backend
= InvalidBackendId
;
2496 gxact
->valid
= false;
2497 gxact
->ondisk
= XLogRecPtrIsInvalid(start_lsn
);
2498 gxact
->inredo
= true; /* yes, added in redo */
2499 strcpy(gxact
->gid
, gid
);
2501 /* And insert it into the active array */
2502 Assert(TwoPhaseState
->numPrepXacts
< max_prepared_xacts
);
2503 TwoPhaseState
->prepXacts
[TwoPhaseState
->numPrepXacts
++] = gxact
;
2505 if (origin_id
!= InvalidRepOriginId
)
2507 /* recover apply progress */
2508 replorigin_advance(origin_id
, hdr
->origin_lsn
, end_lsn
,
2509 false /* backward */ , false /* WAL */ );
2512 elog(DEBUG2
, "added 2PC data in shared memory for transaction %u", gxact
->xid
);
2518 * Remove the corresponding gxact entry from TwoPhaseState. Also remove
2519 * the 2PC file if a prepared transaction was saved via an earlier checkpoint.
2521 * Caller must hold TwoPhaseStateLock in exclusive mode, because TwoPhaseState
2525 PrepareRedoRemove(TransactionId xid
, bool giveWarning
)
2527 GlobalTransaction gxact
= NULL
;
2531 Assert(LWLockHeldByMeInMode(TwoPhaseStateLock
, LW_EXCLUSIVE
));
2532 Assert(RecoveryInProgress());
2534 for (i
= 0; i
< TwoPhaseState
->numPrepXacts
; i
++)
2536 gxact
= TwoPhaseState
->prepXacts
[i
];
2538 if (gxact
->xid
== xid
)
2540 Assert(gxact
->inredo
);
2547 * Just leave if there is nothing, this is expected during WAL replay.
2553 * And now we can clean up any files we may have left.
2555 elog(DEBUG2
, "removing 2PC data for transaction %u", xid
);
2557 RemoveTwoPhaseFile(xid
, giveWarning
);
2563 * Check if the prepared transaction with the given GID, lsn and timestamp
2566 * Note that we always compare with the LSN where prepare ends because that is
2567 * what is stored as origin_lsn in the 2PC file.
2569 * This function is primarily used to check if the prepared transaction
2570 * received from the upstream (remote node) already exists. Checking only GID
2571 * is not sufficient because a different prepared xact with the same GID can
2572 * exist on the same node. So, we are ensuring to match origin_lsn and
2573 * origin_timestamp of prepared xact to avoid the possibility of a match of
2574 * prepared xact from two different nodes.
2577 LookupGXact(const char *gid
, XLogRecPtr prepare_end_lsn
,
2578 TimestampTz origin_prepare_timestamp
)
2583 LWLockAcquire(TwoPhaseStateLock
, LW_SHARED
);
2584 for (i
= 0; i
< TwoPhaseState
->numPrepXacts
; i
++)
2586 GlobalTransaction gxact
= TwoPhaseState
->prepXacts
[i
];
2588 /* Ignore not-yet-valid GIDs. */
2589 if (gxact
->valid
&& strcmp(gxact
->gid
, gid
) == 0)
2592 TwoPhaseFileHeader
*hdr
;
2595 * We are not expecting collisions of GXACTs (same gid) between
2596 * publisher and subscribers, so we perform all I/O while holding
2597 * TwoPhaseStateLock for simplicity.
2599 * To move the I/O out of the lock, we need to ensure that no
2600 * other backend commits the prepared xact in the meantime. We can
2601 * do this optimization if we encounter many collisions in GID
2602 * between publisher and subscriber.
2605 buf
= ReadTwoPhaseFile(gxact
->xid
, false);
2608 Assert(gxact
->prepare_start_lsn
);
2609 XlogReadTwoPhaseData(gxact
->prepare_start_lsn
, &buf
, NULL
);
2612 hdr
= (TwoPhaseFileHeader
*) buf
;
2614 if (hdr
->origin_lsn
== prepare_end_lsn
&&
2615 hdr
->origin_timestamp
== origin_prepare_timestamp
)
2625 LWLockRelease(TwoPhaseStateLock
);