1 /*-------------------------------------------------------------------------
3 * PostgreSQL logical replication worker (apply)
5 * Copyright (c) 2016-2022, PostgreSQL Global Development Group
8 * src/backend/replication/logical/worker.c
11 * This file contains the worker which applies logical changes as they come
12 * from remote logical replication stream.
14 * The main worker (apply) is started by logical replication worker
15 * launcher for every enabled subscription in a database. It uses
16 * walsender protocol to communicate with publisher.
18 * This module includes server facing code and shares libpqwalreceiver
19 * module with walreceiver for providing the libpq specific functionality.
22 * STREAMED TRANSACTIONS
23 * ---------------------
24 * Streamed transactions (large transactions exceeding a memory limit on the
25 * upstream) are not applied immediately, but instead, the data is written
26 * to temporary files and then applied at once when the final commit arrives.
28 * Unlike the regular (non-streamed) case, handling streamed transactions has
29 * to handle aborts of both the toplevel transaction and subtransactions. This
30 * is achieved by tracking offsets for subtransactions, which is then used
31 * to truncate the file with serialized changes.
33 * The files are placed in tmp file directory by default, and the filenames
34 * include both the XID of the toplevel transaction and OID of the
35 * subscription. This is necessary so that different workers processing a
 * remote transaction with the same XID don't interfere.
38 * We use BufFiles instead of using normal temporary files because (a) the
39 * BufFile infrastructure supports temporary files that exceed the OS file size
40 * limit, (b) provides a way for automatic clean up on the error and (c) provides
41 * a way to survive these files across local transactions and allow to open and
42 * close at stream start and close. We decided to use FileSet
43 * infrastructure as without that it deletes the files on the closure of the
44 * file and if we decide to keep stream files open across the start/stop stream
45 * then it will consume a lot of memory (more than 8K for each BufFile and
46 * there could be multiple such BufFiles as the subscriber could receive
47 * multiple start/stop streams for different transactions before getting the
48 * commit). Moreover, if we don't use FileSet then we also need to invent
49 * a new way to pass filenames to BufFile APIs so that we are allowed to open
 * the file we desired across multiple stream-open calls for the same
 * transaction.
53 * TWO_PHASE TRANSACTIONS
54 * ----------------------
55 * Two phase transactions are replayed at prepare and then committed or
56 * rolled back at commit prepared and rollback prepared respectively. It is
57 * possible to have a prepared transaction that arrives at the apply worker
58 * when the tablesync is busy doing the initial copy. In this case, the apply
59 * worker skips all the prepared operations [e.g. inserts] while the tablesync
60 * is still busy (see the condition of should_apply_changes_for_rel). The
61 * tablesync worker might not get such a prepared transaction because say it
62 * was prior to the initial consistent point but might have got some later
63 * commits. Now, the tablesync worker will exit without doing anything for the
64 * prepared transaction skipped by the apply worker as the sync location for it
65 * will be already ahead of the apply worker's current location. This would lead
66 * to an "empty prepare", because later when the apply worker does the commit
67 * prepare, there is nothing in it (the inserts were skipped earlier).
69 * To avoid this, and similar prepare confusions the subscription's two_phase
70 * commit is enabled only after the initial sync is over. The two_phase option
 * has been implemented as a tri-state with values DISABLED, PENDING, and
 * ENABLED.
74 * Even if the user specifies they want a subscription with two_phase = on,
75 * internally it will start with a tri-state of PENDING which only becomes
76 * ENABLED after all tablesync initializations are completed - i.e. when all
77 * tablesync workers have reached their READY state. In other words, the value
78 * PENDING is only a temporary state for subscription start-up.
80 * Until the two_phase is properly available (ENABLED) the subscription will
81 * behave as if two_phase = off. When the apply worker detects that all
82 * tablesyncs have become READY (while the tri-state was PENDING) it will
83 * restart the apply worker process. This happens in
84 * process_syncing_tables_for_apply.
86 * When the (re-started) apply worker finds that all tablesyncs are READY for a
 * two_phase tri-state of PENDING it starts streaming messages with the
88 * two_phase option which in turn enables the decoding of two-phase commits at
89 * the publisher. Then, it updates the tri-state value from PENDING to ENABLED.
90 * Now, it is possible that during the time we have not enabled two_phase, the
91 * publisher (replication server) would have skipped some prepares but we
92 * ensure that such prepares are sent along with commit prepare, see
93 * ReorderBufferFinishPrepared.
95 * If the subscription has no tables then a two_phase tri-state PENDING is
 * left unchanged. This lets the user still do an ALTER SUBSCRIPTION REFRESH
 * PUBLICATION which might otherwise be disallowed (see below).
99 * If ever a user needs to be aware of the tri-state value, they can fetch it
100 * from the pg_subscription catalog (see column subtwophasestate).
102 * We don't allow to toggle two_phase option of a subscription because it can
103 * lead to an inconsistent replica. Consider, initially, it was on and we have
104 * received some prepare then we turn it off, now at commit time the server
105 * will send the entire transaction data along with the commit. With some more
106 * analysis, we can allow changing this option from off to on but not sure if
107 * that alone would be useful.
109 * Finally, to avoid problems mentioned in previous paragraphs from any
110 * subsequent (not READY) tablesyncs (need to toggle two_phase option from 'on'
111 * to 'off' and then again back to 'on') there is a restriction for
112 * ALTER SUBSCRIPTION REFRESH PUBLICATION. This command is not permitted when
113 * the two_phase tri-state is ENABLED, except when copy_data = false.
115 * We can get prepare of the same GID more than once for the genuine cases
116 * where we have defined multiple subscriptions for publications on the same
117 * server and prepared transaction has operations on tables subscribed to those
118 * subscriptions. For such cases, if we use the GID sent by publisher one of
119 * the prepares will be successful and others will fail, in which case the
120 * server will send them again. Now, this can lead to a deadlock if user has
121 * set synchronous_standby_names for all the subscriptions on subscriber. To
122 * avoid such deadlocks, we generate a unique GID (consisting of the
123 * subscription oid and the xid of the prepared transaction) for each prepare
124 * transaction on the subscriber.
125 *-------------------------------------------------------------------------
128 #include "postgres.h"
130 #include <sys/stat.h>
133 #include "access/table.h"
134 #include "access/tableam.h"
135 #include "access/twophase.h"
136 #include "access/xact.h"
137 #include "access/xlog_internal.h"
138 #include "catalog/catalog.h"
139 #include "catalog/namespace.h"
140 #include "catalog/partition.h"
141 #include "catalog/pg_inherits.h"
142 #include "catalog/pg_subscription.h"
143 #include "catalog/pg_subscription_rel.h"
144 #include "catalog/pg_tablespace.h"
145 #include "commands/tablecmds.h"
146 #include "commands/tablespace.h"
147 #include "commands/trigger.h"
148 #include "executor/executor.h"
149 #include "executor/execPartition.h"
150 #include "executor/nodeModifyTable.h"
152 #include "libpq/pqformat.h"
153 #include "libpq/pqsignal.h"
154 #include "mb/pg_wchar.h"
155 #include "miscadmin.h"
156 #include "nodes/makefuncs.h"
157 #include "optimizer/optimizer.h"
159 #include "postmaster/bgworker.h"
160 #include "postmaster/interrupt.h"
161 #include "postmaster/postmaster.h"
162 #include "postmaster/walwriter.h"
163 #include "replication/decode.h"
164 #include "replication/logical.h"
165 #include "replication/logicalproto.h"
166 #include "replication/logicalrelation.h"
167 #include "replication/logicalworker.h"
168 #include "replication/origin.h"
169 #include "replication/reorderbuffer.h"
170 #include "replication/snapbuild.h"
171 #include "replication/walreceiver.h"
172 #include "replication/worker_internal.h"
173 #include "rewrite/rewriteHandler.h"
174 #include "storage/buffile.h"
175 #include "storage/bufmgr.h"
176 #include "storage/fd.h"
177 #include "storage/ipc.h"
178 #include "storage/lmgr.h"
179 #include "storage/proc.h"
180 #include "storage/procarray.h"
181 #include "tcop/tcopprot.h"
182 #include "utils/builtins.h"
183 #include "utils/catcache.h"
184 #include "utils/dynahash.h"
185 #include "utils/datum.h"
186 #include "utils/fmgroids.h"
187 #include "utils/guc.h"
188 #include "utils/inval.h"
189 #include "utils/lsyscache.h"
190 #include "utils/memutils.h"
191 #include "utils/rel.h"
192 #include "utils/syscache.h"
193 #include "utils/timeout.h"
195 #define NAPTIME_PER_CYCLE 1000 /* max sleep time between cycles (1s) */
197 typedef struct FlushPosition
200 XLogRecPtr local_end
;
201 XLogRecPtr remote_end
;
204 static dlist_head lsn_mapping
= DLIST_STATIC_INIT(lsn_mapping
);
206 typedef struct ApplyExecutionData
208 EState
*estate
; /* executor state, used to track resources */
210 LogicalRepRelMapEntry
*targetRel
; /* replication target rel */
211 ResultRelInfo
*targetRelInfo
; /* ResultRelInfo for same */
213 /* These fields are used when the target relation is partitioned: */
214 ModifyTableState
*mtstate
; /* dummy ModifyTable state */
215 PartitionTupleRouting
*proute
; /* partition routing info */
216 } ApplyExecutionData
;
218 /* Struct for saving and restoring apply errcontext information */
219 typedef struct ApplyErrorCallbackArg
221 LogicalRepMsgType command
; /* 0 if invalid */
222 LogicalRepRelMapEntry
*rel
;
224 /* Remote node information */
225 int remote_attnum
; /* -1 if invalid */
226 TransactionId remote_xid
;
227 TimestampTz ts
; /* commit, rollback, or prepare timestamp */
228 } ApplyErrorCallbackArg
;
230 static ApplyErrorCallbackArg apply_error_callback_arg
=
235 .remote_xid
= InvalidTransactionId
,
239 static MemoryContext ApplyMessageContext
= NULL
;
240 MemoryContext ApplyContext
= NULL
;
242 /* per stream context for streaming transactions */
243 static MemoryContext LogicalStreamingContext
= NULL
;
245 WalReceiverConn
*LogRepWorkerWalRcvConn
= NULL
;
247 Subscription
*MySubscription
= NULL
;
248 bool MySubscriptionValid
= false;
250 bool in_remote_transaction
= false;
251 static XLogRecPtr remote_final_lsn
= InvalidXLogRecPtr
;
253 /* fields valid only when processing streamed transaction */
254 static bool in_streamed_transaction
= false;
256 static TransactionId stream_xid
= InvalidTransactionId
;
258 /* BufFile handle of the current streaming file */
259 static BufFile
*stream_fd
= NULL
;
261 typedef struct SubXactInfo
263 TransactionId xid
; /* XID of the subxact */
264 int fileno
; /* file number in the buffile */
265 off_t offset
; /* offset in the file */
268 /* Sub-transaction data for the current streaming transaction */
269 typedef struct ApplySubXactData
271 uint32 nsubxacts
; /* number of sub-transactions */
272 uint32 nsubxacts_max
; /* current capacity of subxacts */
273 TransactionId subxact_last
; /* xid of the last sub-transaction */
274 SubXactInfo
*subxacts
; /* sub-xact offset in changes file */
277 static ApplySubXactData subxact_data
= {0, 0, InvalidTransactionId
, NULL
};
279 static inline void subxact_filename(char *path
, Oid subid
, TransactionId xid
);
280 static inline void changes_filename(char *path
, Oid subid
, TransactionId xid
);
283 * Information about subtransactions of a given toplevel transaction.
285 static void subxact_info_write(Oid subid
, TransactionId xid
);
286 static void subxact_info_read(Oid subid
, TransactionId xid
);
287 static void subxact_info_add(TransactionId xid
);
288 static inline void cleanup_subxact_info(void);
291 * Serialize and deserialize changes for a toplevel transaction.
293 static void stream_cleanup_files(Oid subid
, TransactionId xid
);
294 static void stream_open_file(Oid subid
, TransactionId xid
, bool first
);
295 static void stream_write_change(char action
, StringInfo s
);
296 static void stream_close_file(void);
298 static void send_feedback(XLogRecPtr recvpos
, bool force
, bool requestReply
);
300 static void store_flush_position(XLogRecPtr remote_lsn
);
302 static void maybe_reread_subscription(void);
304 /* prototype needed because of stream_commit */
305 static void apply_dispatch(StringInfo s
);
307 static void apply_handle_commit_internal(LogicalRepCommitData
*commit_data
);
308 static void apply_handle_insert_internal(ApplyExecutionData
*edata
,
309 ResultRelInfo
*relinfo
,
310 TupleTableSlot
*remoteslot
);
311 static void apply_handle_update_internal(ApplyExecutionData
*edata
,
312 ResultRelInfo
*relinfo
,
313 TupleTableSlot
*remoteslot
,
314 LogicalRepTupleData
*newtup
);
315 static void apply_handle_delete_internal(ApplyExecutionData
*edata
,
316 ResultRelInfo
*relinfo
,
317 TupleTableSlot
*remoteslot
);
318 static bool FindReplTupleInLocalRel(EState
*estate
, Relation localrel
,
319 LogicalRepRelation
*remoterel
,
320 TupleTableSlot
*remoteslot
,
321 TupleTableSlot
**localslot
);
322 static void apply_handle_tuple_routing(ApplyExecutionData
*edata
,
323 TupleTableSlot
*remoteslot
,
324 LogicalRepTupleData
*newtup
,
327 /* Compute GID for two_phase transactions */
328 static void TwoPhaseTransactionGid(Oid subid
, TransactionId xid
, char *gid
, int szgid
);
330 /* Common streaming function to apply all the spooled messages */
331 static void apply_spooled_messages(TransactionId xid
, XLogRecPtr lsn
);
333 /* Functions for apply error callback */
334 static void apply_error_callback(void *arg
);
335 static inline void set_apply_error_context_xact(TransactionId xid
, TimestampTz ts
);
336 static inline void reset_apply_error_context_info(void);
339 * Should this worker apply changes for given relation.
341 * This is mainly needed for initial relation data sync as that runs in
342 * separate worker process running in parallel and we need some way to skip
343 * changes coming to the main apply worker during the sync of a table.
345 * Note we need to do smaller or equals comparison for SYNCDONE state because
346 * it might hold position of end of initial slot consistent point WAL
347 * record + 1 (ie start of next record) and next record can be COMMIT of
348 * transaction we are now processing (which is what we set remote_final_lsn
349 * to in apply_handle_begin).
352 should_apply_changes_for_rel(LogicalRepRelMapEntry
*rel
)
354 if (am_tablesync_worker())
355 return MyLogicalRepWorker
->relid
== rel
->localreloid
;
357 return (rel
->state
== SUBREL_STATE_READY
||
358 (rel
->state
== SUBREL_STATE_SYNCDONE
&&
359 rel
->statelsn
<= remote_final_lsn
));
363 * Begin one step (one INSERT, UPDATE, etc) of a replication transaction.
365 * Start a transaction, if this is the first step (else we keep using the
366 * existing transaction).
367 * Also provide a global snapshot and ensure we run in ApplyMessageContext.
370 begin_replication_step(void)
372 SetCurrentStatementStartTimestamp();
374 if (!IsTransactionState())
376 StartTransactionCommand();
377 maybe_reread_subscription();
380 PushActiveSnapshot(GetTransactionSnapshot());
382 MemoryContextSwitchTo(ApplyMessageContext
);
/*
 * Finish up one step of a replication transaction.
 * Callers of begin_replication_step() must also call this.
 *
 * We don't close out the transaction here, but we should increment
 * the command counter to make the effects of this step visible.
 */
static void
end_replication_step(void)
{
	/* Balance the PushActiveSnapshot() done in begin_replication_step(). */
	PopActiveSnapshot();

	CommandCounterIncrement();
}
401 * Handle streamed transactions.
403 * If in streaming mode (receiving a block of streamed transaction), we
404 * simply redirect it to a file for the proper toplevel transaction.
406 * Returns true for streamed transactions, false otherwise (regular mode).
409 handle_streamed_transaction(LogicalRepMsgType action
, StringInfo s
)
413 /* not in streaming mode */
414 if (!in_streamed_transaction
)
417 Assert(stream_fd
!= NULL
);
418 Assert(TransactionIdIsValid(stream_xid
));
421 * We should have received XID of the subxact as the first part of the
422 * message, so extract it.
424 xid
= pq_getmsgint(s
, 4);
426 if (!TransactionIdIsValid(xid
))
428 (errcode(ERRCODE_PROTOCOL_VIOLATION
),
429 errmsg_internal("invalid transaction ID in streamed replication transaction")));
431 /* Add the new subxact to the array (unless already there). */
432 subxact_info_add(xid
);
434 /* write the change to the current file */
435 stream_write_change(action
, s
);
441 * Executor state preparation for evaluation of constraint expressions,
442 * indexes and triggers for the specified relation.
444 * Note that the caller must open and close any indexes to be updated.
446 static ApplyExecutionData
*
447 create_edata_for_relation(LogicalRepRelMapEntry
*rel
)
449 ApplyExecutionData
*edata
;
452 ResultRelInfo
*resultRelInfo
;
454 edata
= (ApplyExecutionData
*) palloc0(sizeof(ApplyExecutionData
));
455 edata
->targetRel
= rel
;
457 edata
->estate
= estate
= CreateExecutorState();
459 rte
= makeNode(RangeTblEntry
);
460 rte
->rtekind
= RTE_RELATION
;
461 rte
->relid
= RelationGetRelid(rel
->localrel
);
462 rte
->relkind
= rel
->localrel
->rd_rel
->relkind
;
463 rte
->rellockmode
= AccessShareLock
;
464 ExecInitRangeTable(estate
, list_make1(rte
));
466 edata
->targetRelInfo
= resultRelInfo
= makeNode(ResultRelInfo
);
469 * Use Relation opened by logicalrep_rel_open() instead of opening it
472 InitResultRelInfo(resultRelInfo
, rel
->localrel
, 1, NULL
, 0);
475 * We put the ResultRelInfo in the es_opened_result_relations list, even
476 * though we don't populate the es_result_relations array. That's a bit
477 * bogus, but it's enough to make ExecGetTriggerResultRel() find them.
479 * ExecOpenIndices() is not called here either, each execution path doing
480 * an apply operation being responsible for that.
482 estate
->es_opened_result_relations
=
483 lappend(estate
->es_opened_result_relations
, resultRelInfo
);
485 estate
->es_output_cid
= GetCurrentCommandId(true);
487 /* Prepare to catch AFTER triggers. */
488 AfterTriggerBeginQuery();
490 /* other fields of edata remain NULL for now */
496 * Finish any operations related to the executor state created by
497 * create_edata_for_relation().
500 finish_edata(ApplyExecutionData
*edata
)
502 EState
*estate
= edata
->estate
;
504 /* Handle any queued AFTER triggers. */
505 AfterTriggerEndQuery(estate
);
507 /* Shut down tuple routing, if any was done. */
509 ExecCleanupTupleRouting(edata
->mtstate
, edata
->proute
);
512 * Cleanup. It might seem that we should call ExecCloseResultRelations()
513 * here, but we intentionally don't. It would close the rel we added to
514 * es_opened_result_relations above, which is wrong because we took no
515 * corresponding refcount. We rely on ExecCleanupTupleRouting() to close
516 * any other relations opened during execution.
518 ExecResetTupleTable(estate
->es_tupleTable
, false);
519 FreeExecutorState(estate
);
524 * Executes default values for columns for which we can't map to remote
527 * This allows us to support tables which have more columns on the downstream
528 * than on the upstream.
531 slot_fill_defaults(LogicalRepRelMapEntry
*rel
, EState
*estate
,
532 TupleTableSlot
*slot
)
534 TupleDesc desc
= RelationGetDescr(rel
->localrel
);
535 int num_phys_attrs
= desc
->natts
;
540 ExprState
**defexprs
;
541 ExprContext
*econtext
;
543 econtext
= GetPerTupleExprContext(estate
);
545 /* We got all the data via replication, no need to evaluate anything. */
546 if (num_phys_attrs
== rel
->remoterel
.natts
)
549 defmap
= (int *) palloc(num_phys_attrs
* sizeof(int));
550 defexprs
= (ExprState
**) palloc(num_phys_attrs
* sizeof(ExprState
*));
552 Assert(rel
->attrmap
->maplen
== num_phys_attrs
);
553 for (attnum
= 0; attnum
< num_phys_attrs
; attnum
++)
557 if (TupleDescAttr(desc
, attnum
)->attisdropped
|| TupleDescAttr(desc
, attnum
)->attgenerated
)
560 if (rel
->attrmap
->attnums
[attnum
] >= 0)
563 defexpr
= (Expr
*) build_column_default(rel
->localrel
, attnum
+ 1);
567 /* Run the expression through planner */
568 defexpr
= expression_planner(defexpr
);
570 /* Initialize executable expression in copycontext */
571 defexprs
[num_defaults
] = ExecInitExpr(defexpr
, NULL
);
572 defmap
[num_defaults
] = attnum
;
578 for (i
= 0; i
< num_defaults
; i
++)
579 slot
->tts_values
[defmap
[i
]] =
580 ExecEvalExpr(defexprs
[i
], econtext
, &slot
->tts_isnull
[defmap
[i
]]);
584 * Store tuple data into slot.
586 * Incoming data can be either text or binary format.
589 slot_store_data(TupleTableSlot
*slot
, LogicalRepRelMapEntry
*rel
,
590 LogicalRepTupleData
*tupleData
)
592 int natts
= slot
->tts_tupleDescriptor
->natts
;
595 ExecClearTuple(slot
);
597 /* Call the "in" function for each non-dropped, non-null attribute */
598 Assert(natts
== rel
->attrmap
->maplen
);
599 for (i
= 0; i
< natts
; i
++)
601 Form_pg_attribute att
= TupleDescAttr(slot
->tts_tupleDescriptor
, i
);
602 int remoteattnum
= rel
->attrmap
->attnums
[i
];
604 if (!att
->attisdropped
&& remoteattnum
>= 0)
606 StringInfo colvalue
= &tupleData
->colvalues
[remoteattnum
];
608 Assert(remoteattnum
< tupleData
->ncols
);
610 /* Set attnum for error callback */
611 apply_error_callback_arg
.remote_attnum
= remoteattnum
;
613 if (tupleData
->colstatus
[remoteattnum
] == LOGICALREP_COLUMN_TEXT
)
618 getTypeInputInfo(att
->atttypid
, &typinput
, &typioparam
);
619 slot
->tts_values
[i
] =
620 OidInputFunctionCall(typinput
, colvalue
->data
,
621 typioparam
, att
->atttypmod
);
622 slot
->tts_isnull
[i
] = false;
624 else if (tupleData
->colstatus
[remoteattnum
] == LOGICALREP_COLUMN_BINARY
)
630 * In some code paths we may be asked to re-parse the same
631 * tuple data. Reset the StringInfo's cursor so that works.
633 colvalue
->cursor
= 0;
635 getTypeBinaryInputInfo(att
->atttypid
, &typreceive
, &typioparam
);
636 slot
->tts_values
[i
] =
637 OidReceiveFunctionCall(typreceive
, colvalue
,
638 typioparam
, att
->atttypmod
);
640 /* Trouble if it didn't eat the whole buffer */
641 if (colvalue
->cursor
!= colvalue
->len
)
643 (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION
),
644 errmsg("incorrect binary data format in logical replication column %d",
646 slot
->tts_isnull
[i
] = false;
651 * NULL value from remote. (We don't expect to see
652 * LOGICALREP_COLUMN_UNCHANGED here, but if we do, treat it as
655 slot
->tts_values
[i
] = (Datum
) 0;
656 slot
->tts_isnull
[i
] = true;
659 /* Reset attnum for error callback */
660 apply_error_callback_arg
.remote_attnum
= -1;
665 * We assign NULL to dropped attributes and missing values
666 * (missing values should be later filled using
667 * slot_fill_defaults).
669 slot
->tts_values
[i
] = (Datum
) 0;
670 slot
->tts_isnull
[i
] = true;
674 ExecStoreVirtualTuple(slot
);
678 * Replace updated columns with data from the LogicalRepTupleData struct.
679 * This is somewhat similar to heap_modify_tuple but also calls the type
680 * input functions on the user data.
682 * "slot" is filled with a copy of the tuple in "srcslot", replacing
683 * columns provided in "tupleData" and leaving others as-is.
685 * Caution: unreplaced pass-by-ref columns in "slot" will point into the
686 * storage for "srcslot". This is OK for current usage, but someday we may
687 * need to materialize "slot" at the end to make it independent of "srcslot".
690 slot_modify_data(TupleTableSlot
*slot
, TupleTableSlot
*srcslot
,
691 LogicalRepRelMapEntry
*rel
,
692 LogicalRepTupleData
*tupleData
)
694 int natts
= slot
->tts_tupleDescriptor
->natts
;
697 /* We'll fill "slot" with a virtual tuple, so we must start with ... */
698 ExecClearTuple(slot
);
701 * Copy all the column data from srcslot, so that we'll have valid values
702 * for unreplaced columns.
704 Assert(natts
== srcslot
->tts_tupleDescriptor
->natts
);
705 slot_getallattrs(srcslot
);
706 memcpy(slot
->tts_values
, srcslot
->tts_values
, natts
* sizeof(Datum
));
707 memcpy(slot
->tts_isnull
, srcslot
->tts_isnull
, natts
* sizeof(bool));
709 /* Call the "in" function for each replaced attribute */
710 Assert(natts
== rel
->attrmap
->maplen
);
711 for (i
= 0; i
< natts
; i
++)
713 Form_pg_attribute att
= TupleDescAttr(slot
->tts_tupleDescriptor
, i
);
714 int remoteattnum
= rel
->attrmap
->attnums
[i
];
716 if (remoteattnum
< 0)
719 Assert(remoteattnum
< tupleData
->ncols
);
721 if (tupleData
->colstatus
[remoteattnum
] != LOGICALREP_COLUMN_UNCHANGED
)
723 StringInfo colvalue
= &tupleData
->colvalues
[remoteattnum
];
725 /* Set attnum for error callback */
726 apply_error_callback_arg
.remote_attnum
= remoteattnum
;
728 if (tupleData
->colstatus
[remoteattnum
] == LOGICALREP_COLUMN_TEXT
)
733 getTypeInputInfo(att
->atttypid
, &typinput
, &typioparam
);
734 slot
->tts_values
[i
] =
735 OidInputFunctionCall(typinput
, colvalue
->data
,
736 typioparam
, att
->atttypmod
);
737 slot
->tts_isnull
[i
] = false;
739 else if (tupleData
->colstatus
[remoteattnum
] == LOGICALREP_COLUMN_BINARY
)
745 * In some code paths we may be asked to re-parse the same
746 * tuple data. Reset the StringInfo's cursor so that works.
748 colvalue
->cursor
= 0;
750 getTypeBinaryInputInfo(att
->atttypid
, &typreceive
, &typioparam
);
751 slot
->tts_values
[i
] =
752 OidReceiveFunctionCall(typreceive
, colvalue
,
753 typioparam
, att
->atttypmod
);
755 /* Trouble if it didn't eat the whole buffer */
756 if (colvalue
->cursor
!= colvalue
->len
)
758 (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION
),
759 errmsg("incorrect binary data format in logical replication column %d",
761 slot
->tts_isnull
[i
] = false;
765 /* must be LOGICALREP_COLUMN_NULL */
766 slot
->tts_values
[i
] = (Datum
) 0;
767 slot
->tts_isnull
[i
] = true;
770 /* Reset attnum for error callback */
771 apply_error_callback_arg
.remote_attnum
= -1;
775 /* And finally, declare that "slot" contains a valid virtual tuple */
776 ExecStoreVirtualTuple(slot
);
780 * Handle BEGIN message.
783 apply_handle_begin(StringInfo s
)
785 LogicalRepBeginData begin_data
;
787 logicalrep_read_begin(s
, &begin_data
);
788 set_apply_error_context_xact(begin_data
.xid
, begin_data
.committime
);
790 remote_final_lsn
= begin_data
.final_lsn
;
792 in_remote_transaction
= true;
794 pgstat_report_activity(STATE_RUNNING
, NULL
);
798 * Handle COMMIT message.
800 * TODO, support tracking of multiple origins
803 apply_handle_commit(StringInfo s
)
805 LogicalRepCommitData commit_data
;
807 logicalrep_read_commit(s
, &commit_data
);
809 if (commit_data
.commit_lsn
!= remote_final_lsn
)
811 (errcode(ERRCODE_PROTOCOL_VIOLATION
),
812 errmsg_internal("incorrect commit LSN %X/%X in commit message (expected %X/%X)",
813 LSN_FORMAT_ARGS(commit_data
.commit_lsn
),
814 LSN_FORMAT_ARGS(remote_final_lsn
))));
816 apply_handle_commit_internal(&commit_data
);
818 /* Process any tables that are being synchronized in parallel. */
819 process_syncing_tables(commit_data
.end_lsn
);
821 pgstat_report_activity(STATE_IDLE
, NULL
);
822 reset_apply_error_context_info();
826 * Handle BEGIN PREPARE message.
829 apply_handle_begin_prepare(StringInfo s
)
831 LogicalRepPreparedTxnData begin_data
;
833 /* Tablesync should never receive prepare. */
834 if (am_tablesync_worker())
836 (errcode(ERRCODE_PROTOCOL_VIOLATION
),
837 errmsg_internal("tablesync worker received a BEGIN PREPARE message")));
839 logicalrep_read_begin_prepare(s
, &begin_data
);
840 set_apply_error_context_xact(begin_data
.xid
, begin_data
.prepare_time
);
842 remote_final_lsn
= begin_data
.prepare_lsn
;
844 in_remote_transaction
= true;
846 pgstat_report_activity(STATE_RUNNING
, NULL
);
850 * Common function to prepare the GID.
853 apply_handle_prepare_internal(LogicalRepPreparedTxnData
*prepare_data
)
858 * Compute unique GID for two_phase transactions. We don't use GID of
859 * prepared transaction sent by server as that can lead to deadlock when
860 * we have multiple subscriptions from same node point to publications on
861 * the same node. See comments atop worker.c
863 TwoPhaseTransactionGid(MySubscription
->oid
, prepare_data
->xid
,
867 * BeginTransactionBlock is necessary to balance the EndTransactionBlock
868 * called within the PrepareTransactionBlock below.
870 BeginTransactionBlock();
871 CommitTransactionCommand(); /* Completes the preceding Begin command. */
874 * Update origin state so we can restart streaming from correct position
877 replorigin_session_origin_lsn
= prepare_data
->end_lsn
;
878 replorigin_session_origin_timestamp
= prepare_data
->prepare_time
;
880 PrepareTransactionBlock(gid
);
884 * Handle PREPARE message.
887 apply_handle_prepare(StringInfo s
)
889 LogicalRepPreparedTxnData prepare_data
;
891 logicalrep_read_prepare(s
, &prepare_data
);
893 if (prepare_data
.prepare_lsn
!= remote_final_lsn
)
895 (errcode(ERRCODE_PROTOCOL_VIOLATION
),
896 errmsg_internal("incorrect prepare LSN %X/%X in prepare message (expected %X/%X)",
897 LSN_FORMAT_ARGS(prepare_data
.prepare_lsn
),
898 LSN_FORMAT_ARGS(remote_final_lsn
))));
901 * Unlike commit, here, we always prepare the transaction even though no
902 * change has happened in this transaction. It is done this way because at
903 * commit prepared time, we won't know whether we have skipped preparing a
904 * transaction because of no change.
906 * XXX, We can optimize such that at commit prepared time, we first check
907 * whether we have prepared the transaction or not but that doesn't seem
908 * worthwhile because such cases shouldn't be common.
910 begin_replication_step();
912 apply_handle_prepare_internal(&prepare_data
);
914 end_replication_step();
915 CommitTransactionCommand();
916 pgstat_report_stat(false);
918 store_flush_position(prepare_data
.end_lsn
);
920 in_remote_transaction
= false;
922 /* Process any tables that are being synchronized in parallel. */
923 process_syncing_tables(prepare_data
.end_lsn
);
925 pgstat_report_activity(STATE_IDLE
, NULL
);
926 reset_apply_error_context_info();
930 * Handle a COMMIT PREPARED of a previously PREPARED transaction.
933 apply_handle_commit_prepared(StringInfo s
)
935 LogicalRepCommitPreparedTxnData prepare_data
;
938 logicalrep_read_commit_prepared(s
, &prepare_data
);
939 set_apply_error_context_xact(prepare_data
.xid
, prepare_data
.commit_time
);
941 /* Compute GID for two_phase transactions. */
942 TwoPhaseTransactionGid(MySubscription
->oid
, prepare_data
.xid
,
945 /* There is no transaction when COMMIT PREPARED is called */
946 begin_replication_step();
949 * Update origin state so we can restart streaming from correct position
952 replorigin_session_origin_lsn
= prepare_data
.end_lsn
;
953 replorigin_session_origin_timestamp
= prepare_data
.commit_time
;
955 FinishPreparedTransaction(gid
, true);
956 end_replication_step();
957 CommitTransactionCommand();
958 pgstat_report_stat(false);
960 store_flush_position(prepare_data
.end_lsn
);
961 in_remote_transaction
= false;
963 /* Process any tables that are being synchronized in parallel. */
964 process_syncing_tables(prepare_data
.end_lsn
);
966 pgstat_report_activity(STATE_IDLE
, NULL
);
967 reset_apply_error_context_info();
971 * Handle a ROLLBACK PREPARED of a previously PREPARED TRANSACTION.
974 apply_handle_rollback_prepared(StringInfo s
)
976 LogicalRepRollbackPreparedTxnData rollback_data
;
979 logicalrep_read_rollback_prepared(s
, &rollback_data
);
980 set_apply_error_context_xact(rollback_data
.xid
, rollback_data
.rollback_time
);
982 /* Compute GID for two_phase transactions. */
983 TwoPhaseTransactionGid(MySubscription
->oid
, rollback_data
.xid
,
987 * It is possible that we haven't received prepare because it occurred
988 * before walsender reached a consistent point or the two_phase was still
989 * not enabled by that time, so in such cases, we need to skip rollback
992 if (LookupGXact(gid
, rollback_data
.prepare_end_lsn
,
993 rollback_data
.prepare_time
))
996 * Update origin state so we can restart streaming from correct
997 * position in case of crash.
999 replorigin_session_origin_lsn
= rollback_data
.rollback_end_lsn
;
1000 replorigin_session_origin_timestamp
= rollback_data
.rollback_time
;
1002 /* There is no transaction when ABORT/ROLLBACK PREPARED is called */
1003 begin_replication_step();
1004 FinishPreparedTransaction(gid
, false);
1005 end_replication_step();
1006 CommitTransactionCommand();
1009 pgstat_report_stat(false);
1011 store_flush_position(rollback_data
.rollback_end_lsn
);
1012 in_remote_transaction
= false;
1014 /* Process any tables that are being synchronized in parallel. */
1015 process_syncing_tables(rollback_data
.rollback_end_lsn
);
1017 pgstat_report_activity(STATE_IDLE
, NULL
);
1018 reset_apply_error_context_info();
1022 * Handle STREAM PREPARE.
1024 * Logic is in two parts:
1025 * 1. Replay all the spooled operations
1026 * 2. Mark the transaction as prepared
1029 apply_handle_stream_prepare(StringInfo s
)
1031 LogicalRepPreparedTxnData prepare_data
;
1033 if (in_streamed_transaction
)
1035 (errcode(ERRCODE_PROTOCOL_VIOLATION
),
1036 errmsg_internal("STREAM PREPARE message without STREAM STOP")));
1038 /* Tablesync should never receive prepare. */
1039 if (am_tablesync_worker())
1041 (errcode(ERRCODE_PROTOCOL_VIOLATION
),
1042 errmsg_internal("tablesync worker received a STREAM PREPARE message")));
1044 logicalrep_read_stream_prepare(s
, &prepare_data
);
1045 set_apply_error_context_xact(prepare_data
.xid
, prepare_data
.prepare_time
);
1047 elog(DEBUG1
, "received prepare for streamed transaction %u", prepare_data
.xid
);
1049 /* Replay all the spooled operations. */
1050 apply_spooled_messages(prepare_data
.xid
, prepare_data
.prepare_lsn
);
1052 /* Mark the transaction as prepared. */
1053 apply_handle_prepare_internal(&prepare_data
);
1055 CommitTransactionCommand();
1057 pgstat_report_stat(false);
1059 store_flush_position(prepare_data
.end_lsn
);
1061 in_remote_transaction
= false;
1063 /* unlink the files with serialized changes and subxact info. */
1064 stream_cleanup_files(MyLogicalRepWorker
->subid
, prepare_data
.xid
);
1066 /* Process any tables that are being synchronized in parallel. */
1067 process_syncing_tables(prepare_data
.end_lsn
);
1069 pgstat_report_activity(STATE_IDLE
, NULL
);
1071 reset_apply_error_context_info();
1075 * Handle ORIGIN message.
1077 * TODO, support tracking of multiple origins
1080 apply_handle_origin(StringInfo s
)
1083 * ORIGIN message can only come inside streaming transaction or inside
1084 * remote transaction and before any actual writes.
1086 if (!in_streamed_transaction
&&
1087 (!in_remote_transaction
||
1088 (IsTransactionState() && !am_tablesync_worker())))
1090 (errcode(ERRCODE_PROTOCOL_VIOLATION
),
1091 errmsg_internal("ORIGIN message sent out of order")));
1095 * Handle STREAM START message.
1098 apply_handle_stream_start(StringInfo s
)
1102 if (in_streamed_transaction
)
1104 (errcode(ERRCODE_PROTOCOL_VIOLATION
),
1105 errmsg_internal("duplicate STREAM START message")));
1108 * Start a transaction on stream start, this transaction will be committed
1109 * on the stream stop unless it is a tablesync worker in which case it
1110 * will be committed after processing all the messages. We need the
1111 * transaction for handling the buffile, used for serializing the
1112 * streaming data and subxact info.
1114 begin_replication_step();
1116 /* notify handle methods we're processing a remote transaction */
1117 in_streamed_transaction
= true;
1119 /* extract XID of the top-level transaction */
1120 stream_xid
= logicalrep_read_stream_start(s
, &first_segment
);
1122 if (!TransactionIdIsValid(stream_xid
))
1124 (errcode(ERRCODE_PROTOCOL_VIOLATION
),
1125 errmsg_internal("invalid transaction ID in streamed replication transaction")));
1127 set_apply_error_context_xact(stream_xid
, 0);
1130 * Initialize the worker's stream_fileset if we haven't yet. This will be
1131 * used for the entire duration of the worker so create it in a permanent
1132 * context. We create this on the very first streaming message from any
1133 * transaction and then use it for this and other streaming transactions.
1134 * Now, we could create a fileset at the start of the worker as well but
1135 * then we won't be sure that it will ever be used.
1137 if (MyLogicalRepWorker
->stream_fileset
== NULL
)
1139 MemoryContext oldctx
;
1141 oldctx
= MemoryContextSwitchTo(ApplyContext
);
1143 MyLogicalRepWorker
->stream_fileset
= palloc(sizeof(FileSet
));
1144 FileSetInit(MyLogicalRepWorker
->stream_fileset
);
1146 MemoryContextSwitchTo(oldctx
);
1149 /* open the spool file for this transaction */
1150 stream_open_file(MyLogicalRepWorker
->subid
, stream_xid
, first_segment
);
1152 /* if this is not the first segment, open existing subxact file */
1154 subxact_info_read(MyLogicalRepWorker
->subid
, stream_xid
);
1156 pgstat_report_activity(STATE_RUNNING
, NULL
);
1158 end_replication_step();
1162 * Handle STREAM STOP message.
1165 apply_handle_stream_stop(StringInfo s
)
1167 if (!in_streamed_transaction
)
1169 (errcode(ERRCODE_PROTOCOL_VIOLATION
),
1170 errmsg_internal("STREAM STOP message without STREAM START")));
1173 * Close the file with serialized changes, and serialize information about
1174 * subxacts for the toplevel transaction.
1176 subxact_info_write(MyLogicalRepWorker
->subid
, stream_xid
);
1177 stream_close_file();
1179 /* We must be in a valid transaction state */
1180 Assert(IsTransactionState());
1182 /* Commit the per-stream transaction */
1183 CommitTransactionCommand();
1185 in_streamed_transaction
= false;
1187 /* Reset per-stream context */
1188 MemoryContextReset(LogicalStreamingContext
);
1190 pgstat_report_activity(STATE_IDLE
, NULL
);
1191 reset_apply_error_context_info();
1195 * Handle STREAM abort message.
1198 apply_handle_stream_abort(StringInfo s
)
1201 TransactionId subxid
;
1203 if (in_streamed_transaction
)
1205 (errcode(ERRCODE_PROTOCOL_VIOLATION
),
1206 errmsg_internal("STREAM ABORT message without STREAM STOP")));
1208 logicalrep_read_stream_abort(s
, &xid
, &subxid
);
1211 * If the two XIDs are the same, it's in fact abort of toplevel xact, so
1212 * just delete the files with serialized info.
1216 set_apply_error_context_xact(xid
, 0);
1217 stream_cleanup_files(MyLogicalRepWorker
->subid
, xid
);
1222 * OK, so it's a subxact. We need to read the subxact file for the
1223 * toplevel transaction, determine the offset tracked for the subxact,
1224 * and truncate the file with changes. We also remove the subxacts
1225 * with higher offsets (or rather higher XIDs).
1227 * We intentionally scan the array from the tail, because we're likely
1228 * aborting a change for the most recent subtransactions.
1230 * We can't use the binary search here as subxact XIDs won't
1231 * necessarily arrive in sorted order, consider the case where we have
1232 * released the savepoint for multiple subtransactions and then
1233 * performed rollback to savepoint for one of the earlier
1240 char path
[MAXPGPATH
];
1242 set_apply_error_context_xact(subxid
, 0);
1245 begin_replication_step();
1246 subxact_info_read(MyLogicalRepWorker
->subid
, xid
);
1248 for (i
= subxact_data
.nsubxacts
; i
> 0; i
--)
1250 if (subxact_data
.subxacts
[i
- 1].xid
== subxid
)
1259 * If it's an empty sub-transaction then we will not find the subxid
1260 * here so just cleanup the subxact info and return.
1264 /* Cleanup the subxact info */
1265 cleanup_subxact_info();
1266 end_replication_step();
1267 CommitTransactionCommand();
1268 reset_apply_error_context_info();
1272 /* open the changes file */
1273 changes_filename(path
, MyLogicalRepWorker
->subid
, xid
);
1274 fd
= BufFileOpenFileSet(MyLogicalRepWorker
->stream_fileset
, path
,
1277 /* OK, truncate the file at the right offset */
1278 BufFileTruncateFileSet(fd
, subxact_data
.subxacts
[subidx
].fileno
,
1279 subxact_data
.subxacts
[subidx
].offset
);
1282 /* discard the subxacts added later */
1283 subxact_data
.nsubxacts
= subidx
;
1285 /* write the updated subxact list */
1286 subxact_info_write(MyLogicalRepWorker
->subid
, xid
);
1288 end_replication_step();
1289 CommitTransactionCommand();
1292 reset_apply_error_context_info();
1296 * Common spoolfile processing.
1299 apply_spooled_messages(TransactionId xid
, XLogRecPtr lsn
)
1303 char path
[MAXPGPATH
];
1304 char *buffer
= NULL
;
1305 MemoryContext oldcxt
;
1308 /* Make sure we have an open transaction */
1309 begin_replication_step();
1312 * Allocate file handle and memory required to process all the messages in
1313 * TopTransactionContext to avoid them getting reset after each message is
1316 oldcxt
= MemoryContextSwitchTo(TopTransactionContext
);
1318 /* Open the spool file for the committed/prepared transaction */
1319 changes_filename(path
, MyLogicalRepWorker
->subid
, xid
);
1320 elog(DEBUG1
, "replaying changes from file \"%s\"", path
);
1322 fd
= BufFileOpenFileSet(MyLogicalRepWorker
->stream_fileset
, path
, O_RDONLY
,
1325 buffer
= palloc(BLCKSZ
);
1326 initStringInfo(&s2
);
1328 MemoryContextSwitchTo(oldcxt
);
1330 remote_final_lsn
= lsn
;
1333 * Make sure the handle apply_dispatch methods are aware we're in a remote
1336 in_remote_transaction
= true;
1337 pgstat_report_activity(STATE_RUNNING
, NULL
);
1339 end_replication_step();
1342 * Read the entries one by one and pass them through the same logic as in
1351 CHECK_FOR_INTERRUPTS();
1353 /* read length of the on-disk record */
1354 nbytes
= BufFileRead(fd
, &len
, sizeof(len
));
1356 /* have we reached end of the file? */
1360 /* do we have a correct length? */
1361 if (nbytes
!= sizeof(len
))
1363 (errcode_for_file_access(),
1364 errmsg("could not read from streaming transaction's changes file \"%s\": %m",
1368 elog(ERROR
, "incorrect length %d in streaming transaction's changes file \"%s\"",
1371 /* make sure we have sufficiently large buffer */
1372 buffer
= repalloc(buffer
, len
);
1374 /* and finally read the data into the buffer */
1375 if (BufFileRead(fd
, buffer
, len
) != len
)
1377 (errcode_for_file_access(),
1378 errmsg("could not read from streaming transaction's changes file \"%s\": %m",
1381 /* copy the buffer to the stringinfo and call apply_dispatch */
1382 resetStringInfo(&s2
);
1383 appendBinaryStringInfo(&s2
, buffer
, len
);
1385 /* Ensure we are reading the data into our memory context. */
1386 oldcxt
= MemoryContextSwitchTo(ApplyMessageContext
);
1388 apply_dispatch(&s2
);
1390 MemoryContextReset(ApplyMessageContext
);
1392 MemoryContextSwitchTo(oldcxt
);
1396 if (nchanges
% 1000 == 0)
1397 elog(DEBUG1
, "replayed %d changes from file \"%s\"",
1406 elog(DEBUG1
, "replayed %d (all) changes from file \"%s\"",
1413 * Handle STREAM COMMIT message.
1416 apply_handle_stream_commit(StringInfo s
)
1419 LogicalRepCommitData commit_data
;
1421 if (in_streamed_transaction
)
1423 (errcode(ERRCODE_PROTOCOL_VIOLATION
),
1424 errmsg_internal("STREAM COMMIT message without STREAM STOP")));
1426 xid
= logicalrep_read_stream_commit(s
, &commit_data
);
1427 set_apply_error_context_xact(xid
, commit_data
.committime
);
1429 elog(DEBUG1
, "received commit for streamed transaction %u", xid
);
1431 apply_spooled_messages(xid
, commit_data
.commit_lsn
);
1433 apply_handle_commit_internal(&commit_data
);
1435 /* unlink the files with serialized changes and subxact info */
1436 stream_cleanup_files(MyLogicalRepWorker
->subid
, xid
);
1438 /* Process any tables that are being synchronized in parallel. */
1439 process_syncing_tables(commit_data
.end_lsn
);
1441 pgstat_report_activity(STATE_IDLE
, NULL
);
1443 reset_apply_error_context_info();
1447 * Helper function for apply_handle_commit and apply_handle_stream_commit.
1450 apply_handle_commit_internal(LogicalRepCommitData
*commit_data
)
1452 if (IsTransactionState())
1455 * Update origin state so we can restart streaming from correct
1456 * position in case of crash.
1458 replorigin_session_origin_lsn
= commit_data
->end_lsn
;
1459 replorigin_session_origin_timestamp
= commit_data
->committime
;
1461 CommitTransactionCommand();
1462 pgstat_report_stat(false);
1464 store_flush_position(commit_data
->end_lsn
);
1468 /* Process any invalidation messages that might have accumulated. */
1469 AcceptInvalidationMessages();
1470 maybe_reread_subscription();
1473 in_remote_transaction
= false;
1477 * Handle RELATION message.
1479 * Note we don't do validation against local schema here. The validation
1480 * against local schema is postponed until first change for given relation
1481 * comes as we only care about it when applying changes for it anyway and we
1482 * do less locking this way.
1485 apply_handle_relation(StringInfo s
)
1487 LogicalRepRelation
*rel
;
1489 if (handle_streamed_transaction(LOGICAL_REP_MSG_RELATION
, s
))
1492 rel
= logicalrep_read_rel(s
);
1493 logicalrep_relmap_update(rel
);
1497 * Handle TYPE message.
1499 * This implementation pays no attention to TYPE messages; we expect the user
1500 * to have set things up so that the incoming data is acceptable to the input
1501 * functions for the locally subscribed tables. Hence, we just read and
1502 * discard the message.
1505 apply_handle_type(StringInfo s
)
1509 if (handle_streamed_transaction(LOGICAL_REP_MSG_TYPE
, s
))
1512 logicalrep_read_typ(s
, &typ
);
1516 * Get replica identity index or if it is not defined a primary key.
1518 * If neither is defined, returns InvalidOid
1521 GetRelationIdentityOrPK(Relation rel
)
1525 idxoid
= RelationGetReplicaIndex(rel
);
1527 if (!OidIsValid(idxoid
))
1528 idxoid
= RelationGetPrimaryKeyIndex(rel
);
1534 * Handle INSERT message.
1538 apply_handle_insert(StringInfo s
)
1540 LogicalRepRelMapEntry
*rel
;
1541 LogicalRepTupleData newtup
;
1542 LogicalRepRelId relid
;
1543 ApplyExecutionData
*edata
;
1545 TupleTableSlot
*remoteslot
;
1546 MemoryContext oldctx
;
1548 if (handle_streamed_transaction(LOGICAL_REP_MSG_INSERT
, s
))
1551 begin_replication_step();
1553 relid
= logicalrep_read_insert(s
, &newtup
);
1554 rel
= logicalrep_rel_open(relid
, RowExclusiveLock
);
1555 if (!should_apply_changes_for_rel(rel
))
1558 * The relation can't become interesting in the middle of the
1559 * transaction so it's safe to unlock it.
1561 logicalrep_rel_close(rel
, RowExclusiveLock
);
1562 end_replication_step();
1566 /* Set relation for error callback */
1567 apply_error_callback_arg
.rel
= rel
;
1569 /* Initialize the executor state. */
1570 edata
= create_edata_for_relation(rel
);
1571 estate
= edata
->estate
;
1572 remoteslot
= ExecInitExtraTupleSlot(estate
,
1573 RelationGetDescr(rel
->localrel
),
1576 /* Process and store remote tuple in the slot */
1577 oldctx
= MemoryContextSwitchTo(GetPerTupleMemoryContext(estate
));
1578 slot_store_data(remoteslot
, rel
, &newtup
);
1579 slot_fill_defaults(rel
, estate
, remoteslot
);
1580 MemoryContextSwitchTo(oldctx
);
1582 /* For a partitioned table, insert the tuple into a partition. */
1583 if (rel
->localrel
->rd_rel
->relkind
== RELKIND_PARTITIONED_TABLE
)
1584 apply_handle_tuple_routing(edata
,
1585 remoteslot
, NULL
, CMD_INSERT
);
1587 apply_handle_insert_internal(edata
, edata
->targetRelInfo
,
1590 finish_edata(edata
);
1592 /* Reset relation for error callback */
1593 apply_error_callback_arg
.rel
= NULL
;
1595 logicalrep_rel_close(rel
, NoLock
);
1597 end_replication_step();
1601 * Workhorse for apply_handle_insert()
1602 * relinfo is for the relation we're actually inserting into
1603 * (could be a child partition of edata->targetRelInfo)
1606 apply_handle_insert_internal(ApplyExecutionData
*edata
,
1607 ResultRelInfo
*relinfo
,
1608 TupleTableSlot
*remoteslot
)
1610 EState
*estate
= edata
->estate
;
1612 /* We must open indexes here. */
1613 ExecOpenIndices(relinfo
, false);
1615 /* Do the insert. */
1616 ExecSimpleRelationInsert(relinfo
, estate
, remoteslot
);
1619 ExecCloseIndices(relinfo
);
1623 * Check if the logical replication relation is updatable and throw
1624 * appropriate error if it isn't.
1627 check_relation_updatable(LogicalRepRelMapEntry
*rel
)
1629 /* Updatable, no error. */
1634 * We are in error mode so it's fine this is somewhat slow. It's better to
1635 * give user correct error.
1637 if (OidIsValid(GetRelationIdentityOrPK(rel
->localrel
)))
1640 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE
),
1641 errmsg("publisher did not send replica identity column "
1642 "expected by the logical replication target relation \"%s.%s\"",
1643 rel
->remoterel
.nspname
, rel
->remoterel
.relname
)));
1647 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE
),
1648 errmsg("logical replication target relation \"%s.%s\" has "
1649 "neither REPLICA IDENTITY index nor PRIMARY "
1650 "KEY and published relation does not have "
1651 "REPLICA IDENTITY FULL",
1652 rel
->remoterel
.nspname
, rel
->remoterel
.relname
)));
1656 * Handle UPDATE message.
1661 apply_handle_update(StringInfo s
)
1663 LogicalRepRelMapEntry
*rel
;
1664 LogicalRepRelId relid
;
1665 ApplyExecutionData
*edata
;
1667 LogicalRepTupleData oldtup
;
1668 LogicalRepTupleData newtup
;
1670 TupleTableSlot
*remoteslot
;
1671 RangeTblEntry
*target_rte
;
1672 MemoryContext oldctx
;
1674 if (handle_streamed_transaction(LOGICAL_REP_MSG_UPDATE
, s
))
1677 begin_replication_step();
1679 relid
= logicalrep_read_update(s
, &has_oldtup
, &oldtup
,
1681 rel
= logicalrep_rel_open(relid
, RowExclusiveLock
);
1682 if (!should_apply_changes_for_rel(rel
))
1685 * The relation can't become interesting in the middle of the
1686 * transaction so it's safe to unlock it.
1688 logicalrep_rel_close(rel
, RowExclusiveLock
);
1689 end_replication_step();
1693 /* Set relation for error callback */
1694 apply_error_callback_arg
.rel
= rel
;
1696 /* Check if we can do the update. */
1697 check_relation_updatable(rel
);
1699 /* Initialize the executor state. */
1700 edata
= create_edata_for_relation(rel
);
1701 estate
= edata
->estate
;
1702 remoteslot
= ExecInitExtraTupleSlot(estate
,
1703 RelationGetDescr(rel
->localrel
),
1707 * Populate updatedCols so that per-column triggers can fire, and so
1708 * executor can correctly pass down indexUnchanged hint. This could
1709 * include more columns than were actually changed on the publisher
1710 * because the logical replication protocol doesn't contain that
1711 * information. But it would for example exclude columns that only exist
1712 * on the subscriber, since we are not touching those.
1714 target_rte
= list_nth(estate
->es_range_table
, 0);
1715 for (int i
= 0; i
< remoteslot
->tts_tupleDescriptor
->natts
; i
++)
1717 Form_pg_attribute att
= TupleDescAttr(remoteslot
->tts_tupleDescriptor
, i
);
1718 int remoteattnum
= rel
->attrmap
->attnums
[i
];
1720 if (!att
->attisdropped
&& remoteattnum
>= 0)
1722 Assert(remoteattnum
< newtup
.ncols
);
1723 if (newtup
.colstatus
[remoteattnum
] != LOGICALREP_COLUMN_UNCHANGED
)
1724 target_rte
->updatedCols
=
1725 bms_add_member(target_rte
->updatedCols
,
1726 i
+ 1 - FirstLowInvalidHeapAttributeNumber
);
1730 /* Also populate extraUpdatedCols, in case we have generated columns */
1731 fill_extraUpdatedCols(target_rte
, rel
->localrel
);
1733 /* Build the search tuple. */
1734 oldctx
= MemoryContextSwitchTo(GetPerTupleMemoryContext(estate
));
1735 slot_store_data(remoteslot
, rel
,
1736 has_oldtup
? &oldtup
: &newtup
);
1737 MemoryContextSwitchTo(oldctx
);
1739 /* For a partitioned table, apply update to correct partition. */
1740 if (rel
->localrel
->rd_rel
->relkind
== RELKIND_PARTITIONED_TABLE
)
1741 apply_handle_tuple_routing(edata
,
1742 remoteslot
, &newtup
, CMD_UPDATE
);
1744 apply_handle_update_internal(edata
, edata
->targetRelInfo
,
1745 remoteslot
, &newtup
);
1747 finish_edata(edata
);
1749 /* Reset relation for error callback */
1750 apply_error_callback_arg
.rel
= NULL
;
1752 logicalrep_rel_close(rel
, NoLock
);
1754 end_replication_step();
1758 * Workhorse for apply_handle_update()
1759 * relinfo is for the relation we're actually updating in
1760 * (could be a child partition of edata->targetRelInfo)
1763 apply_handle_update_internal(ApplyExecutionData
*edata
,
1764 ResultRelInfo
*relinfo
,
1765 TupleTableSlot
*remoteslot
,
1766 LogicalRepTupleData
*newtup
)
1768 EState
*estate
= edata
->estate
;
1769 LogicalRepRelMapEntry
*relmapentry
= edata
->targetRel
;
1770 Relation localrel
= relinfo
->ri_RelationDesc
;
1772 TupleTableSlot
*localslot
;
1774 MemoryContext oldctx
;
1776 EvalPlanQualInit(&epqstate
, estate
, NULL
, NIL
, -1);
1777 ExecOpenIndices(relinfo
, false);
1779 found
= FindReplTupleInLocalRel(estate
, localrel
,
1780 &relmapentry
->remoterel
,
1781 remoteslot
, &localslot
);
1782 ExecClearTuple(remoteslot
);
1787 * Note this will fail if there are other conflicting unique indexes.
1791 /* Process and store remote tuple in the slot */
1792 oldctx
= MemoryContextSwitchTo(GetPerTupleMemoryContext(estate
));
1793 slot_modify_data(remoteslot
, localslot
, relmapentry
, newtup
);
1794 MemoryContextSwitchTo(oldctx
);
1796 EvalPlanQualSetSlot(&epqstate
, remoteslot
);
1798 /* Do the actual update. */
1799 ExecSimpleRelationUpdate(relinfo
, estate
, &epqstate
, localslot
,
1805 * The tuple to be updated could not be found. Do nothing except for
1806 * emitting a log message.
1808 * XXX should this be promoted to ereport(LOG) perhaps?
1811 "logical replication did not find row to be updated "
1812 "in replication target relation \"%s\"",
1813 RelationGetRelationName(localrel
));
1817 ExecCloseIndices(relinfo
);
1818 EvalPlanQualEnd(&epqstate
);
1822 * Handle DELETE message.
1827 apply_handle_delete(StringInfo s
)
1829 LogicalRepRelMapEntry
*rel
;
1830 LogicalRepTupleData oldtup
;
1831 LogicalRepRelId relid
;
1832 ApplyExecutionData
*edata
;
1834 TupleTableSlot
*remoteslot
;
1835 MemoryContext oldctx
;
1837 if (handle_streamed_transaction(LOGICAL_REP_MSG_DELETE
, s
))
1840 begin_replication_step();
1842 relid
= logicalrep_read_delete(s
, &oldtup
);
1843 rel
= logicalrep_rel_open(relid
, RowExclusiveLock
);
1844 if (!should_apply_changes_for_rel(rel
))
1847 * The relation can't become interesting in the middle of the
1848 * transaction so it's safe to unlock it.
1850 logicalrep_rel_close(rel
, RowExclusiveLock
);
1851 end_replication_step();
1855 /* Set relation for error callback */
1856 apply_error_callback_arg
.rel
= rel
;
1858 /* Check if we can do the delete. */
1859 check_relation_updatable(rel
);
1861 /* Initialize the executor state. */
1862 edata
= create_edata_for_relation(rel
);
1863 estate
= edata
->estate
;
1864 remoteslot
= ExecInitExtraTupleSlot(estate
,
1865 RelationGetDescr(rel
->localrel
),
1868 /* Build the search tuple. */
1869 oldctx
= MemoryContextSwitchTo(GetPerTupleMemoryContext(estate
));
1870 slot_store_data(remoteslot
, rel
, &oldtup
);
1871 MemoryContextSwitchTo(oldctx
);
1873 /* For a partitioned table, apply delete to correct partition. */
1874 if (rel
->localrel
->rd_rel
->relkind
== RELKIND_PARTITIONED_TABLE
)
1875 apply_handle_tuple_routing(edata
,
1876 remoteslot
, NULL
, CMD_DELETE
);
1878 apply_handle_delete_internal(edata
, edata
->targetRelInfo
,
1881 finish_edata(edata
);
1883 /* Reset relation for error callback */
1884 apply_error_callback_arg
.rel
= NULL
;
1886 logicalrep_rel_close(rel
, NoLock
);
1888 end_replication_step();
1892 * Workhorse for apply_handle_delete()
1893 * relinfo is for the relation we're actually deleting from
1894 * (could be a child partition of edata->targetRelInfo)
1897 apply_handle_delete_internal(ApplyExecutionData
*edata
,
1898 ResultRelInfo
*relinfo
,
1899 TupleTableSlot
*remoteslot
)
1901 EState
*estate
= edata
->estate
;
1902 Relation localrel
= relinfo
->ri_RelationDesc
;
1903 LogicalRepRelation
*remoterel
= &edata
->targetRel
->remoterel
;
1905 TupleTableSlot
*localslot
;
1908 EvalPlanQualInit(&epqstate
, estate
, NULL
, NIL
, -1);
1909 ExecOpenIndices(relinfo
, false);
1911 found
= FindReplTupleInLocalRel(estate
, localrel
, remoterel
,
1912 remoteslot
, &localslot
);
1914 /* If found delete it. */
1917 EvalPlanQualSetSlot(&epqstate
, localslot
);
1919 /* Do the actual delete. */
1920 ExecSimpleRelationDelete(relinfo
, estate
, &epqstate
, localslot
);
1925 * The tuple to be deleted could not be found. Do nothing except for
1926 * emitting a log message.
1928 * XXX should this be promoted to ereport(LOG) perhaps?
1931 "logical replication did not find row to be deleted "
1932 "in replication target relation \"%s\"",
1933 RelationGetRelationName(localrel
));
1937 ExecCloseIndices(relinfo
);
1938 EvalPlanQualEnd(&epqstate
);
1942 * Try to find a tuple received from the publication side (in 'remoteslot') in
1943 * the corresponding local relation using either replica identity index,
1944 * primary key or if needed, sequential scan.
1946 * Local tuple, if found, is returned in '*localslot'.
1949 FindReplTupleInLocalRel(EState
*estate
, Relation localrel
,
1950 LogicalRepRelation
*remoterel
,
1951 TupleTableSlot
*remoteslot
,
1952 TupleTableSlot
**localslot
)
1957 *localslot
= table_slot_create(localrel
, &estate
->es_tupleTable
);
1959 idxoid
= GetRelationIdentityOrPK(localrel
);
1960 Assert(OidIsValid(idxoid
) ||
1961 (remoterel
->replident
== REPLICA_IDENTITY_FULL
));
1963 if (OidIsValid(idxoid
))
1964 found
= RelationFindReplTupleByIndex(localrel
, idxoid
,
1966 remoteslot
, *localslot
);
1968 found
= RelationFindReplTupleSeq(localrel
, LockTupleExclusive
,
1969 remoteslot
, *localslot
);
1975 * This handles insert, update, delete on a partitioned table.
1978 apply_handle_tuple_routing(ApplyExecutionData
*edata
,
1979 TupleTableSlot
*remoteslot
,
1980 LogicalRepTupleData
*newtup
,
1983 EState
*estate
= edata
->estate
;
1984 LogicalRepRelMapEntry
*relmapentry
= edata
->targetRel
;
1985 ResultRelInfo
*relinfo
= edata
->targetRelInfo
;
1986 Relation parentrel
= relinfo
->ri_RelationDesc
;
1987 ModifyTableState
*mtstate
;
1988 PartitionTupleRouting
*proute
;
1989 ResultRelInfo
*partrelinfo
;
1991 TupleTableSlot
*remoteslot_part
;
1992 TupleConversionMap
*map
;
1993 MemoryContext oldctx
;
1995 /* ModifyTableState is needed for ExecFindPartition(). */
1996 edata
->mtstate
= mtstate
= makeNode(ModifyTableState
);
1997 mtstate
->ps
.plan
= NULL
;
1998 mtstate
->ps
.state
= estate
;
1999 mtstate
->operation
= operation
;
2000 mtstate
->resultRelInfo
= relinfo
;
2002 /* ... as is PartitionTupleRouting. */
2003 edata
->proute
= proute
= ExecSetupPartitionTupleRouting(estate
, parentrel
);
2006 * Find the partition to which the "search tuple" belongs.
2008 Assert(remoteslot
!= NULL
);
2009 oldctx
= MemoryContextSwitchTo(GetPerTupleMemoryContext(estate
));
2010 partrelinfo
= ExecFindPartition(mtstate
, relinfo
, proute
,
2011 remoteslot
, estate
);
2012 Assert(partrelinfo
!= NULL
);
2013 partrel
= partrelinfo
->ri_RelationDesc
;
2016 * To perform any of the operations below, the tuple must match the
2017 * partition's rowtype. Convert if needed or just copy, using a dedicated
2018 * slot to store the tuple in any case.
2020 remoteslot_part
= partrelinfo
->ri_PartitionTupleSlot
;
2021 if (remoteslot_part
== NULL
)
2022 remoteslot_part
= table_slot_create(partrel
, &estate
->es_tupleTable
);
2023 map
= partrelinfo
->ri_RootToPartitionMap
;
2025 remoteslot_part
= execute_attr_map_slot(map
->attrMap
, remoteslot
,
2029 remoteslot_part
= ExecCopySlot(remoteslot_part
, remoteslot
);
2030 slot_getallattrs(remoteslot_part
);
2032 MemoryContextSwitchTo(oldctx
);
2037 apply_handle_insert_internal(edata
, partrelinfo
,
2042 apply_handle_delete_internal(edata
, partrelinfo
,
2049 * For UPDATE, depending on whether or not the updated tuple
2050 * satisfies the partition's constraint, perform a simple UPDATE
2051 * of the partition or move the updated tuple into a different
2052 * suitable partition.
2055 AttrMap
*attrmap
= map
? map
->attrMap
: NULL
;
2056 LogicalRepRelMapEntry
*part_entry
;
2057 TupleTableSlot
*localslot
;
2058 ResultRelInfo
*partrelinfo_new
;
2061 part_entry
= logicalrep_partition_open(relmapentry
, partrel
,
2064 /* Get the matching local tuple from the partition. */
2065 found
= FindReplTupleInLocalRel(estate
, partrel
,
2066 &part_entry
->remoterel
,
2067 remoteslot_part
, &localslot
);
2071 * The tuple to be updated could not be found. Do nothing
2072 * except for emitting a log message.
2074 * XXX should this be promoted to ereport(LOG) perhaps?
2077 "logical replication did not find row to be updated "
2078 "in replication target relation's partition \"%s\"",
2079 RelationGetRelationName(partrel
));
2084 * Apply the update to the local tuple, putting the result in
2087 oldctx
= MemoryContextSwitchTo(GetPerTupleMemoryContext(estate
));
2088 slot_modify_data(remoteslot_part
, localslot
, part_entry
,
2090 MemoryContextSwitchTo(oldctx
);
2093 * Does the updated tuple still satisfy the current
2094 * partition's constraint?
2096 if (!partrel
->rd_rel
->relispartition
||
2097 ExecPartitionCheck(partrelinfo
, remoteslot_part
, estate
,
2101 * Yes, so simply UPDATE the partition. We don't call
2102 * apply_handle_update_internal() here, which would
2103 * normally do the following work, to avoid repeating some
2104 * work already done above to find the local tuple in the
2109 EvalPlanQualInit(&epqstate
, estate
, NULL
, NIL
, -1);
2110 ExecOpenIndices(partrelinfo
, false);
2112 EvalPlanQualSetSlot(&epqstate
, remoteslot_part
);
2113 ExecSimpleRelationUpdate(partrelinfo
, estate
, &epqstate
,
2114 localslot
, remoteslot_part
);
2115 ExecCloseIndices(partrelinfo
);
2116 EvalPlanQualEnd(&epqstate
);
2120 /* Move the tuple into the new partition. */
2123 * New partition will be found using tuple routing, which
2124 * can only occur via the parent table. We might need to
2125 * convert the tuple to the parent's rowtype. Note that
2126 * this is the tuple found in the partition, not the
2127 * original search tuple received by this function.
2131 TupleConversionMap
*PartitionToRootMap
=
2132 convert_tuples_by_name(RelationGetDescr(partrel
),
2133 RelationGetDescr(parentrel
));
2136 execute_attr_map_slot(PartitionToRootMap
->attrMap
,
2137 remoteslot_part
, remoteslot
);
2141 remoteslot
= ExecCopySlot(remoteslot
, remoteslot_part
);
2142 slot_getallattrs(remoteslot
);
2146 /* Find the new partition. */
2147 oldctx
= MemoryContextSwitchTo(GetPerTupleMemoryContext(estate
));
2148 partrelinfo_new
= ExecFindPartition(mtstate
, relinfo
,
2151 MemoryContextSwitchTo(oldctx
);
2152 Assert(partrelinfo_new
!= partrelinfo
);
2154 /* DELETE old tuple found in the old partition. */
2155 apply_handle_delete_internal(edata
, partrelinfo
,
2158 /* INSERT new tuple into the new partition. */
2161 * Convert the replacement tuple to match the destination
2162 * partition rowtype.
2164 oldctx
= MemoryContextSwitchTo(GetPerTupleMemoryContext(estate
));
2165 partrel
= partrelinfo_new
->ri_RelationDesc
;
2166 remoteslot_part
= partrelinfo_new
->ri_PartitionTupleSlot
;
2167 if (remoteslot_part
== NULL
)
2168 remoteslot_part
= table_slot_create(partrel
,
2169 &estate
->es_tupleTable
);
2170 map
= partrelinfo_new
->ri_RootToPartitionMap
;
2173 remoteslot_part
= execute_attr_map_slot(map
->attrMap
,
2179 remoteslot_part
= ExecCopySlot(remoteslot_part
,
2181 slot_getallattrs(remoteslot
);
2183 MemoryContextSwitchTo(oldctx
);
2184 apply_handle_insert_internal(edata
, partrelinfo_new
,
2191 elog(ERROR
, "unrecognized CmdType: %d", (int) operation
);
2197 * Handle TRUNCATE message.
2202 apply_handle_truncate(StringInfo s
)
2204 bool cascade
= false;
2205 bool restart_seqs
= false;
2206 List
*remote_relids
= NIL
;
2207 List
*remote_rels
= NIL
;
2209 List
*part_rels
= NIL
;
2211 List
*relids_logged
= NIL
;
2213 LOCKMODE lockmode
= AccessExclusiveLock
;
2215 if (handle_streamed_transaction(LOGICAL_REP_MSG_TRUNCATE
, s
))
2218 begin_replication_step();
2220 remote_relids
= logicalrep_read_truncate(s
, &cascade
, &restart_seqs
);
2222 foreach(lc
, remote_relids
)
2224 LogicalRepRelId relid
= lfirst_oid(lc
);
2225 LogicalRepRelMapEntry
*rel
;
2227 rel
= logicalrep_rel_open(relid
, lockmode
);
2228 if (!should_apply_changes_for_rel(rel
))
2231 * The relation can't become interesting in the middle of the
2232 * transaction so it's safe to unlock it.
2234 logicalrep_rel_close(rel
, lockmode
);
2238 remote_rels
= lappend(remote_rels
, rel
);
2239 rels
= lappend(rels
, rel
->localrel
);
2240 relids
= lappend_oid(relids
, rel
->localreloid
);
2241 if (RelationIsLogicallyLogged(rel
->localrel
))
2242 relids_logged
= lappend_oid(relids_logged
, rel
->localreloid
);
2245 * Truncate partitions if we got a message to truncate a partitioned
2248 if (rel
->localrel
->rd_rel
->relkind
== RELKIND_PARTITIONED_TABLE
)
2251 List
*children
= find_all_inheritors(rel
->localreloid
,
2255 foreach(child
, children
)
2257 Oid childrelid
= lfirst_oid(child
);
2260 if (list_member_oid(relids
, childrelid
))
2263 /* find_all_inheritors already got lock */
2264 childrel
= table_open(childrelid
, NoLock
);
2267 * Ignore temp tables of other backends. See similar code in
2268 * ExecuteTruncate().
2270 if (RELATION_IS_OTHER_TEMP(childrel
))
2272 table_close(childrel
, lockmode
);
2276 rels
= lappend(rels
, childrel
);
2277 part_rels
= lappend(part_rels
, childrel
);
2278 relids
= lappend_oid(relids
, childrelid
);
2279 /* Log this relation only if needed for logical decoding */
2280 if (RelationIsLogicallyLogged(childrel
))
2281 relids_logged
= lappend_oid(relids_logged
, childrelid
);
2287 * Even if we used CASCADE on the upstream primary we explicitly default
2288 * to replaying changes without further cascading. This might be later
2289 * changeable with a user specified option.
2291 ExecuteTruncateGuts(rels
,
2296 foreach(lc
, remote_rels
)
2298 LogicalRepRelMapEntry
*rel
= lfirst(lc
);
2300 logicalrep_rel_close(rel
, NoLock
);
2302 foreach(lc
, part_rels
)
2304 Relation rel
= lfirst(lc
);
2306 table_close(rel
, NoLock
);
2309 end_replication_step();
2314 * Logical replication protocol message dispatcher.
2317 apply_dispatch(StringInfo s
)
2319 LogicalRepMsgType action
= pq_getmsgbyte(s
);
2320 LogicalRepMsgType saved_command
;
2323 * Set the current command being applied. Since this function can be
2324 * called recusively when applying spooled changes, save the current
2327 saved_command
= apply_error_callback_arg
.command
;
2328 apply_error_callback_arg
.command
= action
;
2332 case LOGICAL_REP_MSG_BEGIN
:
2333 apply_handle_begin(s
);
2336 case LOGICAL_REP_MSG_COMMIT
:
2337 apply_handle_commit(s
);
2340 case LOGICAL_REP_MSG_INSERT
:
2341 apply_handle_insert(s
);
2344 case LOGICAL_REP_MSG_UPDATE
:
2345 apply_handle_update(s
);
2348 case LOGICAL_REP_MSG_DELETE
:
2349 apply_handle_delete(s
);
2352 case LOGICAL_REP_MSG_TRUNCATE
:
2353 apply_handle_truncate(s
);
2356 case LOGICAL_REP_MSG_RELATION
:
2357 apply_handle_relation(s
);
2360 case LOGICAL_REP_MSG_TYPE
:
2361 apply_handle_type(s
);
2364 case LOGICAL_REP_MSG_ORIGIN
:
2365 apply_handle_origin(s
);
2368 case LOGICAL_REP_MSG_MESSAGE
:
2371 * Logical replication does not use generic logical messages yet.
2372 * Although, it could be used by other applications that use this
2377 case LOGICAL_REP_MSG_STREAM_START
:
2378 apply_handle_stream_start(s
);
2381 case LOGICAL_REP_MSG_STREAM_STOP
:
2382 apply_handle_stream_stop(s
);
2385 case LOGICAL_REP_MSG_STREAM_ABORT
:
2386 apply_handle_stream_abort(s
);
2389 case LOGICAL_REP_MSG_STREAM_COMMIT
:
2390 apply_handle_stream_commit(s
);
2393 case LOGICAL_REP_MSG_BEGIN_PREPARE
:
2394 apply_handle_begin_prepare(s
);
2397 case LOGICAL_REP_MSG_PREPARE
:
2398 apply_handle_prepare(s
);
2401 case LOGICAL_REP_MSG_COMMIT_PREPARED
:
2402 apply_handle_commit_prepared(s
);
2405 case LOGICAL_REP_MSG_ROLLBACK_PREPARED
:
2406 apply_handle_rollback_prepared(s
);
2409 case LOGICAL_REP_MSG_STREAM_PREPARE
:
2410 apply_handle_stream_prepare(s
);
2415 (errcode(ERRCODE_PROTOCOL_VIOLATION
),
2416 errmsg("invalid logical replication message type \"%c\"", action
)));
2419 /* Reset the current command */
2420 apply_error_callback_arg
.command
= saved_command
;
2424 * Figure out which write/flush positions to report to the walsender process.
2426 * We can't simply report back the last LSN the walsender sent us because the
2427 * local transaction might not yet be flushed to disk locally. Instead we
2428 * build a list that associates local with remote LSNs for every commit. When
2429 * reporting back the flush position to the sender we iterate that list and
2430 * check which entries on it are already locally flushed. Those we can report
2431 * as having been flushed.
2433 * The have_pending_txes is true if there are outstanding transactions that
2434 * need to be flushed.
2437 get_flush_position(XLogRecPtr
*write
, XLogRecPtr
*flush
,
2438 bool *have_pending_txes
)
2440 dlist_mutable_iter iter
;
2441 XLogRecPtr local_flush
= GetFlushRecPtr(NULL
);
2443 *write
= InvalidXLogRecPtr
;
2444 *flush
= InvalidXLogRecPtr
;
2446 dlist_foreach_modify(iter
, &lsn_mapping
)
2448 FlushPosition
*pos
=
2449 dlist_container(FlushPosition
, node
, iter
.cur
);
2451 *write
= pos
->remote_end
;
2453 if (pos
->local_end
<= local_flush
)
2455 *flush
= pos
->remote_end
;
2456 dlist_delete(iter
.cur
);
2462 * Don't want to uselessly iterate over the rest of the list which
2463 * could potentially be long. Instead get the last element and
2464 * grab the write position from there.
2466 pos
= dlist_tail_element(FlushPosition
, node
,
2468 *write
= pos
->remote_end
;
2469 *have_pending_txes
= true;
2474 *have_pending_txes
= !dlist_is_empty(&lsn_mapping
);
2478 * Store current remote/local lsn pair in the tracking list.
2481 store_flush_position(XLogRecPtr remote_lsn
)
2483 FlushPosition
*flushpos
;
2485 /* Need to do this in permanent context */
2486 MemoryContextSwitchTo(ApplyContext
);
2488 /* Track commit lsn */
2489 flushpos
= (FlushPosition
*) palloc(sizeof(FlushPosition
));
2490 flushpos
->local_end
= XactLastCommitEnd
;
2491 flushpos
->remote_end
= remote_lsn
;
2493 dlist_push_tail(&lsn_mapping
, &flushpos
->node
);
2494 MemoryContextSwitchTo(ApplyMessageContext
);
2498 /* Update statistics of the worker. */
2500 UpdateWorkerStats(XLogRecPtr last_lsn
, TimestampTz send_time
, bool reply
)
2502 MyLogicalRepWorker
->last_lsn
= last_lsn
;
2503 MyLogicalRepWorker
->last_send_time
= send_time
;
2504 MyLogicalRepWorker
->last_recv_time
= GetCurrentTimestamp();
2507 MyLogicalRepWorker
->reply_lsn
= last_lsn
;
2508 MyLogicalRepWorker
->reply_time
= send_time
;
2516 LogicalRepApplyLoop(XLogRecPtr last_received
)
2518 TimestampTz last_recv_timestamp
= GetCurrentTimestamp();
2519 bool ping_sent
= false;
2521 ErrorContextCallback errcallback
;
2524 * Init the ApplyMessageContext which we clean up after each replication
2527 ApplyMessageContext
= AllocSetContextCreate(ApplyContext
,
2528 "ApplyMessageContext",
2529 ALLOCSET_DEFAULT_SIZES
);
2532 * This memory context is used for per-stream data when the streaming mode
2533 * is enabled. This context is reset on each stream stop.
2535 LogicalStreamingContext
= AllocSetContextCreate(ApplyContext
,
2536 "LogicalStreamingContext",
2537 ALLOCSET_DEFAULT_SIZES
);
2539 /* mark as idle, before starting to loop */
2540 pgstat_report_activity(STATE_IDLE
, NULL
);
2543 * Push apply error context callback. Fields will be filled during
2544 * applying a change.
2546 errcallback
.callback
= apply_error_callback
;
2547 errcallback
.previous
= error_context_stack
;
2548 error_context_stack
= &errcallback
;
2550 /* This outer loop iterates once per wait. */
2553 pgsocket fd
= PGINVALID_SOCKET
;
2557 bool endofstream
= false;
2560 CHECK_FOR_INTERRUPTS();
2562 MemoryContextSwitchTo(ApplyMessageContext
);
2564 len
= walrcv_receive(LogRepWorkerWalRcvConn
, &buf
, &fd
);
2568 /* Loop to process all available data (without blocking). */
2571 CHECK_FOR_INTERRUPTS();
2580 (errmsg("data stream from publisher has ended")));
2589 /* Reset timeout. */
2590 last_recv_timestamp
= GetCurrentTimestamp();
2593 /* Ensure we are reading the data into our memory context. */
2594 MemoryContextSwitchTo(ApplyMessageContext
);
2601 c
= pq_getmsgbyte(&s
);
2605 XLogRecPtr start_lsn
;
2607 TimestampTz send_time
;
2609 start_lsn
= pq_getmsgint64(&s
);
2610 end_lsn
= pq_getmsgint64(&s
);
2611 send_time
= pq_getmsgint64(&s
);
2613 if (last_received
< start_lsn
)
2614 last_received
= start_lsn
;
2616 if (last_received
< end_lsn
)
2617 last_received
= end_lsn
;
2619 UpdateWorkerStats(last_received
, send_time
, false);
2626 TimestampTz timestamp
;
2627 bool reply_requested
;
2629 end_lsn
= pq_getmsgint64(&s
);
2630 timestamp
= pq_getmsgint64(&s
);
2631 reply_requested
= pq_getmsgbyte(&s
);
2633 if (last_received
< end_lsn
)
2634 last_received
= end_lsn
;
2636 send_feedback(last_received
, reply_requested
, false);
2637 UpdateWorkerStats(last_received
, timestamp
, true);
2639 /* other message types are purposefully ignored */
2641 MemoryContextReset(ApplyMessageContext
);
2644 len
= walrcv_receive(LogRepWorkerWalRcvConn
, &buf
, &fd
);
2648 /* confirm all writes so far */
2649 send_feedback(last_received
, false, false);
2651 if (!in_remote_transaction
&& !in_streamed_transaction
)
2654 * If we didn't get any transactions for a while there might be
2655 * unconsumed invalidation messages in the queue, consume them
2658 AcceptInvalidationMessages();
2659 maybe_reread_subscription();
2661 /* Process any table synchronization changes. */
2662 process_syncing_tables(last_received
);
2665 /* Cleanup the memory. */
2666 MemoryContextResetAndDeleteChildren(ApplyMessageContext
);
2667 MemoryContextSwitchTo(TopMemoryContext
);
2669 /* Check if we need to exit the streaming loop. */
2674 * Wait for more data or latch. If we have unflushed transactions,
2675 * wake up after WalWriterDelay to see if they've been flushed yet (in
2676 * which case we should send a feedback message). Otherwise, there's
2677 * no particular urgency about waking up unless we get data or a
2680 if (!dlist_is_empty(&lsn_mapping
))
2681 wait_time
= WalWriterDelay
;
2683 wait_time
= NAPTIME_PER_CYCLE
;
2685 rc
= WaitLatchOrSocket(MyLatch
,
2686 WL_SOCKET_READABLE
| WL_LATCH_SET
|
2687 WL_TIMEOUT
| WL_EXIT_ON_PM_DEATH
,
2689 WAIT_EVENT_LOGICAL_APPLY_MAIN
);
2691 if (rc
& WL_LATCH_SET
)
2693 ResetLatch(MyLatch
);
2694 CHECK_FOR_INTERRUPTS();
2697 if (ConfigReloadPending
)
2699 ConfigReloadPending
= false;
2700 ProcessConfigFile(PGC_SIGHUP
);
2703 if (rc
& WL_TIMEOUT
)
2706 * We didn't receive anything new. If we haven't heard anything
2707 * from the server for more than wal_receiver_timeout / 2, ping
2708 * the server. Also, if it's been longer than
2709 * wal_receiver_status_interval since the last update we sent,
2710 * send a status update to the primary anyway, to report any
2711 * progress in applying WAL.
2713 bool requestReply
= false;
2716 * Check if time since last receive from primary has reached the
2719 if (wal_receiver_timeout
> 0)
2721 TimestampTz now
= GetCurrentTimestamp();
2722 TimestampTz timeout
;
2725 TimestampTzPlusMilliseconds(last_recv_timestamp
,
2726 wal_receiver_timeout
);
2730 (errcode(ERRCODE_CONNECTION_FAILURE
),
2731 errmsg("terminating logical replication worker due to timeout")));
2733 /* Check to see if it's time for a ping. */
2736 timeout
= TimestampTzPlusMilliseconds(last_recv_timestamp
,
2737 (wal_receiver_timeout
/ 2));
2740 requestReply
= true;
2746 send_feedback(last_received
, requestReply
, requestReply
);
2750 /* Pop the error context stack */
2751 error_context_stack
= errcallback
.previous
;
2754 walrcv_endstreaming(LogRepWorkerWalRcvConn
, &tli
);
2758 * Send a Standby Status Update message to server.
2760 * 'recvpos' is the latest LSN we've received data to, force is set if we need
2761 * to send a response to avoid timeouts.
2764 send_feedback(XLogRecPtr recvpos
, bool force
, bool requestReply
)
2766 static StringInfo reply_message
= NULL
;
2767 static TimestampTz send_time
= 0;
2769 static XLogRecPtr last_recvpos
= InvalidXLogRecPtr
;
2770 static XLogRecPtr last_writepos
= InvalidXLogRecPtr
;
2771 static XLogRecPtr last_flushpos
= InvalidXLogRecPtr
;
2773 XLogRecPtr writepos
;
2774 XLogRecPtr flushpos
;
2776 bool have_pending_txes
;
2779 * If the user doesn't want status to be reported to the publisher, be
2780 * sure to exit before doing anything at all.
2782 if (!force
&& wal_receiver_status_interval
<= 0)
2785 /* It's legal to not pass a recvpos */
2786 if (recvpos
< last_recvpos
)
2787 recvpos
= last_recvpos
;
2789 get_flush_position(&writepos
, &flushpos
, &have_pending_txes
);
2792 * No outstanding transactions to flush, we can report the latest received
2793 * position. This is important for synchronous replication.
2795 if (!have_pending_txes
)
2796 flushpos
= writepos
= recvpos
;
2798 if (writepos
< last_writepos
)
2799 writepos
= last_writepos
;
2801 if (flushpos
< last_flushpos
)
2802 flushpos
= last_flushpos
;
2804 now
= GetCurrentTimestamp();
2806 /* if we've already reported everything we're good */
2808 writepos
== last_writepos
&&
2809 flushpos
== last_flushpos
&&
2810 !TimestampDifferenceExceeds(send_time
, now
,
2811 wal_receiver_status_interval
* 1000))
2817 MemoryContext oldctx
= MemoryContextSwitchTo(ApplyContext
);
2819 reply_message
= makeStringInfo();
2820 MemoryContextSwitchTo(oldctx
);
2823 resetStringInfo(reply_message
);
2825 pq_sendbyte(reply_message
, 'r');
2826 pq_sendint64(reply_message
, recvpos
); /* write */
2827 pq_sendint64(reply_message
, flushpos
); /* flush */
2828 pq_sendint64(reply_message
, writepos
); /* apply */
2829 pq_sendint64(reply_message
, now
); /* sendTime */
2830 pq_sendbyte(reply_message
, requestReply
); /* replyRequested */
2832 elog(DEBUG2
, "sending feedback (force %d) to recv %X/%X, write %X/%X, flush %X/%X",
2834 LSN_FORMAT_ARGS(recvpos
),
2835 LSN_FORMAT_ARGS(writepos
),
2836 LSN_FORMAT_ARGS(flushpos
));
2838 walrcv_send(LogRepWorkerWalRcvConn
,
2839 reply_message
->data
, reply_message
->len
);
2841 if (recvpos
> last_recvpos
)
2842 last_recvpos
= recvpos
;
2843 if (writepos
> last_writepos
)
2844 last_writepos
= writepos
;
2845 if (flushpos
> last_flushpos
)
2846 last_flushpos
= flushpos
;
2850 * Reread subscription info if needed. Most changes will be exit.
2853 maybe_reread_subscription(void)
2855 MemoryContext oldctx
;
2856 Subscription
*newsub
;
2857 bool started_tx
= false;
2859 /* When cache state is valid there is nothing to do here. */
2860 if (MySubscriptionValid
)
2863 /* This function might be called inside or outside of transaction. */
2864 if (!IsTransactionState())
2866 StartTransactionCommand();
2870 /* Ensure allocations in permanent context. */
2871 oldctx
= MemoryContextSwitchTo(ApplyContext
);
2873 newsub
= GetSubscription(MyLogicalRepWorker
->subid
, true);
2876 * Exit if the subscription was removed. This normally should not happen
2877 * as the worker gets killed during DROP SUBSCRIPTION.
2882 (errmsg("logical replication apply worker for subscription \"%s\" will "
2883 "stop because the subscription was removed",
2884 MySubscription
->name
)));
2890 * Exit if the subscription was disabled. This normally should not happen
2891 * as the worker gets killed during ALTER SUBSCRIPTION ... DISABLE.
2893 if (!newsub
->enabled
)
2896 (errmsg("logical replication apply worker for subscription \"%s\" will "
2897 "stop because the subscription was disabled",
2898 MySubscription
->name
)));
2903 /* !slotname should never happen when enabled is true. */
2904 Assert(newsub
->slotname
);
2906 /* two-phase should not be altered */
2907 Assert(newsub
->twophasestate
== MySubscription
->twophasestate
);
2910 * Exit if any parameter that affects the remote connection was changed.
2911 * The launcher will start a new worker.
2913 if (strcmp(newsub
->conninfo
, MySubscription
->conninfo
) != 0 ||
2914 strcmp(newsub
->name
, MySubscription
->name
) != 0 ||
2915 strcmp(newsub
->slotname
, MySubscription
->slotname
) != 0 ||
2916 newsub
->binary
!= MySubscription
->binary
||
2917 newsub
->stream
!= MySubscription
->stream
||
2918 !equal(newsub
->publications
, MySubscription
->publications
))
2921 (errmsg("logical replication apply worker for subscription \"%s\" will restart because of a parameter change",
2922 MySubscription
->name
)));
2927 /* Check for other changes that should never happen too. */
2928 if (newsub
->dbid
!= MySubscription
->dbid
)
2930 elog(ERROR
, "subscription %u changed unexpectedly",
2931 MyLogicalRepWorker
->subid
);
2934 /* Clean old subscription info and switch to new one. */
2935 FreeSubscription(MySubscription
);
2936 MySubscription
= newsub
;
2938 MemoryContextSwitchTo(oldctx
);
2940 /* Change synchronous commit according to the user's wishes */
2941 SetConfigOption("synchronous_commit", MySubscription
->synccommit
,
2942 PGC_BACKEND
, PGC_S_OVERRIDE
);
2945 CommitTransactionCommand();
2947 MySubscriptionValid
= true;
2951 * Callback from subscription syscache invalidation.
2954 subscription_change_cb(Datum arg
, int cacheid
, uint32 hashvalue
)
2956 MySubscriptionValid
= false;
2960 * subxact_info_write
2961 * Store information about subxacts for a toplevel transaction.
2963 * For each subxact we store offset of it's first change in the main file.
2964 * The file is always over-written as a whole.
2966 * XXX We should only store subxacts that were not aborted yet.
2969 subxact_info_write(Oid subid
, TransactionId xid
)
2971 char path
[MAXPGPATH
];
2975 Assert(TransactionIdIsValid(xid
));
2977 /* construct the subxact filename */
2978 subxact_filename(path
, subid
, xid
);
2980 /* Delete the subxacts file, if exists. */
2981 if (subxact_data
.nsubxacts
== 0)
2983 cleanup_subxact_info();
2984 BufFileDeleteFileSet(MyLogicalRepWorker
->stream_fileset
, path
, true);
2990 * Create the subxact file if it not already created, otherwise open the
2993 fd
= BufFileOpenFileSet(MyLogicalRepWorker
->stream_fileset
, path
, O_RDWR
,
2996 fd
= BufFileCreateFileSet(MyLogicalRepWorker
->stream_fileset
, path
);
2998 len
= sizeof(SubXactInfo
) * subxact_data
.nsubxacts
;
3000 /* Write the subxact count and subxact info */
3001 BufFileWrite(fd
, &subxact_data
.nsubxacts
, sizeof(subxact_data
.nsubxacts
));
3002 BufFileWrite(fd
, subxact_data
.subxacts
, len
);
3006 /* free the memory allocated for subxact info */
3007 cleanup_subxact_info();
3012 * Restore information about subxacts of a streamed transaction.
3014 * Read information about subxacts into the structure subxact_data that can be
3018 subxact_info_read(Oid subid
, TransactionId xid
)
3020 char path
[MAXPGPATH
];
3023 MemoryContext oldctx
;
3025 Assert(!subxact_data
.subxacts
);
3026 Assert(subxact_data
.nsubxacts
== 0);
3027 Assert(subxact_data
.nsubxacts_max
== 0);
3030 * If the subxact file doesn't exist that means we don't have any subxact
3033 subxact_filename(path
, subid
, xid
);
3034 fd
= BufFileOpenFileSet(MyLogicalRepWorker
->stream_fileset
, path
, O_RDONLY
,
3039 /* read number of subxact items */
3040 if (BufFileRead(fd
, &subxact_data
.nsubxacts
,
3041 sizeof(subxact_data
.nsubxacts
)) !=
3042 sizeof(subxact_data
.nsubxacts
))
3044 (errcode_for_file_access(),
3045 errmsg("could not read from streaming transaction's subxact file \"%s\": %m",
3048 len
= sizeof(SubXactInfo
) * subxact_data
.nsubxacts
;
3050 /* we keep the maximum as a power of 2 */
3051 subxact_data
.nsubxacts_max
= 1 << my_log2(subxact_data
.nsubxacts
);
3054 * Allocate subxact information in the logical streaming context. We need
3055 * this information during the complete stream so that we can add the sub
3056 * transaction info to this. On stream stop we will flush this information
3057 * to the subxact file and reset the logical streaming context.
3059 oldctx
= MemoryContextSwitchTo(LogicalStreamingContext
);
3060 subxact_data
.subxacts
= palloc(subxact_data
.nsubxacts_max
*
3061 sizeof(SubXactInfo
));
3062 MemoryContextSwitchTo(oldctx
);
3064 if ((len
> 0) && ((BufFileRead(fd
, subxact_data
.subxacts
, len
)) != len
))
3066 (errcode_for_file_access(),
3067 errmsg("could not read from streaming transaction's subxact file \"%s\": %m",
3075 * Add information about a subxact (offset in the main file).
3078 subxact_info_add(TransactionId xid
)
3080 SubXactInfo
*subxacts
= subxact_data
.subxacts
;
3083 /* We must have a valid top level stream xid and a stream fd. */
3084 Assert(TransactionIdIsValid(stream_xid
));
3085 Assert(stream_fd
!= NULL
);
3088 * If the XID matches the toplevel transaction, we don't want to add it.
3090 if (stream_xid
== xid
)
3094 * In most cases we're checking the same subxact as we've already seen in
3095 * the last call, so make sure to ignore it (this change comes later).
3097 if (subxact_data
.subxact_last
== xid
)
3100 /* OK, remember we're processing this XID. */
3101 subxact_data
.subxact_last
= xid
;
3104 * Check if the transaction is already present in the array of subxact. We
3105 * intentionally scan the array from the tail, because we're likely adding
3106 * a change for the most recent subtransactions.
3108 * XXX Can we rely on the subxact XIDs arriving in sorted order? That
3109 * would allow us to use binary search here.
3111 for (i
= subxact_data
.nsubxacts
; i
> 0; i
--)
3113 /* found, so we're done */
3114 if (subxacts
[i
- 1].xid
== xid
)
3118 /* This is a new subxact, so we need to add it to the array. */
3119 if (subxact_data
.nsubxacts
== 0)
3121 MemoryContext oldctx
;
3123 subxact_data
.nsubxacts_max
= 128;
3126 * Allocate this memory for subxacts in per-stream context, see
3127 * subxact_info_read.
3129 oldctx
= MemoryContextSwitchTo(LogicalStreamingContext
);
3130 subxacts
= palloc(subxact_data
.nsubxacts_max
* sizeof(SubXactInfo
));
3131 MemoryContextSwitchTo(oldctx
);
3133 else if (subxact_data
.nsubxacts
== subxact_data
.nsubxacts_max
)
3135 subxact_data
.nsubxacts_max
*= 2;
3136 subxacts
= repalloc(subxacts
,
3137 subxact_data
.nsubxacts_max
* sizeof(SubXactInfo
));
3140 subxacts
[subxact_data
.nsubxacts
].xid
= xid
;
3143 * Get the current offset of the stream file and store it as offset of
3146 BufFileTell(stream_fd
,
3147 &subxacts
[subxact_data
.nsubxacts
].fileno
,
3148 &subxacts
[subxact_data
.nsubxacts
].offset
);
3150 subxact_data
.nsubxacts
++;
3151 subxact_data
.subxacts
= subxacts
;
3154 /* format filename for file containing the info about subxacts */
3156 subxact_filename(char *path
, Oid subid
, TransactionId xid
)
3158 snprintf(path
, MAXPGPATH
, "%u-%u.subxacts", subid
, xid
);
3161 /* format filename for file containing serialized changes */
3163 changes_filename(char *path
, Oid subid
, TransactionId xid
)
3165 snprintf(path
, MAXPGPATH
, "%u-%u.changes", subid
, xid
);
3169 * stream_cleanup_files
3170 * Cleanup files for a subscription / toplevel transaction.
3172 * Remove files with serialized changes and subxact info for a particular
3173 * toplevel transaction. Each subscription has a separate set of files
3174 * for any toplevel transaction.
3177 stream_cleanup_files(Oid subid
, TransactionId xid
)
3179 char path
[MAXPGPATH
];
3181 /* Delete the changes file. */
3182 changes_filename(path
, subid
, xid
);
3183 BufFileDeleteFileSet(MyLogicalRepWorker
->stream_fileset
, path
, false);
3185 /* Delete the subxact file, if it exists. */
3186 subxact_filename(path
, subid
, xid
);
3187 BufFileDeleteFileSet(MyLogicalRepWorker
->stream_fileset
, path
, true);
3192 * Open a file that we'll use to serialize changes for a toplevel
3195 * Open a file for streamed changes from a toplevel transaction identified
3196 * by stream_xid (global variable). If it's the first chunk of streamed
3197 * changes for this transaction, create the buffile, otherwise open the
3198 * previously created file.
3200 * This can only be called at the beginning of a "streaming" block, i.e.
3201 * between stream_start/stream_stop messages from the upstream.
3204 stream_open_file(Oid subid
, TransactionId xid
, bool first_segment
)
3206 char path
[MAXPGPATH
];
3207 MemoryContext oldcxt
;
3209 Assert(in_streamed_transaction
);
3210 Assert(OidIsValid(subid
));
3211 Assert(TransactionIdIsValid(xid
));
3212 Assert(stream_fd
== NULL
);
3215 changes_filename(path
, subid
, xid
);
3216 elog(DEBUG1
, "opening file \"%s\" for streamed changes", path
);
3219 * Create/open the buffiles under the logical streaming context so that we
3220 * have those files until stream stop.
3222 oldcxt
= MemoryContextSwitchTo(LogicalStreamingContext
);
3225 * If this is the first streamed segment, create the changes file.
3226 * Otherwise, just open the file for writing, in append mode.
3229 stream_fd
= BufFileCreateFileSet(MyLogicalRepWorker
->stream_fileset
,
3234 * Open the file and seek to the end of the file because we always
3235 * append the changes file.
3237 stream_fd
= BufFileOpenFileSet(MyLogicalRepWorker
->stream_fileset
,
3238 path
, O_RDWR
, false);
3239 BufFileSeek(stream_fd
, 0, 0, SEEK_END
);
3242 MemoryContextSwitchTo(oldcxt
);
3247 * Close the currently open file with streamed changes.
3249 * This can only be called at the end of a streaming block, i.e. at stream_stop
3250 * message from the upstream.
3253 stream_close_file(void)
3255 Assert(in_streamed_transaction
);
3256 Assert(TransactionIdIsValid(stream_xid
));
3257 Assert(stream_fd
!= NULL
);
3259 BufFileClose(stream_fd
);
3261 stream_xid
= InvalidTransactionId
;
3266 * stream_write_change
3267 * Serialize a change to a file for the current toplevel transaction.
3269 * The change is serialized in a simple format, with length (not including
3270 * the length), action code (identifying the message type) and message
3271 * contents (without the subxact TransactionId value).
3274 stream_write_change(char action
, StringInfo s
)
3278 Assert(in_streamed_transaction
);
3279 Assert(TransactionIdIsValid(stream_xid
));
3280 Assert(stream_fd
!= NULL
);
3282 /* total on-disk size, including the action type character */
3283 len
= (s
->len
- s
->cursor
) + sizeof(char);
3285 /* first write the size */
3286 BufFileWrite(stream_fd
, &len
, sizeof(len
));
3288 /* then the action */
3289 BufFileWrite(stream_fd
, &action
, sizeof(action
));
3291 /* and finally the remaining part of the buffer (after the XID) */
3292 len
= (s
->len
- s
->cursor
);
3294 BufFileWrite(stream_fd
, &s
->data
[s
->cursor
], len
);
3298 * Cleanup the memory for subxacts and reset the related variables.
3301 cleanup_subxact_info()
3303 if (subxact_data
.subxacts
)
3304 pfree(subxact_data
.subxacts
);
3306 subxact_data
.subxacts
= NULL
;
3307 subxact_data
.subxact_last
= InvalidTransactionId
;
3308 subxact_data
.nsubxacts
= 0;
3309 subxact_data
.nsubxacts_max
= 0;
3313 * Form the prepared transaction GID for two_phase transactions.
3315 * Return the GID in the supplied buffer.
3318 TwoPhaseTransactionGid(Oid subid
, TransactionId xid
, char *gid
, int szgid
)
3320 Assert(subid
!= InvalidRepOriginId
);
3322 if (!TransactionIdIsValid(xid
))
3324 (errcode(ERRCODE_PROTOCOL_VIOLATION
),
3325 errmsg_internal("invalid two-phase transaction ID")));
3327 snprintf(gid
, szgid
, "pg_gid_%u_%u", subid
, xid
);
3330 /* Logical Replication Apply worker entry point */
3332 ApplyWorkerMain(Datum main_arg
)
3334 int worker_slot
= DatumGetInt32(main_arg
);
3335 MemoryContext cctx
= CurrentMemoryContext
;
3336 MemoryContext oldctx
;
3337 char originname
[NAMEDATALEN
];
3338 XLogRecPtr origin_startpos
;
3340 WalRcvStreamOptions options
;
3343 /* Attach to slot */
3344 logicalrep_worker_attach(worker_slot
);
3346 /* Setup signal handling */
3347 pqsignal(SIGHUP
, SignalHandlerForConfigReload
);
3348 pqsignal(SIGTERM
, die
);
3349 BackgroundWorkerUnblockSignals();
3352 * We don't currently need any ResourceOwner in a walreceiver process, but
3353 * if we did, we could call CreateAuxProcessResourceOwner here.
3356 /* Initialise stats to a sanish value */
3357 MyLogicalRepWorker
->last_send_time
= MyLogicalRepWorker
->last_recv_time
=
3358 MyLogicalRepWorker
->reply_time
= GetCurrentTimestamp();
3360 /* Load the libpq-specific functions */
3361 load_file("libpqwalreceiver", false);
3363 /* Run as replica session replication role. */
3364 SetConfigOption("session_replication_role", "replica",
3365 PGC_SUSET
, PGC_S_OVERRIDE
);
3367 /* Connect to our database. */
3368 BackgroundWorkerInitializeConnectionByOid(MyLogicalRepWorker
->dbid
,
3369 MyLogicalRepWorker
->userid
,
3373 * Set always-secure search path, so malicious users can't redirect user
3374 * code (e.g. pg_index.indexprs).
3376 SetConfigOption("search_path", "", PGC_SUSET
, PGC_S_OVERRIDE
);
3378 /* Load the subscription into persistent memory context. */
3379 ApplyContext
= AllocSetContextCreate(TopMemoryContext
,
3381 ALLOCSET_DEFAULT_SIZES
);
3382 StartTransactionCommand();
3383 oldctx
= MemoryContextSwitchTo(ApplyContext
);
3385 MySubscription
= GetSubscription(MyLogicalRepWorker
->subid
, true);
3386 if (!MySubscription
)
3389 (errmsg("logical replication apply worker for subscription %u will not "
3390 "start because the subscription was removed during startup",
3391 MyLogicalRepWorker
->subid
)));
3395 MySubscriptionValid
= true;
3396 MemoryContextSwitchTo(oldctx
);
3398 if (!MySubscription
->enabled
)
3401 (errmsg("logical replication apply worker for subscription \"%s\" will not "
3402 "start because the subscription was disabled during startup",
3403 MySubscription
->name
)));
3408 /* Setup synchronous commit according to the user's wishes */
3409 SetConfigOption("synchronous_commit", MySubscription
->synccommit
,
3410 PGC_BACKEND
, PGC_S_OVERRIDE
);
3412 /* Keep us informed about subscription changes. */
3413 CacheRegisterSyscacheCallback(SUBSCRIPTIONOID
,
3414 subscription_change_cb
,
3417 if (am_tablesync_worker())
3419 (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
3420 MySubscription
->name
, get_rel_name(MyLogicalRepWorker
->relid
))));
3423 (errmsg("logical replication apply worker for subscription \"%s\" has started",
3424 MySubscription
->name
)));
3426 CommitTransactionCommand();
3428 /* Connect to the origin and start the replication. */
3429 elog(DEBUG1
, "connecting to publisher using connection string \"%s\"",
3430 MySubscription
->conninfo
);
3432 if (am_tablesync_worker())
3438 /* This is table synchronization worker, call initial sync. */
3439 syncslotname
= LogicalRepSyncTableStart(&origin_startpos
);
3443 MemoryContext ecxt
= MemoryContextSwitchTo(cctx
);
3444 ErrorData
*errdata
= CopyErrorData();
3447 * Report the table sync error. There is no corresponding message
3448 * type for table synchronization.
3450 pgstat_report_subworker_error(MyLogicalRepWorker
->subid
,
3451 MyLogicalRepWorker
->relid
,
3452 MyLogicalRepWorker
->relid
,
3453 0, /* message type */
3454 InvalidTransactionId
,
3456 MemoryContextSwitchTo(ecxt
);
3461 /* allocate slot name in long-lived context */
3462 myslotname
= MemoryContextStrdup(ApplyContext
, syncslotname
);
3464 pfree(syncslotname
);
3468 /* This is main apply worker */
3469 RepOriginId originid
;
3470 TimeLineID startpointTLI
;
3473 myslotname
= MySubscription
->slotname
;
3476 * This shouldn't happen if the subscription is enabled, but guard
3477 * against DDL bugs or manual catalog changes. (libpqwalreceiver will
3478 * crash if slot is NULL.)
3482 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE
),
3483 errmsg("subscription has no replication slot set")));
3485 /* Setup replication origin tracking. */
3486 StartTransactionCommand();
3487 snprintf(originname
, sizeof(originname
), "pg_%u", MySubscription
->oid
);
3488 originid
= replorigin_by_name(originname
, true);
3489 if (!OidIsValid(originid
))
3490 originid
= replorigin_create(originname
);
3491 replorigin_session_setup(originid
);
3492 replorigin_session_origin
= originid
;
3493 origin_startpos
= replorigin_session_get_progress(false);
3494 CommitTransactionCommand();
3496 LogRepWorkerWalRcvConn
= walrcv_connect(MySubscription
->conninfo
, true,
3497 MySubscription
->name
, &err
);
3498 if (LogRepWorkerWalRcvConn
== NULL
)
3500 (errcode(ERRCODE_CONNECTION_FAILURE
),
3501 errmsg("could not connect to the publisher: %s", err
)));
3504 * We don't really use the output identify_system for anything but it
3505 * does some initializations on the upstream so let's still call it.
3507 (void) walrcv_identify_system(LogRepWorkerWalRcvConn
, &startpointTLI
);
3511 * Setup callback for syscache so that we know when something changes in
3512 * the subscription relation state.
3514 CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP
,
3515 invalidate_syncing_table_states
,
3518 /* Build logical replication streaming options. */
3519 options
.logical
= true;
3520 options
.startpoint
= origin_startpos
;
3521 options
.slotname
= myslotname
;
3523 server_version
= walrcv_server_version(LogRepWorkerWalRcvConn
);
3524 options
.proto
.logical
.proto_version
=
3525 server_version
>= 150000 ? LOGICALREP_PROTO_TWOPHASE_VERSION_NUM
:
3526 server_version
>= 140000 ? LOGICALREP_PROTO_STREAM_VERSION_NUM
:
3527 LOGICALREP_PROTO_VERSION_NUM
;
3529 options
.proto
.logical
.publication_names
= MySubscription
->publications
;
3530 options
.proto
.logical
.binary
= MySubscription
->binary
;
3531 options
.proto
.logical
.streaming
= MySubscription
->stream
;
3532 options
.proto
.logical
.twophase
= false;
3534 if (!am_tablesync_worker())
3537 * Even when the two_phase mode is requested by the user, it remains
3538 * as the tri-state PENDING until all tablesyncs have reached READY
3539 * state. Only then, can it become ENABLED.
3541 * Note: If the subscription has no tables then leave the state as
3542 * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
3545 if (MySubscription
->twophasestate
== LOGICALREP_TWOPHASE_STATE_PENDING
&&
3546 AllTablesyncsReady())
3548 /* Start streaming with two_phase enabled */
3549 options
.proto
.logical
.twophase
= true;
3550 walrcv_startstreaming(LogRepWorkerWalRcvConn
, &options
);
3552 StartTransactionCommand();
3553 UpdateTwoPhaseState(MySubscription
->oid
, LOGICALREP_TWOPHASE_STATE_ENABLED
);
3554 MySubscription
->twophasestate
= LOGICALREP_TWOPHASE_STATE_ENABLED
;
3555 CommitTransactionCommand();
3559 walrcv_startstreaming(LogRepWorkerWalRcvConn
, &options
);
3563 (errmsg("logical replication apply worker for subscription \"%s\" two_phase is %s.",
3564 MySubscription
->name
,
3565 MySubscription
->twophasestate
== LOGICALREP_TWOPHASE_STATE_DISABLED
? "DISABLED" :
3566 MySubscription
->twophasestate
== LOGICALREP_TWOPHASE_STATE_PENDING
? "PENDING" :
3567 MySubscription
->twophasestate
== LOGICALREP_TWOPHASE_STATE_ENABLED
? "ENABLED" :
3572 /* Start normal logical streaming replication. */
3573 walrcv_startstreaming(LogRepWorkerWalRcvConn
, &options
);
3576 /* Run the main loop. */
3579 LogicalRepApplyLoop(origin_startpos
);
3583 /* report the apply error */
3584 if (apply_error_callback_arg
.command
!= 0)
3586 MemoryContext ecxt
= MemoryContextSwitchTo(cctx
);
3587 ErrorData
*errdata
= CopyErrorData();
3589 pgstat_report_subworker_error(MyLogicalRepWorker
->subid
,
3590 MyLogicalRepWorker
->relid
,
3591 apply_error_callback_arg
.rel
!= NULL
3592 ? apply_error_callback_arg
.rel
->localreloid
3594 apply_error_callback_arg
.command
,
3595 apply_error_callback_arg
.remote_xid
,
3597 MemoryContextSwitchTo(ecxt
);
3608 * Is current process a logical replication worker?
3611 IsLogicalWorker(void)
3613 return MyLogicalRepWorker
!= NULL
;
3616 /* Error callback to give more context info about the change being applied */
3618 apply_error_callback(void *arg
)
3621 ApplyErrorCallbackArg
*errarg
= &apply_error_callback_arg
;
3623 if (apply_error_callback_arg
.command
== 0)
3626 initStringInfo(&buf
);
3627 appendStringInfo(&buf
, _("processing remote data during \"%s\""),
3628 logicalrep_message_type(errarg
->command
));
3630 /* append relation information */
3633 appendStringInfo(&buf
, _(" for replication target relation \"%s.%s\""),
3634 errarg
->rel
->remoterel
.nspname
,
3635 errarg
->rel
->remoterel
.relname
);
3636 if (errarg
->remote_attnum
>= 0)
3637 appendStringInfo(&buf
, _(" column \"%s\""),
3638 errarg
->rel
->remoterel
.attnames
[errarg
->remote_attnum
]);
3641 /* append transaction information */
3642 if (TransactionIdIsNormal(errarg
->remote_xid
))
3644 appendStringInfo(&buf
, _(" in transaction %u"), errarg
->remote_xid
);
3645 if (errarg
->ts
!= 0)
3646 appendStringInfo(&buf
, _(" at %s"),
3647 timestamptz_to_str(errarg
->ts
));
3650 errcontext("%s", buf
.data
);
3654 /* Set transaction information of apply error callback */
3656 set_apply_error_context_xact(TransactionId xid
, TimestampTz ts
)
3658 apply_error_callback_arg
.remote_xid
= xid
;
3659 apply_error_callback_arg
.ts
= ts
;
3662 /* Reset all information of apply error callback */
3664 reset_apply_error_context_info(void)
3666 apply_error_callback_arg
.command
= 0;
3667 apply_error_callback_arg
.rel
= NULL
;
3668 apply_error_callback_arg
.remote_attnum
= -1;
3669 set_apply_error_context_xact(InvalidTransactionId
, 0);