/*-------------------------------------------------------------------------
 *
 * origin.c
 *	  Logical replication progress tracking support.
 *
 * Copyright (c) 2013-2022, PostgreSQL Global Development Group
 *
 * IDENTIFICATION
 *	  src/backend/replication/logical/origin.c
 *
 * NOTES
 *
 * This file provides the following:
 * * An infrastructure to name nodes in a replication setup
 * * A facility to store and persist replication progress in an efficient
 *	 and durable manner.
 *
 * Replication origins consist of a descriptive, user-defined, external name
 * and a short, thus space-efficient, internal 2-byte one.  This split exists
 * because replication origins have to be stored in WAL and shared memory,
 * and long descriptors would be inefficient.  For now we only use 2 bytes for
 * the internal id of a replication origin, as it seems unlikely that there
 * will soon be more than 65k nodes in one replication setup, and using only
 * two bytes allows us to be more space efficient.
 *
 * Replication progress is tracked in a shared memory table
 * (ReplicationState) that's dumped to disk every checkpoint.  Entries
 * ('slots') in this table are identified by the internal id, because that
 * allows us to advance replication progress during crash recovery.  To allow
 * doing so we store the original LSN (from the originating system) of a
 * transaction in the commit record.  That allows us to recover the precise
 * replayed state after crash recovery without requiring synchronous commits.
 * Allowing logical replication to use asynchronous commit is generally good
 * for performance, but especially important as it allows a single-threaded
 * replay process to keep up with a source that has multiple backends
 * generating changes concurrently.  For efficiency and simplicity reasons a
 * backend can set up one replication origin that is from then on used as the
 * source of changes produced by the backend, until reset again.
 *
 * This infrastructure is intended to be used in cooperation with logical
 * decoding.  When replaying from a remote system the configured origin is
 * provided to output plugins, allowing prevention of replication loops and
 * other filtering.
 *
 * There are several levels of locking at work:
 *
 * * To create and drop replication origins an exclusive lock on
 *	 pg_replication_origin is required for the duration.  That allows us to
 *	 safely and conflict-free assign new origins using a dirty snapshot.
 *
 * * When creating an in-memory replication progress slot the
 *	 ReplicationOrigin LWLock has to be held exclusively; when iterating over
 *	 the replication progress a shared lock has to be held.  Advancing the
 *	 progress of an origin that is not set up as the session's replication
 *	 origin (replorigin_advance()) also takes it exclusively, since that may
 *	 have to create a new slot.
 *
 * * When manipulating or looking at the remote_lsn and local_lsn fields of a
 *	 replication progress slot that slot's lwlock has to be held.  That's
 *	 primarily because we do not assume 8-byte writes (the LSNs) are atomic on
 *	 all our platforms, but it also simplifies memory ordering concerns
 *	 between the remote and local lsn.  We use a lwlock instead of a spinlock
 *	 so it's less harmful to hold the lock over a WAL write
 *	 (cf. replorigin_advance()).
 *
 * ---------------------------------------------------------------------------
 */
#include "postgres.h"

#include <unistd.h>

#include "access/genam.h"
#include "access/htup_details.h"
#include "access/table.h"
#include "access/xact.h"
#include "catalog/catalog.h"
#include "catalog/indexing.h"
#include "miscadmin.h"
#include "nodes/execnodes.h"
#include "pgstat.h"
#include "replication/logical.h"
#include "replication/origin.h"
#include "storage/condition_variable.h"
#include "storage/copydir.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/pg_lsn.h"
#include "utils/rel.h"
#include "utils/snapmgr.h"
#include "utils/syscache.h"
98 * Replay progress of a single remote node.
100 typedef struct ReplicationState
103 * Local identifier for the remote node.
108 * Location of the latest commit from the remote side.
110 XLogRecPtr remote_lsn
;
113 * Remember the local lsn of the commit record so we can XLogFlush() to it
114 * during a checkpoint so we know the commit record actually is safe on
117 XLogRecPtr local_lsn
;
120 * PID of backend that's acquired slot, or 0 if none.
125 * Condition variable that's signaled when acquired_by changes.
127 ConditionVariable origin_cv
;
130 * Lock protecting remote_lsn and local_lsn.
136 * On disk version of ReplicationState.
138 typedef struct ReplicationStateOnDisk
141 XLogRecPtr remote_lsn
;
142 } ReplicationStateOnDisk
;
145 typedef struct ReplicationStateCtl
147 /* Tranche to use for per-origin LWLocks */
149 /* Array of length max_replication_slots */
150 ReplicationState states
[FLEXIBLE_ARRAY_MEMBER
];
151 } ReplicationStateCtl
;
153 /* external variables */
154 RepOriginId replorigin_session_origin
= InvalidRepOriginId
; /* assumed identity */
155 XLogRecPtr replorigin_session_origin_lsn
= InvalidXLogRecPtr
;
156 TimestampTz replorigin_session_origin_timestamp
= 0;
159 * Base address into a shared memory array of replication states of size
160 * max_replication_slots.
162 * XXX: Should we use a separate variable to size this rather than
163 * max_replication_slots?
165 static ReplicationState
*replication_states
;
168 * Actual shared memory block (replication_states[] is now part of this).
170 static ReplicationStateCtl
*replication_states_ctl
;
173 * Backend-local, cached element from ReplicationState for use in a backend
174 * replaying remote commits, so we don't have to search ReplicationState for
175 * the backends current RepOriginId.
177 static ReplicationState
*session_replication_state
= NULL
;
179 /* Magic for on disk files. */
180 #define REPLICATION_STATE_MAGIC ((uint32) 0x1257DADE)
183 replorigin_check_prerequisites(bool check_slots
, bool recoveryOK
)
185 if (check_slots
&& max_replication_slots
== 0)
187 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE
),
188 errmsg("cannot query or manipulate replication origin when max_replication_slots = 0")));
190 if (!recoveryOK
&& RecoveryInProgress())
192 (errcode(ERRCODE_READ_ONLY_SQL_TRANSACTION
),
193 errmsg("cannot manipulate replication origins during recovery")));
/* ---------------------------------------------------------------------------
 * Functions for working with replication origins themselves.
 * ---------------------------------------------------------------------------
 */
204 * Check for a persistent replication origin identified by name.
206 * Returns InvalidOid if the node isn't known yet and missing_ok is true.
209 replorigin_by_name(const char *roname
, bool missing_ok
)
211 Form_pg_replication_origin ident
;
212 Oid roident
= InvalidOid
;
216 roname_d
= CStringGetTextDatum(roname
);
218 tuple
= SearchSysCache1(REPLORIGNAME
, roname_d
);
219 if (HeapTupleIsValid(tuple
))
221 ident
= (Form_pg_replication_origin
) GETSTRUCT(tuple
);
222 roident
= ident
->roident
;
223 ReleaseSysCache(tuple
);
225 else if (!missing_ok
)
227 (errcode(ERRCODE_UNDEFINED_OBJECT
),
228 errmsg("replication origin \"%s\" does not exist",
235 * Create a replication origin.
237 * Needs to be called in a transaction.
240 replorigin_create(const char *roname
)
243 HeapTuple tuple
= NULL
;
246 SnapshotData SnapshotDirty
;
250 roname_d
= CStringGetTextDatum(roname
);
252 Assert(IsTransactionState());
255 * We need the numeric replication origin to be 16bit wide, so we cannot
256 * rely on the normal oid allocation. Instead we simply scan
257 * pg_replication_origin for the first unused id. That's not particularly
258 * efficient, but this should be a fairly infrequent operation - we can
259 * easily spend a bit more code on this when it turns out it needs to be
262 * We handle concurrency by taking an exclusive lock (allowing reads!)
263 * over the table for the duration of the search. Because we use a "dirty
264 * snapshot" we can read rows that other in-progress sessions have
265 * written, even though they would be invisible with normal snapshots. Due
266 * to the exclusive lock there's no danger that new rows can appear while
269 InitDirtySnapshot(SnapshotDirty
);
271 rel
= table_open(ReplicationOriginRelationId
, ExclusiveLock
);
273 for (roident
= InvalidOid
+ 1; roident
< PG_UINT16_MAX
; roident
++)
275 bool nulls
[Natts_pg_replication_origin
];
276 Datum values
[Natts_pg_replication_origin
];
279 CHECK_FOR_INTERRUPTS();
282 Anum_pg_replication_origin_roident
,
283 BTEqualStrategyNumber
, F_OIDEQ
,
284 ObjectIdGetDatum(roident
));
286 scan
= systable_beginscan(rel
, ReplicationOriginIdentIndex
,
291 collides
= HeapTupleIsValid(systable_getnext(scan
));
293 systable_endscan(scan
);
298 * Ok, found an unused roident, insert the new row and do a CCI,
299 * so our callers can look it up if they want to.
301 memset(&nulls
, 0, sizeof(nulls
));
303 values
[Anum_pg_replication_origin_roident
- 1] = ObjectIdGetDatum(roident
);
304 values
[Anum_pg_replication_origin_roname
- 1] = roname_d
;
306 tuple
= heap_form_tuple(RelationGetDescr(rel
), values
, nulls
);
307 CatalogTupleInsert(rel
, tuple
);
308 CommandCounterIncrement();
313 /* now release lock again, */
314 table_close(rel
, ExclusiveLock
);
318 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED
),
319 errmsg("could not find free replication origin OID")));
321 heap_freetuple(tuple
);
326 * Helper function to drop a replication origin.
329 replorigin_drop_guts(Relation rel
, RepOriginId roident
, bool nowait
)
335 * First, clean up the slot state info, if there is any matching slot.
339 LWLockAcquire(ReplicationOriginLock
, LW_EXCLUSIVE
);
341 for (i
= 0; i
< max_replication_slots
; i
++)
343 ReplicationState
*state
= &replication_states
[i
];
345 if (state
->roident
== roident
)
347 /* found our slot, is it busy? */
348 if (state
->acquired_by
!= 0)
350 ConditionVariable
*cv
;
354 (errcode(ERRCODE_OBJECT_IN_USE
),
355 errmsg("could not drop replication origin with OID %d, in use by PID %d",
357 state
->acquired_by
)));
360 * We must wait and then retry. Since we don't know which CV
361 * to wait on until here, we can't readily use
362 * ConditionVariablePrepareToSleep (calling it here would be
363 * wrong, since we could miss the signal if we did so); just
364 * use ConditionVariableSleep directly.
366 cv
= &state
->origin_cv
;
368 LWLockRelease(ReplicationOriginLock
);
370 ConditionVariableSleep(cv
, WAIT_EVENT_REPLICATION_ORIGIN_DROP
);
374 /* first make a WAL log entry */
376 xl_replorigin_drop xlrec
;
378 xlrec
.node_id
= roident
;
380 XLogRegisterData((char *) (&xlrec
), sizeof(xlrec
));
381 XLogInsert(RM_REPLORIGIN_ID
, XLOG_REPLORIGIN_DROP
);
384 /* then clear the in-memory slot */
385 state
->roident
= InvalidRepOriginId
;
386 state
->remote_lsn
= InvalidXLogRecPtr
;
387 state
->local_lsn
= InvalidXLogRecPtr
;
391 LWLockRelease(ReplicationOriginLock
);
392 ConditionVariableCancelSleep();
395 * Now, we can delete the catalog entry.
397 tuple
= SearchSysCache1(REPLORIGIDENT
, ObjectIdGetDatum(roident
));
398 if (!HeapTupleIsValid(tuple
))
399 elog(ERROR
, "cache lookup failed for replication origin with oid %u",
402 CatalogTupleDelete(rel
, &tuple
->t_self
);
403 ReleaseSysCache(tuple
);
405 CommandCounterIncrement();
409 * Drop replication origin (by name).
411 * Needs to be called in a transaction.
414 replorigin_drop_by_name(const char *name
, bool missing_ok
, bool nowait
)
419 Assert(IsTransactionState());
422 * To interlock against concurrent drops, we hold ExclusiveLock on
423 * pg_replication_origin till xact commit.
425 * XXX We can optimize this by acquiring the lock on a specific origin by
426 * using LockSharedObject if required. However, for that, we first to
427 * acquire a lock on ReplicationOriginRelationId, get the origin_id, lock
428 * the specific origin and then re-check if the origin still exists.
430 rel
= table_open(ReplicationOriginRelationId
, ExclusiveLock
);
432 roident
= replorigin_by_name(name
, missing_ok
);
434 if (OidIsValid(roident
))
435 replorigin_drop_guts(rel
, roident
, nowait
);
437 /* We keep the lock on pg_replication_origin until commit */
438 table_close(rel
, NoLock
);
442 * Lookup replication origin via its oid and return the name.
444 * The external name is palloc'd in the calling context.
446 * Returns true if the origin is known, false otherwise.
449 replorigin_by_oid(RepOriginId roident
, bool missing_ok
, char **roname
)
452 Form_pg_replication_origin ric
;
454 Assert(OidIsValid((Oid
) roident
));
455 Assert(roident
!= InvalidRepOriginId
);
456 Assert(roident
!= DoNotReplicateId
);
458 tuple
= SearchSysCache1(REPLORIGIDENT
,
459 ObjectIdGetDatum((Oid
) roident
));
461 if (HeapTupleIsValid(tuple
))
463 ric
= (Form_pg_replication_origin
) GETSTRUCT(tuple
);
464 *roname
= text_to_cstring(&ric
->roname
);
465 ReleaseSysCache(tuple
);
475 (errcode(ERRCODE_UNDEFINED_OBJECT
),
476 errmsg("replication origin with OID %u does not exist",
/* ---------------------------------------------------------------------------
 * Functions for handling replication progress.
 * ---------------------------------------------------------------------------
 */
490 ReplicationOriginShmemSize(void)
495 * XXX: max_replication_slots is arguably the wrong thing to use, as here
496 * we keep the replay state of *remote* transactions. But for now it seems
497 * sufficient to reuse it, rather than introduce a separate GUC.
499 if (max_replication_slots
== 0)
502 size
= add_size(size
, offsetof(ReplicationStateCtl
, states
));
504 size
= add_size(size
,
505 mul_size(max_replication_slots
, sizeof(ReplicationState
)));
510 ReplicationOriginShmemInit(void)
514 if (max_replication_slots
== 0)
517 replication_states_ctl
= (ReplicationStateCtl
*)
518 ShmemInitStruct("ReplicationOriginState",
519 ReplicationOriginShmemSize(),
521 replication_states
= replication_states_ctl
->states
;
527 MemSet(replication_states_ctl
, 0, ReplicationOriginShmemSize());
529 replication_states_ctl
->tranche_id
= LWTRANCHE_REPLICATION_ORIGIN_STATE
;
531 for (i
= 0; i
< max_replication_slots
; i
++)
533 LWLockInitialize(&replication_states
[i
].lock
,
534 replication_states_ctl
->tranche_id
);
535 ConditionVariableInit(&replication_states
[i
].origin_cv
);
540 /* ---------------------------------------------------------------------------
541 * Perform a checkpoint of each replication origin's progress with respect to
542 * the replayed remote_lsn. Make sure that all transactions we refer to in the
543 * checkpoint (local_lsn) are actually on-disk. This might not yet be the case
544 * if the transactions were originally committed asynchronously.
546 * We store checkpoints in the following format:
547 * +-------+------------------------+------------------+-----+--------+
548 * | MAGIC | ReplicationStateOnDisk | struct Replic... | ... | CRC32C | EOF
549 * +-------+------------------------+------------------+-----+--------+
551 * So its just the magic, followed by the statically sized
552 * ReplicationStateOnDisk structs. Note that the maximum number of
553 * ReplicationState is determined by max_replication_slots.
554 * ---------------------------------------------------------------------------
557 CheckPointReplicationOrigin(void)
559 const char *tmppath
= "pg_logical/replorigin_checkpoint.tmp";
560 const char *path
= "pg_logical/replorigin_checkpoint";
563 uint32 magic
= REPLICATION_STATE_MAGIC
;
566 if (max_replication_slots
== 0)
571 /* make sure no old temp file is remaining */
572 if (unlink(tmppath
) < 0 && errno
!= ENOENT
)
574 (errcode_for_file_access(),
575 errmsg("could not remove file \"%s\": %m",
579 * no other backend can perform this at the same time; only one checkpoint
580 * can happen at a time.
582 tmpfd
= OpenTransientFile(tmppath
,
583 O_CREAT
| O_EXCL
| O_WRONLY
| PG_BINARY
);
586 (errcode_for_file_access(),
587 errmsg("could not create file \"%s\": %m",
592 if ((write(tmpfd
, &magic
, sizeof(magic
))) != sizeof(magic
))
594 /* if write didn't set errno, assume problem is no disk space */
598 (errcode_for_file_access(),
599 errmsg("could not write to file \"%s\": %m",
602 COMP_CRC32C(crc
, &magic
, sizeof(magic
));
604 /* prevent concurrent creations/drops */
605 LWLockAcquire(ReplicationOriginLock
, LW_SHARED
);
607 /* write actual data */
608 for (i
= 0; i
< max_replication_slots
; i
++)
610 ReplicationStateOnDisk disk_state
;
611 ReplicationState
*curstate
= &replication_states
[i
];
612 XLogRecPtr local_lsn
;
614 if (curstate
->roident
== InvalidRepOriginId
)
617 /* zero, to avoid uninitialized padding bytes */
618 memset(&disk_state
, 0, sizeof(disk_state
));
620 LWLockAcquire(&curstate
->lock
, LW_SHARED
);
622 disk_state
.roident
= curstate
->roident
;
624 disk_state
.remote_lsn
= curstate
->remote_lsn
;
625 local_lsn
= curstate
->local_lsn
;
627 LWLockRelease(&curstate
->lock
);
629 /* make sure we only write out a commit that's persistent */
630 XLogFlush(local_lsn
);
633 if ((write(tmpfd
, &disk_state
, sizeof(disk_state
))) !=
636 /* if write didn't set errno, assume problem is no disk space */
640 (errcode_for_file_access(),
641 errmsg("could not write to file \"%s\": %m",
645 COMP_CRC32C(crc
, &disk_state
, sizeof(disk_state
));
648 LWLockRelease(ReplicationOriginLock
);
650 /* write out the CRC */
653 if ((write(tmpfd
, &crc
, sizeof(crc
))) != sizeof(crc
))
655 /* if write didn't set errno, assume problem is no disk space */
659 (errcode_for_file_access(),
660 errmsg("could not write to file \"%s\": %m",
664 if (CloseTransientFile(tmpfd
) != 0)
666 (errcode_for_file_access(),
667 errmsg("could not close file \"%s\": %m",
670 /* fsync, rename to permanent file, fsync file and directory */
671 durable_rename(tmppath
, path
, PANIC
);
675 * Recover replication replay status from checkpoint data saved earlier by
676 * CheckPointReplicationOrigin.
678 * This only needs to be called at startup and *not* during every checkpoint
679 * read during recovery (e.g. in HS or PITR from a base backup) afterwards. All
680 * state thereafter can be recovered by looking at commit records.
683 StartupReplicationOrigin(void)
685 const char *path
= "pg_logical/replorigin_checkpoint";
688 uint32 magic
= REPLICATION_STATE_MAGIC
;
693 /* don't want to overwrite already existing state */
694 #ifdef USE_ASSERT_CHECKING
695 static bool already_started
= false;
697 Assert(!already_started
);
698 already_started
= true;
701 if (max_replication_slots
== 0)
706 elog(DEBUG2
, "starting up replication origin progress state");
708 fd
= OpenTransientFile(path
, O_RDONLY
| PG_BINARY
);
711 * might have had max_replication_slots == 0 last run, or we just brought
714 if (fd
< 0 && errno
== ENOENT
)
718 (errcode_for_file_access(),
719 errmsg("could not open file \"%s\": %m",
722 /* verify magic, that is written even if nothing was active */
723 readBytes
= read(fd
, &magic
, sizeof(magic
));
724 if (readBytes
!= sizeof(magic
))
728 (errcode_for_file_access(),
729 errmsg("could not read file \"%s\": %m",
733 (errcode(ERRCODE_DATA_CORRUPTED
),
734 errmsg("could not read file \"%s\": read %d of %zu",
735 path
, readBytes
, sizeof(magic
))));
737 COMP_CRC32C(crc
, &magic
, sizeof(magic
));
739 if (magic
!= REPLICATION_STATE_MAGIC
)
741 (errmsg("replication checkpoint has wrong magic %u instead of %u",
742 magic
, REPLICATION_STATE_MAGIC
)));
744 /* we can skip locking here, no other access is possible */
746 /* recover individual states, until there are no more to be found */
749 ReplicationStateOnDisk disk_state
;
751 readBytes
= read(fd
, &disk_state
, sizeof(disk_state
));
753 /* no further data */
754 if (readBytes
== sizeof(crc
))
756 /* not pretty, but simple ... */
757 file_crc
= *(pg_crc32c
*) &disk_state
;
764 (errcode_for_file_access(),
765 errmsg("could not read file \"%s\": %m",
769 if (readBytes
!= sizeof(disk_state
))
772 (errcode_for_file_access(),
773 errmsg("could not read file \"%s\": read %d of %zu",
774 path
, readBytes
, sizeof(disk_state
))));
777 COMP_CRC32C(crc
, &disk_state
, sizeof(disk_state
));
779 if (last_state
== max_replication_slots
)
781 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED
),
782 errmsg("could not find free replication state, increase max_replication_slots")));
784 /* copy data to shared memory */
785 replication_states
[last_state
].roident
= disk_state
.roident
;
786 replication_states
[last_state
].remote_lsn
= disk_state
.remote_lsn
;
790 (errmsg("recovered replication state of node %u to %X/%X",
792 LSN_FORMAT_ARGS(disk_state
.remote_lsn
))));
795 /* now check checksum */
799 (errcode(ERRCODE_DATA_CORRUPTED
),
800 errmsg("replication slot checkpoint has wrong checksum %u, expected %u",
803 if (CloseTransientFile(fd
) != 0)
805 (errcode_for_file_access(),
806 errmsg("could not close file \"%s\": %m",
811 replorigin_redo(XLogReaderState
*record
)
813 uint8 info
= XLogRecGetInfo(record
) & ~XLR_INFO_MASK
;
817 case XLOG_REPLORIGIN_SET
:
819 xl_replorigin_set
*xlrec
=
820 (xl_replorigin_set
*) XLogRecGetData(record
);
822 replorigin_advance(xlrec
->node_id
,
823 xlrec
->remote_lsn
, record
->EndRecPtr
,
824 xlrec
->force
/* backward */ ,
825 false /* WAL log */ );
828 case XLOG_REPLORIGIN_DROP
:
830 xl_replorigin_drop
*xlrec
;
833 xlrec
= (xl_replorigin_drop
*) XLogRecGetData(record
);
835 for (i
= 0; i
< max_replication_slots
; i
++)
837 ReplicationState
*state
= &replication_states
[i
];
840 if (state
->roident
== xlrec
->node_id
)
843 state
->roident
= InvalidRepOriginId
;
844 state
->remote_lsn
= InvalidXLogRecPtr
;
845 state
->local_lsn
= InvalidXLogRecPtr
;
852 elog(PANIC
, "replorigin_redo: unknown op code %u", info
);
858 * Tell the replication origin progress machinery that a commit from 'node'
859 * that originated at the LSN remote_commit on the remote node was replayed
860 * successfully and that we don't need to do so again. In combination with
861 * setting up replorigin_session_origin_lsn and replorigin_session_origin
862 * that ensures we won't lose knowledge about that after a crash if the
863 * transaction had a persistent effect (think of asynchronous commits).
865 * local_commit needs to be a local LSN of the commit so that we can make sure
866 * upon a checkpoint that enough WAL has been persisted to disk.
868 * Needs to be called with a RowExclusiveLock on pg_replication_origin,
869 * unless running in recovery.
872 replorigin_advance(RepOriginId node
,
873 XLogRecPtr remote_commit
, XLogRecPtr local_commit
,
874 bool go_backward
, bool wal_log
)
877 ReplicationState
*replication_state
= NULL
;
878 ReplicationState
*free_state
= NULL
;
880 Assert(node
!= InvalidRepOriginId
);
882 /* we don't track DoNotReplicateId */
883 if (node
== DoNotReplicateId
)
887 * XXX: For the case where this is called by WAL replay, it'd be more
888 * efficient to restore into a backend local hashtable and only dump into
889 * shmem after recovery is finished. Let's wait with implementing that
890 * till it's shown to be a measurable expense
893 /* Lock exclusively, as we may have to create a new table entry. */
894 LWLockAcquire(ReplicationOriginLock
, LW_EXCLUSIVE
);
897 * Search for either an existing slot for the origin, or a free one we can
900 for (i
= 0; i
< max_replication_slots
; i
++)
902 ReplicationState
*curstate
= &replication_states
[i
];
904 /* remember where to insert if necessary */
905 if (curstate
->roident
== InvalidRepOriginId
&&
908 free_state
= curstate
;
913 if (curstate
->roident
!= node
)
919 replication_state
= curstate
;
921 LWLockAcquire(&replication_state
->lock
, LW_EXCLUSIVE
);
923 /* Make sure it's not used by somebody else */
924 if (replication_state
->acquired_by
!= 0)
927 (errcode(ERRCODE_OBJECT_IN_USE
),
928 errmsg("replication origin with OID %d is already active for PID %d",
929 replication_state
->roident
,
930 replication_state
->acquired_by
)));
936 if (replication_state
== NULL
&& free_state
== NULL
)
938 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED
),
939 errmsg("could not find free replication state slot for replication origin with OID %u",
941 errhint("Increase max_replication_slots and try again.")));
943 if (replication_state
== NULL
)
945 /* initialize new slot */
946 LWLockAcquire(&free_state
->lock
, LW_EXCLUSIVE
);
947 replication_state
= free_state
;
948 Assert(replication_state
->remote_lsn
== InvalidXLogRecPtr
);
949 Assert(replication_state
->local_lsn
== InvalidXLogRecPtr
);
950 replication_state
->roident
= node
;
953 Assert(replication_state
->roident
!= InvalidRepOriginId
);
956 * If somebody "forcefully" sets this slot, WAL log it, so it's durable
957 * and the standby gets the message. Primarily this will be called during
958 * WAL replay (of commit records) where no WAL logging is necessary.
962 xl_replorigin_set xlrec
;
964 xlrec
.remote_lsn
= remote_commit
;
965 xlrec
.node_id
= node
;
966 xlrec
.force
= go_backward
;
969 XLogRegisterData((char *) (&xlrec
), sizeof(xlrec
));
971 XLogInsert(RM_REPLORIGIN_ID
, XLOG_REPLORIGIN_SET
);
975 * Due to - harmless - race conditions during a checkpoint we could see
976 * values here that are older than the ones we already have in memory. We
977 * could also see older values for prepared transactions when the prepare
978 * is sent at a later point of time along with commit prepared and there
979 * are other transactions commits between prepare and commit prepared. See
980 * ReorderBufferFinishPrepared. Don't overwrite those.
982 if (go_backward
|| replication_state
->remote_lsn
< remote_commit
)
983 replication_state
->remote_lsn
= remote_commit
;
984 if (local_commit
!= InvalidXLogRecPtr
&&
985 (go_backward
|| replication_state
->local_lsn
< local_commit
))
986 replication_state
->local_lsn
= local_commit
;
987 LWLockRelease(&replication_state
->lock
);
990 * Release *after* changing the LSNs, slot isn't acquired and thus could
991 * otherwise be dropped anytime.
993 LWLockRelease(ReplicationOriginLock
);
998 replorigin_get_progress(RepOriginId node
, bool flush
)
1001 XLogRecPtr local_lsn
= InvalidXLogRecPtr
;
1002 XLogRecPtr remote_lsn
= InvalidXLogRecPtr
;
1004 /* prevent slots from being concurrently dropped */
1005 LWLockAcquire(ReplicationOriginLock
, LW_SHARED
);
1007 for (i
= 0; i
< max_replication_slots
; i
++)
1009 ReplicationState
*state
;
1011 state
= &replication_states
[i
];
1013 if (state
->roident
== node
)
1015 LWLockAcquire(&state
->lock
, LW_SHARED
);
1017 remote_lsn
= state
->remote_lsn
;
1018 local_lsn
= state
->local_lsn
;
1020 LWLockRelease(&state
->lock
);
1026 LWLockRelease(ReplicationOriginLock
);
1028 if (flush
&& local_lsn
!= InvalidXLogRecPtr
)
1029 XLogFlush(local_lsn
);
1035 * Tear down a (possibly) configured session replication origin during process
1039 ReplicationOriginExitCleanup(int code
, Datum arg
)
1041 ConditionVariable
*cv
= NULL
;
1043 LWLockAcquire(ReplicationOriginLock
, LW_EXCLUSIVE
);
1045 if (session_replication_state
!= NULL
&&
1046 session_replication_state
->acquired_by
== MyProcPid
)
1048 cv
= &session_replication_state
->origin_cv
;
1050 session_replication_state
->acquired_by
= 0;
1051 session_replication_state
= NULL
;
1054 LWLockRelease(ReplicationOriginLock
);
1057 ConditionVariableBroadcast(cv
);
1061 * Setup a replication origin in the shared memory struct if it doesn't
1062 * already exists and cache access to the specific ReplicationSlot so the
1063 * array doesn't have to be searched when calling
1064 * replorigin_session_advance().
1066 * Obviously only one such cached origin can exist per process and the current
1067 * cached value can only be set again after the previous value is torn down
1068 * with replorigin_session_reset().
1071 replorigin_session_setup(RepOriginId node
)
1073 static bool registered_cleanup
;
1077 if (!registered_cleanup
)
1079 on_shmem_exit(ReplicationOriginExitCleanup
, 0);
1080 registered_cleanup
= true;
1083 Assert(max_replication_slots
> 0);
1085 if (session_replication_state
!= NULL
)
1087 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE
),
1088 errmsg("cannot setup replication origin when one is already setup")));
1090 /* Lock exclusively, as we may have to create a new table entry. */
1091 LWLockAcquire(ReplicationOriginLock
, LW_EXCLUSIVE
);
1094 * Search for either an existing slot for the origin, or a free one we can
1097 for (i
= 0; i
< max_replication_slots
; i
++)
1099 ReplicationState
*curstate
= &replication_states
[i
];
1101 /* remember where to insert if necessary */
1102 if (curstate
->roident
== InvalidRepOriginId
&&
1110 if (curstate
->roident
!= node
)
1113 else if (curstate
->acquired_by
!= 0)
1116 (errcode(ERRCODE_OBJECT_IN_USE
),
1117 errmsg("replication origin with OID %d is already active for PID %d",
1118 curstate
->roident
, curstate
->acquired_by
)));
1121 /* ok, found slot */
1122 session_replication_state
= curstate
;
1126 if (session_replication_state
== NULL
&& free_slot
== -1)
1128 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED
),
1129 errmsg("could not find free replication state slot for replication origin with OID %u",
1131 errhint("Increase max_replication_slots and try again.")));
1132 else if (session_replication_state
== NULL
)
1134 /* initialize new slot */
1135 session_replication_state
= &replication_states
[free_slot
];
1136 Assert(session_replication_state
->remote_lsn
== InvalidXLogRecPtr
);
1137 Assert(session_replication_state
->local_lsn
== InvalidXLogRecPtr
);
1138 session_replication_state
->roident
= node
;
1142 Assert(session_replication_state
->roident
!= InvalidRepOriginId
);
1144 session_replication_state
->acquired_by
= MyProcPid
;
1146 LWLockRelease(ReplicationOriginLock
);
1148 /* probably this one is pointless */
1149 ConditionVariableBroadcast(&session_replication_state
->origin_cv
);
1153 * Reset replay state previously setup in this session.
1155 * This function may only be called if an origin was setup with
1156 * replorigin_session_setup().
1159 replorigin_session_reset(void)
1161 ConditionVariable
*cv
;
1163 Assert(max_replication_slots
!= 0);
1165 if (session_replication_state
== NULL
)
1167 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE
),
1168 errmsg("no replication origin is configured")));
1170 LWLockAcquire(ReplicationOriginLock
, LW_EXCLUSIVE
);
1172 session_replication_state
->acquired_by
= 0;
1173 cv
= &session_replication_state
->origin_cv
;
1174 session_replication_state
= NULL
;
1176 LWLockRelease(ReplicationOriginLock
);
1178 ConditionVariableBroadcast(cv
);
1182 * Do the same work replorigin_advance() does, just on the session's
1183 * configured origin.
1185 * This is noticeably cheaper than using replorigin_advance().
1188 replorigin_session_advance(XLogRecPtr remote_commit
, XLogRecPtr local_commit
)
1190 Assert(session_replication_state
!= NULL
);
1191 Assert(session_replication_state
->roident
!= InvalidRepOriginId
);
1193 LWLockAcquire(&session_replication_state
->lock
, LW_EXCLUSIVE
);
1194 if (session_replication_state
->local_lsn
< local_commit
)
1195 session_replication_state
->local_lsn
= local_commit
;
1196 if (session_replication_state
->remote_lsn
< remote_commit
)
1197 session_replication_state
->remote_lsn
= remote_commit
;
1198 LWLockRelease(&session_replication_state
->lock
);
1202 * Ask the machinery about the point up to which we successfully replayed
1203 * changes from an already setup replication origin.
1206 replorigin_session_get_progress(bool flush
)
1208 XLogRecPtr remote_lsn
;
1209 XLogRecPtr local_lsn
;
1211 Assert(session_replication_state
!= NULL
);
1213 LWLockAcquire(&session_replication_state
->lock
, LW_SHARED
);
1214 remote_lsn
= session_replication_state
->remote_lsn
;
1215 local_lsn
= session_replication_state
->local_lsn
;
1216 LWLockRelease(&session_replication_state
->lock
);
1218 if (flush
&& local_lsn
!= InvalidXLogRecPtr
)
1219 XLogFlush(local_lsn
);
/* ---------------------------------------------------------------------------
 * SQL functions for working with replication origins.
 *
 * These should mostly be fairly short wrappers around more generic functions.
 * ---------------------------------------------------------------------------
 */
1234 * Create replication origin for the passed in name, and return the assigned
1238 pg_replication_origin_create(PG_FUNCTION_ARGS
)
1241 RepOriginId roident
;
1243 replorigin_check_prerequisites(false, false);
1245 name
= text_to_cstring((text
*) DatumGetPointer(PG_GETARG_DATUM(0)));
1247 /* Replication origins "pg_xxx" are reserved for internal use */
1248 if (IsReservedName(name
))
1250 (errcode(ERRCODE_RESERVED_NAME
),
1251 errmsg("replication origin name \"%s\" is reserved",
1253 errdetail("Origin names starting with \"pg_\" are reserved.")));
1256 * If built with appropriate switch, whine when regression-testing
1257 * conventions for replication origin names are violated.
1259 #ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
1260 if (strncmp(name
, "regress_", 8) != 0)
1261 elog(WARNING
, "replication origins created by regression test cases should have names starting with \"regress_\"");
1264 roident
= replorigin_create(name
);
1268 PG_RETURN_OID(roident
);
1272 * Drop replication origin.
1275 pg_replication_origin_drop(PG_FUNCTION_ARGS
)
1279 replorigin_check_prerequisites(false, false);
1281 name
= text_to_cstring((text
*) DatumGetPointer(PG_GETARG_DATUM(0)));
1283 replorigin_drop_by_name(name
, false, true);
1291 * Return oid of a replication origin.
1294 pg_replication_origin_oid(PG_FUNCTION_ARGS
)
1297 RepOriginId roident
;
1299 replorigin_check_prerequisites(false, false);
1301 name
= text_to_cstring((text
*) DatumGetPointer(PG_GETARG_DATUM(0)));
1302 roident
= replorigin_by_name(name
, true);
1306 if (OidIsValid(roident
))
1307 PG_RETURN_OID(roident
);
1312 * Setup a replication origin for this session.
1315 pg_replication_origin_session_setup(PG_FUNCTION_ARGS
)
1320 replorigin_check_prerequisites(true, false);
1322 name
= text_to_cstring((text
*) DatumGetPointer(PG_GETARG_DATUM(0)));
1323 origin
= replorigin_by_name(name
, false);
1324 replorigin_session_setup(origin
);
1326 replorigin_session_origin
= origin
;
1334 * Reset previously setup origin in this session
1337 pg_replication_origin_session_reset(PG_FUNCTION_ARGS
)
1339 replorigin_check_prerequisites(true, false);
1341 replorigin_session_reset();
1343 replorigin_session_origin
= InvalidRepOriginId
;
1344 replorigin_session_origin_lsn
= InvalidXLogRecPtr
;
1345 replorigin_session_origin_timestamp
= 0;
1351 * Has a replication origin been setup for this session.
1354 pg_replication_origin_session_is_setup(PG_FUNCTION_ARGS
)
1356 replorigin_check_prerequisites(false, false);
1358 PG_RETURN_BOOL(replorigin_session_origin
!= InvalidRepOriginId
);
1363 * Return the replication progress for origin setup in the current session.
1365 * If 'flush' is set to true it is ensured that the returned value corresponds
1366 * to a local transaction that has been flushed. This is useful if asynchronous
1367 * commits are used when replaying replicated transactions.
1370 pg_replication_origin_session_progress(PG_FUNCTION_ARGS
)
1372 XLogRecPtr remote_lsn
= InvalidXLogRecPtr
;
1373 bool flush
= PG_GETARG_BOOL(0);
1375 replorigin_check_prerequisites(true, false);
1377 if (session_replication_state
== NULL
)
1379 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE
),
1380 errmsg("no replication origin is configured")));
1382 remote_lsn
= replorigin_session_get_progress(flush
);
1384 if (remote_lsn
== InvalidXLogRecPtr
)
1387 PG_RETURN_LSN(remote_lsn
);
1391 pg_replication_origin_xact_setup(PG_FUNCTION_ARGS
)
1393 XLogRecPtr location
= PG_GETARG_LSN(0);
1395 replorigin_check_prerequisites(true, false);
1397 if (session_replication_state
== NULL
)
1399 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE
),
1400 errmsg("no replication origin is configured")));
1402 replorigin_session_origin_lsn
= location
;
1403 replorigin_session_origin_timestamp
= PG_GETARG_TIMESTAMPTZ(1);
1409 pg_replication_origin_xact_reset(PG_FUNCTION_ARGS
)
1411 replorigin_check_prerequisites(true, false);
1413 replorigin_session_origin_lsn
= InvalidXLogRecPtr
;
1414 replorigin_session_origin_timestamp
= 0;
1421 pg_replication_origin_advance(PG_FUNCTION_ARGS
)
1423 text
*name
= PG_GETARG_TEXT_PP(0);
1424 XLogRecPtr remote_commit
= PG_GETARG_LSN(1);
1427 replorigin_check_prerequisites(true, false);
1429 /* lock to prevent the replication origin from vanishing */
1430 LockRelationOid(ReplicationOriginRelationId
, RowExclusiveLock
);
1432 node
= replorigin_by_name(text_to_cstring(name
), false);
1435 * Can't sensibly pass a local commit to be flushed at checkpoint - this
1436 * xact hasn't committed yet. This is why this function should be used to
1437 * set up the initial replication state, but not for replay.
1439 replorigin_advance(node
, remote_commit
, InvalidXLogRecPtr
,
1440 true /* go backward */ , true /* WAL log */ );
1442 UnlockRelationOid(ReplicationOriginRelationId
, RowExclusiveLock
);
1449 * Return the replication progress for an individual replication origin.
1451 * If 'flush' is set to true it is ensured that the returned value corresponds
1452 * to a local transaction that has been flushed. This is useful if asynchronous
1453 * commits are used when replaying replicated transactions.
1456 pg_replication_origin_progress(PG_FUNCTION_ARGS
)
1460 RepOriginId roident
;
1461 XLogRecPtr remote_lsn
= InvalidXLogRecPtr
;
1463 replorigin_check_prerequisites(true, true);
1465 name
= text_to_cstring((text
*) DatumGetPointer(PG_GETARG_DATUM(0)));
1466 flush
= PG_GETARG_BOOL(1);
1468 roident
= replorigin_by_name(name
, false);
1469 Assert(OidIsValid(roident
));
1471 remote_lsn
= replorigin_get_progress(roident
, flush
);
1473 if (remote_lsn
== InvalidXLogRecPtr
)
1476 PG_RETURN_LSN(remote_lsn
);
1481 pg_show_replication_origin_status(PG_FUNCTION_ARGS
)
1483 ReturnSetInfo
*rsinfo
= (ReturnSetInfo
*) fcinfo
->resultinfo
;
1485 Tuplestorestate
*tupstore
;
1486 MemoryContext per_query_ctx
;
1487 MemoryContext oldcontext
;
1489 #define REPLICATION_ORIGIN_PROGRESS_COLS 4
1491 /* we want to return 0 rows if slot is set to zero */
1492 replorigin_check_prerequisites(false, true);
1494 if (rsinfo
== NULL
|| !IsA(rsinfo
, ReturnSetInfo
))
1496 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED
),
1497 errmsg("set-valued function called in context that cannot accept a set")));
1498 if (!(rsinfo
->allowedModes
& SFRM_Materialize
))
1500 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED
),
1501 errmsg("materialize mode required, but it is not allowed in this context")));
1502 if (get_call_result_type(fcinfo
, NULL
, &tupdesc
) != TYPEFUNC_COMPOSITE
)
1503 elog(ERROR
, "return type must be a row type");
1505 if (tupdesc
->natts
!= REPLICATION_ORIGIN_PROGRESS_COLS
)
1506 elog(ERROR
, "wrong function definition");
1508 per_query_ctx
= rsinfo
->econtext
->ecxt_per_query_memory
;
1509 oldcontext
= MemoryContextSwitchTo(per_query_ctx
);
1511 tupstore
= tuplestore_begin_heap(true, false, work_mem
);
1512 rsinfo
->returnMode
= SFRM_Materialize
;
1513 rsinfo
->setResult
= tupstore
;
1514 rsinfo
->setDesc
= tupdesc
;
1516 MemoryContextSwitchTo(oldcontext
);
1519 /* prevent slots from being concurrently dropped */
1520 LWLockAcquire(ReplicationOriginLock
, LW_SHARED
);
1523 * Iterate through all possible replication_states, display if they are
1524 * filled. Note that we do not take any locks, so slightly corrupted/out
1525 * of date values are a possibility.
1527 for (i
= 0; i
< max_replication_slots
; i
++)
1529 ReplicationState
*state
;
1530 Datum values
[REPLICATION_ORIGIN_PROGRESS_COLS
];
1531 bool nulls
[REPLICATION_ORIGIN_PROGRESS_COLS
];
1534 state
= &replication_states
[i
];
1536 /* unused slot, nothing to display */
1537 if (state
->roident
== InvalidRepOriginId
)
1540 memset(values
, 0, sizeof(values
));
1541 memset(nulls
, 1, sizeof(nulls
));
1543 values
[0] = ObjectIdGetDatum(state
->roident
);
1547 * We're not preventing the origin to be dropped concurrently, so
1548 * silently accept that it might be gone.
1550 if (replorigin_by_oid(state
->roident
, true,
1553 values
[1] = CStringGetTextDatum(roname
);
1557 LWLockAcquire(&state
->lock
, LW_SHARED
);
1559 values
[2] = LSNGetDatum(state
->remote_lsn
);
1562 values
[3] = LSNGetDatum(state
->local_lsn
);
1565 LWLockRelease(&state
->lock
);
1567 tuplestore_putvalues(tupstore
, tupdesc
, values
, nulls
);
1570 tuplestore_donestoring(tupstore
);
1572 LWLockRelease(ReplicationOriginLock
);
1574 #undef REPLICATION_ORIGIN_PROGRESS_COLS