1 /*-------------------------------------------------------------------------
2 * worker.c
3 * PostgreSQL logical replication worker (apply)
5 * Copyright (c) 2016-2022, PostgreSQL Global Development Group
7 * IDENTIFICATION
8 * src/backend/replication/logical/worker.c
10 * NOTES
 * This file contains the worker which applies logical changes as they come
 * from the remote logical replication stream.
 *
 * The main worker (apply) is started by the logical replication worker
 * launcher for every enabled subscription in a database. It uses the
 * walsender protocol to communicate with the publisher.
 *
 * This module includes server-facing code and shares the libpqwalreceiver
 * module with walreceiver to provide the libpq-specific functionality.
22 * STREAMED TRANSACTIONS
23 * ---------------------
24 * Streamed transactions (large transactions exceeding a memory limit on the
25 * upstream) are not applied immediately, but instead, the data is written
26 * to temporary files and then applied at once when the final commit arrives.
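 *
 * A rough sketch of the assumed message flow for one such transaction (each
 * handler is defined further down in this file):
 *
 *		STREAM START <xid>  ... changes ...  STREAM STOP	(possibly repeated)
 *		STREAM COMMIT <xid>		(replay the spooled changes, then commit)
 *
 * The spooled changes are replayed by apply_spooled_messages(); a streamed
 * transaction may also end in STREAM ABORT or, with two_phase enabled, in
 * STREAM PREPARE.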
 * Unlike the regular (non-streamed) case, handling streamed transactions has
 * to deal with aborts of both the toplevel transaction and of its
 * subtransactions. This is achieved by tracking offsets for subtransactions,
 * which are then used to truncate the file with serialized changes.
 * The files are placed in the temporary-file directory by default, and the
 * filenames include both the XID of the toplevel transaction and the OID of
 * the subscription. This is necessary so that different workers processing a
 * remote transaction with the same XID don't interfere.
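 *
 * For illustration only, a minimal sketch of the assumed naming scheme (the
 * authoritative logic lives in changes_filename() and subxact_filename()
 * below):
 *
 *		char	path[MAXPGPATH];
 *
 *		snprintf(path, MAXPGPATH, "%u-%u.changes", subid, xid);
 *		snprintf(path, MAXPGPATH, "%u-%u.subxacts", subid, xid);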
 * We use BufFiles instead of normal temporary files because (a) the BufFile
 * infrastructure supports temporary files that exceed the OS file size limit,
 * (b) it provides automatic cleanup on error, and (c) it allows the files to
 * survive across local transactions so that they can be opened at stream
 * start and closed at stream stop. We use the FileSet infrastructure because
 * without it the files are deleted as soon as they are closed, and keeping
 * the stream files open across start/stop streams would consume a lot of
 * memory (more than 8K for each BufFile, and there could be multiple such
 * BufFiles because the subscriber can receive multiple start/stop streams
 * for different transactions before getting the commit). Moreover, without
 * FileSet we would also need to invent a new way to pass filenames to the
 * BufFile APIs so that we could reopen the desired file across multiple
 * stream-open calls for the same transaction.
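 *
 * A minimal sketch of the assumed call sequence (the real work happens in
 * stream_open_file() and stream_close_file()): the first STREAM START for a
 * transaction creates the file in the worker's fileset, later segments reopen
 * it, and STREAM STOP merely closes the handle while the file itself survives
 * in the fileset:
 *
 *		stream_fd = BufFileCreateFileSet(stream_fileset, path);	(first segment)
 *		stream_fd = BufFileOpenFileSet(stream_fileset, path, O_RDWR, false);	(later segments)
 *		BufFileClose(stream_fd);	(at STREAM STOP)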
53 * TWO_PHASE TRANSACTIONS
54 * ----------------------
 * Two-phase transactions are replayed at prepare and then committed or
 * rolled back at commit prepared and rollback prepared respectively. It is
 * possible for a prepared transaction to arrive at the apply worker while the
 * tablesync is busy doing the initial copy. In this case, the apply worker
 * skips all the prepared operations [e.g. inserts] while the tablesync is
 * still busy (see the condition of should_apply_changes_for_rel). The
 * tablesync worker might not get such a prepared transaction because, say, it
 * was prior to the initial consistent point, but it might have gotten some
 * later commits. The tablesync worker will then exit without doing anything
 * for the prepared transaction skipped by the apply worker, as its sync
 * location will already be ahead of the apply worker's current location. This
 * would lead to an "empty prepare", because later, when the apply worker does
 * the commit prepared, there is nothing in it (the inserts were skipped
 * earlier).
 * To avoid this and similar confusions around prepare, the subscription's
 * two_phase commit is enabled only after the initial sync is over. The
 * two_phase option has been implemented as a tri-state with values DISABLED,
 * PENDING, and ENABLED.
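 *
 * The tri-state is assumed to be stored as a single character in the
 * pg_subscription catalog (column subtwophasestate); see pg_subscription.h
 * for the authoritative definitions, roughly:
 *
 *		LOGICALREP_TWOPHASE_STATE_DISABLED		'd'
 *		LOGICALREP_TWOPHASE_STATE_PENDING		'p'
 *		LOGICALREP_TWOPHASE_STATE_ENABLED		'e'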
74 * Even if the user specifies they want a subscription with two_phase = on,
75 * internally it will start with a tri-state of PENDING which only becomes
76 * ENABLED after all tablesync initializations are completed - i.e. when all
77 * tablesync workers have reached their READY state. In other words, the value
78 * PENDING is only a temporary state for subscription start-up.
 * Until two_phase is properly available (ENABLED), the subscription will
 * behave as if two_phase = off. When the apply worker detects that all
 * tablesyncs have become READY (while the tri-state was PENDING) it will
 * restart the apply worker process. This happens in
 * process_syncing_tables_for_apply.
 *
 * When the (re-started) apply worker finds that all tablesyncs are READY for a
 * two_phase tri-state of PENDING, it starts streaming messages with the
 * two_phase option, which in turn enables the decoding of two-phase commits at
 * the publisher. Then, it updates the tri-state value from PENDING to ENABLED.
 * Now, it is possible that while two_phase was not yet enabled, the publisher
 * (replication server) skipped some prepares, but we ensure that such prepares
 * are sent along with the commit prepared; see ReorderBufferFinishPrepared.
 * If the subscription has no tables then a two_phase tri-state PENDING is
 * left unchanged. This lets the user still do an ALTER SUBSCRIPTION REFRESH
 * PUBLICATION which might otherwise be disallowed (see below).
99 * If ever a user needs to be aware of the tri-state value, they can fetch it
100 * from the pg_subscription catalog (see column subtwophasestate).
 * We don't allow toggling the two_phase option of a subscription because it
 * can lead to an inconsistent replica. Consider that initially it was on and
 * we have received some prepare; then we turn it off, and at commit time the
 * server will send the entire transaction data along with the commit. With
 * some more analysis, we could allow changing this option from off to on, but
 * it is not clear whether that alone would be useful.
 * Finally, to avoid the problems mentioned in the previous paragraphs from any
 * subsequent (not READY) tablesyncs (which would need to toggle the two_phase
 * option from 'on' to 'off' and then back to 'on' again), there is a
 * restriction for ALTER SUBSCRIPTION REFRESH PUBLICATION. This command is not
 * permitted when the two_phase tri-state is ENABLED, except when
 * copy_data = false.
 * We can get a prepare for the same GID more than once in the genuine case
 * where we have defined multiple subscriptions for publications on the same
 * server and the prepared transaction has operations on tables subscribed to
 * by those subscriptions. In such cases, if we used the GID sent by the
 * publisher, one of the prepares would succeed and the others would fail, in
 * which case the server would send them again. This can lead to a deadlock if
 * the user has set synchronous_standby_names for all the subscriptions on the
 * subscriber. To avoid such deadlocks, we generate a unique GID (consisting of
 * the subscription OID and the XID of the prepared transaction) for each
 * prepared transaction on the subscriber.
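 *
 * For illustration only, a sketch of the assumed GID format; the
 * authoritative logic is in TwoPhaseTransactionGid():
 *
 *		snprintf(gid, szgid, "pg_gid_%u_%u", subid, xid);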
125 *-------------------------------------------------------------------------
128 #include "postgres.h"
130 #include <sys/stat.h>
131 #include <unistd.h>
133 #include "access/table.h"
134 #include "access/tableam.h"
135 #include "access/twophase.h"
136 #include "access/xact.h"
137 #include "access/xlog_internal.h"
138 #include "catalog/catalog.h"
139 #include "catalog/namespace.h"
140 #include "catalog/partition.h"
141 #include "catalog/pg_inherits.h"
142 #include "catalog/pg_subscription.h"
143 #include "catalog/pg_subscription_rel.h"
144 #include "catalog/pg_tablespace.h"
145 #include "commands/tablecmds.h"
146 #include "commands/tablespace.h"
147 #include "commands/trigger.h"
148 #include "executor/executor.h"
149 #include "executor/execPartition.h"
150 #include "executor/nodeModifyTable.h"
151 #include "funcapi.h"
152 #include "libpq/pqformat.h"
153 #include "libpq/pqsignal.h"
154 #include "mb/pg_wchar.h"
155 #include "miscadmin.h"
156 #include "nodes/makefuncs.h"
157 #include "optimizer/optimizer.h"
158 #include "pgstat.h"
159 #include "postmaster/bgworker.h"
160 #include "postmaster/interrupt.h"
161 #include "postmaster/postmaster.h"
162 #include "postmaster/walwriter.h"
163 #include "replication/decode.h"
164 #include "replication/logical.h"
165 #include "replication/logicalproto.h"
166 #include "replication/logicalrelation.h"
167 #include "replication/logicalworker.h"
168 #include "replication/origin.h"
169 #include "replication/reorderbuffer.h"
170 #include "replication/snapbuild.h"
171 #include "replication/walreceiver.h"
172 #include "replication/worker_internal.h"
173 #include "rewrite/rewriteHandler.h"
174 #include "storage/buffile.h"
175 #include "storage/bufmgr.h"
176 #include "storage/fd.h"
177 #include "storage/ipc.h"
178 #include "storage/lmgr.h"
179 #include "storage/proc.h"
180 #include "storage/procarray.h"
181 #include "tcop/tcopprot.h"
182 #include "utils/builtins.h"
183 #include "utils/catcache.h"
184 #include "utils/dynahash.h"
185 #include "utils/datum.h"
186 #include "utils/fmgroids.h"
187 #include "utils/guc.h"
188 #include "utils/inval.h"
189 #include "utils/lsyscache.h"
190 #include "utils/memutils.h"
191 #include "utils/rel.h"
192 #include "utils/syscache.h"
193 #include "utils/timeout.h"
195 #define NAPTIME_PER_CYCLE 1000 /* max sleep time between cycles (1s) */
197 typedef struct FlushPosition
199 dlist_node node;
200 XLogRecPtr local_end;
201 XLogRecPtr remote_end;
202 } FlushPosition;
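/*
 * Each entry of lsn_mapping pairs the end LSN of an applied remote
 * transaction (remote_end) with the local end of its commit record
 * (local_end); see store_flush_position().  The feedback sent to the
 * publisher (see send_feedback()) can then confirm only those remote LSNs
 * whose local counterparts have already been flushed to disk.
 */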
204 static dlist_head lsn_mapping = DLIST_STATIC_INIT(lsn_mapping);
206 typedef struct ApplyExecutionData
208 EState *estate; /* executor state, used to track resources */
210 LogicalRepRelMapEntry *targetRel; /* replication target rel */
211 ResultRelInfo *targetRelInfo; /* ResultRelInfo for same */
213 /* These fields are used when the target relation is partitioned: */
214 ModifyTableState *mtstate; /* dummy ModifyTable state */
215 PartitionTupleRouting *proute; /* partition routing info */
216 } ApplyExecutionData;
218 /* Struct for saving and restoring apply errcontext information */
219 typedef struct ApplyErrorCallbackArg
221 LogicalRepMsgType command; /* 0 if invalid */
222 LogicalRepRelMapEntry *rel;
224 /* Remote node information */
225 int remote_attnum; /* -1 if invalid */
226 TransactionId remote_xid;
227 TimestampTz ts; /* commit, rollback, or prepare timestamp */
228 } ApplyErrorCallbackArg;
230 static ApplyErrorCallbackArg apply_error_callback_arg =
232 .command = 0,
233 .rel = NULL,
234 .remote_attnum = -1,
235 .remote_xid = InvalidTransactionId,
236 .ts = 0,
239 static MemoryContext ApplyMessageContext = NULL;
240 MemoryContext ApplyContext = NULL;
242 /* per stream context for streaming transactions */
243 static MemoryContext LogicalStreamingContext = NULL;
245 WalReceiverConn *LogRepWorkerWalRcvConn = NULL;
247 Subscription *MySubscription = NULL;
248 bool MySubscriptionValid = false;
250 bool in_remote_transaction = false;
251 static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr;
253 /* fields valid only when processing streamed transaction */
254 static bool in_streamed_transaction = false;
256 static TransactionId stream_xid = InvalidTransactionId;
258 /* BufFile handle of the current streaming file */
259 static BufFile *stream_fd = NULL;
261 typedef struct SubXactInfo
263 TransactionId xid; /* XID of the subxact */
264 int fileno; /* file number in the buffile */
265 off_t offset; /* offset in the file */
266 } SubXactInfo;
268 /* Sub-transaction data for the current streaming transaction */
269 typedef struct ApplySubXactData
271 uint32 nsubxacts; /* number of sub-transactions */
272 uint32 nsubxacts_max; /* current capacity of subxacts */
273 TransactionId subxact_last; /* xid of the last sub-transaction */
274 SubXactInfo *subxacts; /* sub-xact offset in changes file */
275 } ApplySubXactData;
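/*
 * In-memory cache of the subtransaction info for the streamed transaction
 * currently being processed.  It is read back from the per-transaction
 * subxact file at the start of a non-first stream segment (see
 * subxact_info_read) and written out again at stream stop or abort (see
 * subxact_info_write).
 */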
277 static ApplySubXactData subxact_data = {0, 0, InvalidTransactionId, NULL};
279 static inline void subxact_filename(char *path, Oid subid, TransactionId xid);
280 static inline void changes_filename(char *path, Oid subid, TransactionId xid);
283 * Information about subtransactions of a given toplevel transaction.
285 static void subxact_info_write(Oid subid, TransactionId xid);
286 static void subxact_info_read(Oid subid, TransactionId xid);
287 static void subxact_info_add(TransactionId xid);
288 static inline void cleanup_subxact_info(void);
291 * Serialize and deserialize changes for a toplevel transaction.
293 static void stream_cleanup_files(Oid subid, TransactionId xid);
294 static void stream_open_file(Oid subid, TransactionId xid, bool first);
295 static void stream_write_change(char action, StringInfo s);
296 static void stream_close_file(void);
298 static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply);
300 static void store_flush_position(XLogRecPtr remote_lsn);
302 static void maybe_reread_subscription(void);
304 /* prototype needed because of stream_commit */
305 static void apply_dispatch(StringInfo s);
307 static void apply_handle_commit_internal(LogicalRepCommitData *commit_data);
308 static void apply_handle_insert_internal(ApplyExecutionData *edata,
309 ResultRelInfo *relinfo,
310 TupleTableSlot *remoteslot);
311 static void apply_handle_update_internal(ApplyExecutionData *edata,
312 ResultRelInfo *relinfo,
313 TupleTableSlot *remoteslot,
314 LogicalRepTupleData *newtup);
315 static void apply_handle_delete_internal(ApplyExecutionData *edata,
316 ResultRelInfo *relinfo,
317 TupleTableSlot *remoteslot);
318 static bool FindReplTupleInLocalRel(EState *estate, Relation localrel,
319 LogicalRepRelation *remoterel,
320 TupleTableSlot *remoteslot,
321 TupleTableSlot **localslot);
322 static void apply_handle_tuple_routing(ApplyExecutionData *edata,
323 TupleTableSlot *remoteslot,
324 LogicalRepTupleData *newtup,
325 CmdType operation);
327 /* Compute GID for two_phase transactions */
328 static void TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int szgid);
330 /* Common streaming function to apply all the spooled messages */
331 static void apply_spooled_messages(TransactionId xid, XLogRecPtr lsn);
333 /* Functions for apply error callback */
334 static void apply_error_callback(void *arg);
335 static inline void set_apply_error_context_xact(TransactionId xid, TimestampTz ts);
336 static inline void reset_apply_error_context_info(void);
 * Should this worker apply changes for the given relation?
 *
 * This is mainly needed for the initial relation data sync, as that runs in a
 * separate worker process in parallel, and we need some way to skip changes
 * coming to the main apply worker during the sync of a table.
 *
 * Note we need to do a smaller-or-equal comparison for the SYNCDONE state
 * because statelsn might hold the position of the end of the initial slot
 * consistent point WAL record + 1 (i.e. the start of the next record), and
 * the next record can be the COMMIT of the transaction we are now processing
 * (which is what we set remote_final_lsn to in apply_handle_begin).
351 static bool
352 should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
354 if (am_tablesync_worker())
355 return MyLogicalRepWorker->relid == rel->localreloid;
356 else
357 return (rel->state == SUBREL_STATE_READY ||
358 (rel->state == SUBREL_STATE_SYNCDONE &&
359 rel->statelsn <= remote_final_lsn));
363 * Begin one step (one INSERT, UPDATE, etc) of a replication transaction.
365 * Start a transaction, if this is the first step (else we keep using the
366 * existing transaction).
367 * Also provide a global snapshot and ensure we run in ApplyMessageContext.
369 static void
370 begin_replication_step(void)
372 SetCurrentStatementStartTimestamp();
374 if (!IsTransactionState())
376 StartTransactionCommand();
377 maybe_reread_subscription();
380 PushActiveSnapshot(GetTransactionSnapshot());
382 MemoryContextSwitchTo(ApplyMessageContext);
386 * Finish up one step of a replication transaction.
387 * Callers of begin_replication_step() must also call this.
389 * We don't close out the transaction here, but we should increment
390 * the command counter to make the effects of this step visible.
392 static void
393 end_replication_step(void)
395 PopActiveSnapshot();
397 CommandCounterIncrement();
401 * Handle streamed transactions.
403 * If in streaming mode (receiving a block of streamed transaction), we
404 * simply redirect it to a file for the proper toplevel transaction.
406 * Returns true for streamed transactions, false otherwise (regular mode).
408 static bool
409 handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)
411 TransactionId xid;
413 /* not in streaming mode */
414 if (!in_streamed_transaction)
415 return false;
417 Assert(stream_fd != NULL);
418 Assert(TransactionIdIsValid(stream_xid));
421 * We should have received XID of the subxact as the first part of the
422 * message, so extract it.
424 xid = pq_getmsgint(s, 4);
426 if (!TransactionIdIsValid(xid))
427 ereport(ERROR,
428 (errcode(ERRCODE_PROTOCOL_VIOLATION),
429 errmsg_internal("invalid transaction ID in streamed replication transaction")));
431 /* Add the new subxact to the array (unless already there). */
432 subxact_info_add(xid);
434 /* write the change to the current file */
435 stream_write_change(action, s);
437 return true;
441 * Executor state preparation for evaluation of constraint expressions,
442 * indexes and triggers for the specified relation.
444 * Note that the caller must open and close any indexes to be updated.
446 static ApplyExecutionData *
447 create_edata_for_relation(LogicalRepRelMapEntry *rel)
449 ApplyExecutionData *edata;
450 EState *estate;
451 RangeTblEntry *rte;
452 ResultRelInfo *resultRelInfo;
454 edata = (ApplyExecutionData *) palloc0(sizeof(ApplyExecutionData));
455 edata->targetRel = rel;
457 edata->estate = estate = CreateExecutorState();
459 rte = makeNode(RangeTblEntry);
460 rte->rtekind = RTE_RELATION;
461 rte->relid = RelationGetRelid(rel->localrel);
462 rte->relkind = rel->localrel->rd_rel->relkind;
463 rte->rellockmode = AccessShareLock;
464 ExecInitRangeTable(estate, list_make1(rte));
466 edata->targetRelInfo = resultRelInfo = makeNode(ResultRelInfo);
469 * Use Relation opened by logicalrep_rel_open() instead of opening it
470 * again.
472 InitResultRelInfo(resultRelInfo, rel->localrel, 1, NULL, 0);
475 * We put the ResultRelInfo in the es_opened_result_relations list, even
476 * though we don't populate the es_result_relations array. That's a bit
477 * bogus, but it's enough to make ExecGetTriggerResultRel() find them.
479 * ExecOpenIndices() is not called here either, each execution path doing
480 * an apply operation being responsible for that.
482 estate->es_opened_result_relations =
483 lappend(estate->es_opened_result_relations, resultRelInfo);
485 estate->es_output_cid = GetCurrentCommandId(true);
487 /* Prepare to catch AFTER triggers. */
488 AfterTriggerBeginQuery();
490 /* other fields of edata remain NULL for now */
492 return edata;
496 * Finish any operations related to the executor state created by
497 * create_edata_for_relation().
499 static void
500 finish_edata(ApplyExecutionData *edata)
502 EState *estate = edata->estate;
504 /* Handle any queued AFTER triggers. */
505 AfterTriggerEndQuery(estate);
507 /* Shut down tuple routing, if any was done. */
508 if (edata->proute)
509 ExecCleanupTupleRouting(edata->mtstate, edata->proute);
512 * Cleanup. It might seem that we should call ExecCloseResultRelations()
513 * here, but we intentionally don't. It would close the rel we added to
514 * es_opened_result_relations above, which is wrong because we took no
515 * corresponding refcount. We rely on ExecCleanupTupleRouting() to close
516 * any other relations opened during execution.
518 ExecResetTupleTable(estate->es_tupleTable, false);
519 FreeExecutorState(estate);
520 pfree(edata);
 * Evaluate default values for columns that cannot be mapped to remote
 * relation columns.
 *
 * This allows us to support tables which have more columns on the downstream
 * than on the upstream.
530 static void
531 slot_fill_defaults(LogicalRepRelMapEntry *rel, EState *estate,
532 TupleTableSlot *slot)
534 TupleDesc desc = RelationGetDescr(rel->localrel);
535 int num_phys_attrs = desc->natts;
536 int i;
537 int attnum,
538 num_defaults = 0;
539 int *defmap;
540 ExprState **defexprs;
541 ExprContext *econtext;
543 econtext = GetPerTupleExprContext(estate);
545 /* We got all the data via replication, no need to evaluate anything. */
546 if (num_phys_attrs == rel->remoterel.natts)
547 return;
549 defmap = (int *) palloc(num_phys_attrs * sizeof(int));
550 defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
552 Assert(rel->attrmap->maplen == num_phys_attrs);
553 for (attnum = 0; attnum < num_phys_attrs; attnum++)
555 Expr *defexpr;
557 if (TupleDescAttr(desc, attnum)->attisdropped || TupleDescAttr(desc, attnum)->attgenerated)
558 continue;
560 if (rel->attrmap->attnums[attnum] >= 0)
561 continue;
563 defexpr = (Expr *) build_column_default(rel->localrel, attnum + 1);
565 if (defexpr != NULL)
567 /* Run the expression through planner */
568 defexpr = expression_planner(defexpr);
/* Initialize the executable expression */
571 defexprs[num_defaults] = ExecInitExpr(defexpr, NULL);
572 defmap[num_defaults] = attnum;
573 num_defaults++;
578 for (i = 0; i < num_defaults; i++)
579 slot->tts_values[defmap[i]] =
580 ExecEvalExpr(defexprs[i], econtext, &slot->tts_isnull[defmap[i]]);
584 * Store tuple data into slot.
586 * Incoming data can be either text or binary format.
588 static void
589 slot_store_data(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
590 LogicalRepTupleData *tupleData)
592 int natts = slot->tts_tupleDescriptor->natts;
593 int i;
595 ExecClearTuple(slot);
597 /* Call the "in" function for each non-dropped, non-null attribute */
598 Assert(natts == rel->attrmap->maplen);
599 for (i = 0; i < natts; i++)
601 Form_pg_attribute att = TupleDescAttr(slot->tts_tupleDescriptor, i);
602 int remoteattnum = rel->attrmap->attnums[i];
604 if (!att->attisdropped && remoteattnum >= 0)
606 StringInfo colvalue = &tupleData->colvalues[remoteattnum];
608 Assert(remoteattnum < tupleData->ncols);
610 /* Set attnum for error callback */
611 apply_error_callback_arg.remote_attnum = remoteattnum;
613 if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_TEXT)
615 Oid typinput;
616 Oid typioparam;
618 getTypeInputInfo(att->atttypid, &typinput, &typioparam);
619 slot->tts_values[i] =
620 OidInputFunctionCall(typinput, colvalue->data,
621 typioparam, att->atttypmod);
622 slot->tts_isnull[i] = false;
624 else if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_BINARY)
626 Oid typreceive;
627 Oid typioparam;
630 * In some code paths we may be asked to re-parse the same
631 * tuple data. Reset the StringInfo's cursor so that works.
633 colvalue->cursor = 0;
635 getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam);
636 slot->tts_values[i] =
637 OidReceiveFunctionCall(typreceive, colvalue,
638 typioparam, att->atttypmod);
640 /* Trouble if it didn't eat the whole buffer */
641 if (colvalue->cursor != colvalue->len)
642 ereport(ERROR,
643 (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
644 errmsg("incorrect binary data format in logical replication column %d",
645 remoteattnum + 1)));
646 slot->tts_isnull[i] = false;
648 else
651 * NULL value from remote. (We don't expect to see
652 * LOGICALREP_COLUMN_UNCHANGED here, but if we do, treat it as
653 * NULL.)
655 slot->tts_values[i] = (Datum) 0;
656 slot->tts_isnull[i] = true;
659 /* Reset attnum for error callback */
660 apply_error_callback_arg.remote_attnum = -1;
662 else
665 * We assign NULL to dropped attributes and missing values
666 * (missing values should be later filled using
667 * slot_fill_defaults).
669 slot->tts_values[i] = (Datum) 0;
670 slot->tts_isnull[i] = true;
674 ExecStoreVirtualTuple(slot);
678 * Replace updated columns with data from the LogicalRepTupleData struct.
679 * This is somewhat similar to heap_modify_tuple but also calls the type
680 * input functions on the user data.
682 * "slot" is filled with a copy of the tuple in "srcslot", replacing
683 * columns provided in "tupleData" and leaving others as-is.
685 * Caution: unreplaced pass-by-ref columns in "slot" will point into the
686 * storage for "srcslot". This is OK for current usage, but someday we may
687 * need to materialize "slot" at the end to make it independent of "srcslot".
689 static void
690 slot_modify_data(TupleTableSlot *slot, TupleTableSlot *srcslot,
691 LogicalRepRelMapEntry *rel,
692 LogicalRepTupleData *tupleData)
694 int natts = slot->tts_tupleDescriptor->natts;
695 int i;
697 /* We'll fill "slot" with a virtual tuple, so we must start with ... */
698 ExecClearTuple(slot);
701 * Copy all the column data from srcslot, so that we'll have valid values
702 * for unreplaced columns.
704 Assert(natts == srcslot->tts_tupleDescriptor->natts);
705 slot_getallattrs(srcslot);
706 memcpy(slot->tts_values, srcslot->tts_values, natts * sizeof(Datum));
707 memcpy(slot->tts_isnull, srcslot->tts_isnull, natts * sizeof(bool));
709 /* Call the "in" function for each replaced attribute */
710 Assert(natts == rel->attrmap->maplen);
711 for (i = 0; i < natts; i++)
713 Form_pg_attribute att = TupleDescAttr(slot->tts_tupleDescriptor, i);
714 int remoteattnum = rel->attrmap->attnums[i];
716 if (remoteattnum < 0)
717 continue;
719 Assert(remoteattnum < tupleData->ncols);
721 if (tupleData->colstatus[remoteattnum] != LOGICALREP_COLUMN_UNCHANGED)
723 StringInfo colvalue = &tupleData->colvalues[remoteattnum];
725 /* Set attnum for error callback */
726 apply_error_callback_arg.remote_attnum = remoteattnum;
728 if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_TEXT)
730 Oid typinput;
731 Oid typioparam;
733 getTypeInputInfo(att->atttypid, &typinput, &typioparam);
734 slot->tts_values[i] =
735 OidInputFunctionCall(typinput, colvalue->data,
736 typioparam, att->atttypmod);
737 slot->tts_isnull[i] = false;
739 else if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_BINARY)
741 Oid typreceive;
742 Oid typioparam;
745 * In some code paths we may be asked to re-parse the same
746 * tuple data. Reset the StringInfo's cursor so that works.
748 colvalue->cursor = 0;
750 getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam);
751 slot->tts_values[i] =
752 OidReceiveFunctionCall(typreceive, colvalue,
753 typioparam, att->atttypmod);
755 /* Trouble if it didn't eat the whole buffer */
756 if (colvalue->cursor != colvalue->len)
757 ereport(ERROR,
758 (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
759 errmsg("incorrect binary data format in logical replication column %d",
760 remoteattnum + 1)));
761 slot->tts_isnull[i] = false;
763 else
765 /* must be LOGICALREP_COLUMN_NULL */
766 slot->tts_values[i] = (Datum) 0;
767 slot->tts_isnull[i] = true;
770 /* Reset attnum for error callback */
771 apply_error_callback_arg.remote_attnum = -1;
775 /* And finally, declare that "slot" contains a valid virtual tuple */
776 ExecStoreVirtualTuple(slot);
780 * Handle BEGIN message.
782 static void
783 apply_handle_begin(StringInfo s)
785 LogicalRepBeginData begin_data;
787 logicalrep_read_begin(s, &begin_data);
788 set_apply_error_context_xact(begin_data.xid, begin_data.committime);
790 remote_final_lsn = begin_data.final_lsn;
792 in_remote_transaction = true;
794 pgstat_report_activity(STATE_RUNNING, NULL);
798 * Handle COMMIT message.
800 * TODO, support tracking of multiple origins
802 static void
803 apply_handle_commit(StringInfo s)
805 LogicalRepCommitData commit_data;
807 logicalrep_read_commit(s, &commit_data);
809 if (commit_data.commit_lsn != remote_final_lsn)
810 ereport(ERROR,
811 (errcode(ERRCODE_PROTOCOL_VIOLATION),
812 errmsg_internal("incorrect commit LSN %X/%X in commit message (expected %X/%X)",
813 LSN_FORMAT_ARGS(commit_data.commit_lsn),
814 LSN_FORMAT_ARGS(remote_final_lsn))));
816 apply_handle_commit_internal(&commit_data);
818 /* Process any tables that are being synchronized in parallel. */
819 process_syncing_tables(commit_data.end_lsn);
821 pgstat_report_activity(STATE_IDLE, NULL);
822 reset_apply_error_context_info();
826 * Handle BEGIN PREPARE message.
828 static void
829 apply_handle_begin_prepare(StringInfo s)
831 LogicalRepPreparedTxnData begin_data;
833 /* Tablesync should never receive prepare. */
834 if (am_tablesync_worker())
835 ereport(ERROR,
836 (errcode(ERRCODE_PROTOCOL_VIOLATION),
837 errmsg_internal("tablesync worker received a BEGIN PREPARE message")));
839 logicalrep_read_begin_prepare(s, &begin_data);
840 set_apply_error_context_xact(begin_data.xid, begin_data.prepare_time);
842 remote_final_lsn = begin_data.prepare_lsn;
844 in_remote_transaction = true;
846 pgstat_report_activity(STATE_RUNNING, NULL);
850 * Common function to prepare the GID.
852 static void
853 apply_handle_prepare_internal(LogicalRepPreparedTxnData *prepare_data)
855 char gid[GIDSIZE];
 * Compute a unique GID for two_phase transactions. We don't use the GID of
 * the prepared transaction sent by the server, as that can lead to deadlocks
 * when we have multiple subscriptions on the same node pointing to
 * publications on the same server. See comments atop worker.c.
863 TwoPhaseTransactionGid(MySubscription->oid, prepare_data->xid,
864 gid, sizeof(gid));
867 * BeginTransactionBlock is necessary to balance the EndTransactionBlock
868 * called within the PrepareTransactionBlock below.
870 BeginTransactionBlock();
871 CommitTransactionCommand(); /* Completes the preceding Begin command. */
874 * Update origin state so we can restart streaming from correct position
875 * in case of crash.
877 replorigin_session_origin_lsn = prepare_data->end_lsn;
878 replorigin_session_origin_timestamp = prepare_data->prepare_time;
880 PrepareTransactionBlock(gid);
884 * Handle PREPARE message.
886 static void
887 apply_handle_prepare(StringInfo s)
889 LogicalRepPreparedTxnData prepare_data;
891 logicalrep_read_prepare(s, &prepare_data);
893 if (prepare_data.prepare_lsn != remote_final_lsn)
894 ereport(ERROR,
895 (errcode(ERRCODE_PROTOCOL_VIOLATION),
896 errmsg_internal("incorrect prepare LSN %X/%X in prepare message (expected %X/%X)",
897 LSN_FORMAT_ARGS(prepare_data.prepare_lsn),
898 LSN_FORMAT_ARGS(remote_final_lsn))));
 * Unlike commit, here we always prepare the transaction even if no change
 * has happened in this transaction. It is done this way because, at commit
 * prepared time, we won't know whether we skipped preparing a transaction
 * because it had no changes.
 *
 * XXX We could optimize this such that, at commit prepared time, we first
 * check whether we have prepared the transaction or not, but that doesn't
 * seem worthwhile because such cases shouldn't be common.
910 begin_replication_step();
912 apply_handle_prepare_internal(&prepare_data);
914 end_replication_step();
915 CommitTransactionCommand();
916 pgstat_report_stat(false);
918 store_flush_position(prepare_data.end_lsn);
920 in_remote_transaction = false;
922 /* Process any tables that are being synchronized in parallel. */
923 process_syncing_tables(prepare_data.end_lsn);
925 pgstat_report_activity(STATE_IDLE, NULL);
926 reset_apply_error_context_info();
930 * Handle a COMMIT PREPARED of a previously PREPARED transaction.
932 static void
933 apply_handle_commit_prepared(StringInfo s)
935 LogicalRepCommitPreparedTxnData prepare_data;
936 char gid[GIDSIZE];
938 logicalrep_read_commit_prepared(s, &prepare_data);
939 set_apply_error_context_xact(prepare_data.xid, prepare_data.commit_time);
941 /* Compute GID for two_phase transactions. */
942 TwoPhaseTransactionGid(MySubscription->oid, prepare_data.xid,
943 gid, sizeof(gid));
945 /* There is no transaction when COMMIT PREPARED is called */
946 begin_replication_step();
949 * Update origin state so we can restart streaming from correct position
950 * in case of crash.
952 replorigin_session_origin_lsn = prepare_data.end_lsn;
953 replorigin_session_origin_timestamp = prepare_data.commit_time;
955 FinishPreparedTransaction(gid, true);
956 end_replication_step();
957 CommitTransactionCommand();
958 pgstat_report_stat(false);
960 store_flush_position(prepare_data.end_lsn);
961 in_remote_transaction = false;
963 /* Process any tables that are being synchronized in parallel. */
964 process_syncing_tables(prepare_data.end_lsn);
966 pgstat_report_activity(STATE_IDLE, NULL);
967 reset_apply_error_context_info();
971 * Handle a ROLLBACK PREPARED of a previously PREPARED TRANSACTION.
973 static void
974 apply_handle_rollback_prepared(StringInfo s)
976 LogicalRepRollbackPreparedTxnData rollback_data;
977 char gid[GIDSIZE];
979 logicalrep_read_rollback_prepared(s, &rollback_data);
980 set_apply_error_context_xact(rollback_data.xid, rollback_data.rollback_time);
982 /* Compute GID for two_phase transactions. */
983 TwoPhaseTransactionGid(MySubscription->oid, rollback_data.xid,
984 gid, sizeof(gid));
987 * It is possible that we haven't received prepare because it occurred
988 * before walsender reached a consistent point or the two_phase was still
989 * not enabled by that time, so in such cases, we need to skip rollback
990 * prepared.
992 if (LookupGXact(gid, rollback_data.prepare_end_lsn,
993 rollback_data.prepare_time))
996 * Update origin state so we can restart streaming from correct
997 * position in case of crash.
999 replorigin_session_origin_lsn = rollback_data.rollback_end_lsn;
1000 replorigin_session_origin_timestamp = rollback_data.rollback_time;
1002 /* There is no transaction when ABORT/ROLLBACK PREPARED is called */
1003 begin_replication_step();
1004 FinishPreparedTransaction(gid, false);
1005 end_replication_step();
1006 CommitTransactionCommand();
1009 pgstat_report_stat(false);
1011 store_flush_position(rollback_data.rollback_end_lsn);
1012 in_remote_transaction = false;
1014 /* Process any tables that are being synchronized in parallel. */
1015 process_syncing_tables(rollback_data.rollback_end_lsn);
1017 pgstat_report_activity(STATE_IDLE, NULL);
1018 reset_apply_error_context_info();
1022 * Handle STREAM PREPARE.
1024 * Logic is in two parts:
1025 * 1. Replay all the spooled operations
1026 * 2. Mark the transaction as prepared
1028 static void
1029 apply_handle_stream_prepare(StringInfo s)
1031 LogicalRepPreparedTxnData prepare_data;
1033 if (in_streamed_transaction)
1034 ereport(ERROR,
1035 (errcode(ERRCODE_PROTOCOL_VIOLATION),
1036 errmsg_internal("STREAM PREPARE message without STREAM STOP")));
1038 /* Tablesync should never receive prepare. */
1039 if (am_tablesync_worker())
1040 ereport(ERROR,
1041 (errcode(ERRCODE_PROTOCOL_VIOLATION),
1042 errmsg_internal("tablesync worker received a STREAM PREPARE message")));
1044 logicalrep_read_stream_prepare(s, &prepare_data);
1045 set_apply_error_context_xact(prepare_data.xid, prepare_data.prepare_time);
1047 elog(DEBUG1, "received prepare for streamed transaction %u", prepare_data.xid);
1049 /* Replay all the spooled operations. */
1050 apply_spooled_messages(prepare_data.xid, prepare_data.prepare_lsn);
1052 /* Mark the transaction as prepared. */
1053 apply_handle_prepare_internal(&prepare_data);
1055 CommitTransactionCommand();
1057 pgstat_report_stat(false);
1059 store_flush_position(prepare_data.end_lsn);
1061 in_remote_transaction = false;
1063 /* unlink the files with serialized changes and subxact info. */
1064 stream_cleanup_files(MyLogicalRepWorker->subid, prepare_data.xid);
1066 /* Process any tables that are being synchronized in parallel. */
1067 process_syncing_tables(prepare_data.end_lsn);
1069 pgstat_report_activity(STATE_IDLE, NULL);
1071 reset_apply_error_context_info();
1075 * Handle ORIGIN message.
1077 * TODO, support tracking of multiple origins
1079 static void
1080 apply_handle_origin(StringInfo s)
 * ORIGIN message can only come inside a streamed transaction or inside a
 * remote transaction, and before any actual writes.
1086 if (!in_streamed_transaction &&
1087 (!in_remote_transaction ||
1088 (IsTransactionState() && !am_tablesync_worker())))
1089 ereport(ERROR,
1090 (errcode(ERRCODE_PROTOCOL_VIOLATION),
1091 errmsg_internal("ORIGIN message sent out of order")));
1095 * Handle STREAM START message.
1097 static void
1098 apply_handle_stream_start(StringInfo s)
1100 bool first_segment;
1102 if (in_streamed_transaction)
1103 ereport(ERROR,
1104 (errcode(ERRCODE_PROTOCOL_VIOLATION),
1105 errmsg_internal("duplicate STREAM START message")));
 * Start a transaction on stream start; this transaction will be committed
 * on the stream stop unless it is a tablesync worker, in which case it
 * will be committed after processing all the messages. We need the
 * transaction for handling the buffile, used for serializing the
 * streaming data and subxact info.
1114 begin_replication_step();
1116 /* notify handle methods we're processing a remote transaction */
1117 in_streamed_transaction = true;
1119 /* extract XID of the top-level transaction */
1120 stream_xid = logicalrep_read_stream_start(s, &first_segment);
1122 if (!TransactionIdIsValid(stream_xid))
1123 ereport(ERROR,
1124 (errcode(ERRCODE_PROTOCOL_VIOLATION),
1125 errmsg_internal("invalid transaction ID in streamed replication transaction")));
1127 set_apply_error_context_xact(stream_xid, 0);
1130 * Initialize the worker's stream_fileset if we haven't yet. This will be
1131 * used for the entire duration of the worker so create it in a permanent
1132 * context. We create this on the very first streaming message from any
1133 * transaction and then use it for this and other streaming transactions.
1134 * Now, we could create a fileset at the start of the worker as well but
1135 * then we won't be sure that it will ever be used.
1137 if (MyLogicalRepWorker->stream_fileset == NULL)
1139 MemoryContext oldctx;
1141 oldctx = MemoryContextSwitchTo(ApplyContext);
1143 MyLogicalRepWorker->stream_fileset = palloc(sizeof(FileSet));
1144 FileSetInit(MyLogicalRepWorker->stream_fileset);
1146 MemoryContextSwitchTo(oldctx);
1149 /* open the spool file for this transaction */
1150 stream_open_file(MyLogicalRepWorker->subid, stream_xid, first_segment);
1152 /* if this is not the first segment, open existing subxact file */
1153 if (!first_segment)
1154 subxact_info_read(MyLogicalRepWorker->subid, stream_xid);
1156 pgstat_report_activity(STATE_RUNNING, NULL);
1158 end_replication_step();
1162 * Handle STREAM STOP message.
1164 static void
1165 apply_handle_stream_stop(StringInfo s)
1167 if (!in_streamed_transaction)
1168 ereport(ERROR,
1169 (errcode(ERRCODE_PROTOCOL_VIOLATION),
1170 errmsg_internal("STREAM STOP message without STREAM START")));
1173 * Close the file with serialized changes, and serialize information about
1174 * subxacts for the toplevel transaction.
1176 subxact_info_write(MyLogicalRepWorker->subid, stream_xid);
1177 stream_close_file();
1179 /* We must be in a valid transaction state */
1180 Assert(IsTransactionState());
1182 /* Commit the per-stream transaction */
1183 CommitTransactionCommand();
1185 in_streamed_transaction = false;
1187 /* Reset per-stream context */
1188 MemoryContextReset(LogicalStreamingContext);
1190 pgstat_report_activity(STATE_IDLE, NULL);
1191 reset_apply_error_context_info();
1195 * Handle STREAM abort message.
1197 static void
1198 apply_handle_stream_abort(StringInfo s)
1200 TransactionId xid;
1201 TransactionId subxid;
1203 if (in_streamed_transaction)
1204 ereport(ERROR,
1205 (errcode(ERRCODE_PROTOCOL_VIOLATION),
1206 errmsg_internal("STREAM ABORT message without STREAM STOP")));
1208 logicalrep_read_stream_abort(s, &xid, &subxid);
 * If the two XIDs are the same, it's in fact an abort of the toplevel xact,
 * so just delete the files with serialized info.
1214 if (xid == subxid)
1216 set_apply_error_context_xact(xid, 0);
1217 stream_cleanup_files(MyLogicalRepWorker->subid, xid);
1219 else
1222 * OK, so it's a subxact. We need to read the subxact file for the
1223 * toplevel transaction, determine the offset tracked for the subxact,
1224 * and truncate the file with changes. We also remove the subxacts
1225 * with higher offsets (or rather higher XIDs).
1227 * We intentionally scan the array from the tail, because we're likely
1228 * aborting a change for the most recent subtransactions.
1230 * We can't use the binary search here as subxact XIDs won't
1231 * necessarily arrive in sorted order, consider the case where we have
1232 * released the savepoint for multiple subtransactions and then
1233 * performed rollback to savepoint for one of the earlier
 * sub-transactions.
1236 int64 i;
1237 int64 subidx;
1238 BufFile *fd;
1239 bool found = false;
1240 char path[MAXPGPATH];
1242 set_apply_error_context_xact(subxid, 0);
1244 subidx = -1;
1245 begin_replication_step();
1246 subxact_info_read(MyLogicalRepWorker->subid, xid);
1248 for (i = subxact_data.nsubxacts; i > 0; i--)
1250 if (subxact_data.subxacts[i - 1].xid == subxid)
1252 subidx = (i - 1);
1253 found = true;
1254 break;
1259 * If it's an empty sub-transaction then we will not find the subxid
 * here, so just clean up the subxact info and return.
1262 if (!found)
1264 /* Cleanup the subxact info */
1265 cleanup_subxact_info();
1266 end_replication_step();
1267 CommitTransactionCommand();
1268 reset_apply_error_context_info();
1269 return;
1272 /* open the changes file */
1273 changes_filename(path, MyLogicalRepWorker->subid, xid);
1274 fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path,
1275 O_RDWR, false);
1277 /* OK, truncate the file at the right offset */
1278 BufFileTruncateFileSet(fd, subxact_data.subxacts[subidx].fileno,
1279 subxact_data.subxacts[subidx].offset);
1280 BufFileClose(fd);
1282 /* discard the subxacts added later */
1283 subxact_data.nsubxacts = subidx;
1285 /* write the updated subxact list */
1286 subxact_info_write(MyLogicalRepWorker->subid, xid);
1288 end_replication_step();
1289 CommitTransactionCommand();
1292 reset_apply_error_context_info();
1296 * Common spoolfile processing.
1298 static void
1299 apply_spooled_messages(TransactionId xid, XLogRecPtr lsn)
1301 StringInfoData s2;
1302 int nchanges;
1303 char path[MAXPGPATH];
1304 char *buffer = NULL;
1305 MemoryContext oldcxt;
1306 BufFile *fd;
1308 /* Make sure we have an open transaction */
1309 begin_replication_step();
1312 * Allocate file handle and memory required to process all the messages in
1313 * TopTransactionContext to avoid them getting reset after each message is
1314 * processed.
1316 oldcxt = MemoryContextSwitchTo(TopTransactionContext);
1318 /* Open the spool file for the committed/prepared transaction */
1319 changes_filename(path, MyLogicalRepWorker->subid, xid);
1320 elog(DEBUG1, "replaying changes from file \"%s\"", path);
1322 fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path, O_RDONLY,
1323 false);
1325 buffer = palloc(BLCKSZ);
1326 initStringInfo(&s2);
1328 MemoryContextSwitchTo(oldcxt);
1330 remote_final_lsn = lsn;
 * Make sure the apply_dispatch handler methods are aware we're in a remote
 * transaction.
1336 in_remote_transaction = true;
1337 pgstat_report_activity(STATE_RUNNING, NULL);
1339 end_replication_step();
1342 * Read the entries one by one and pass them through the same logic as in
1343 * apply_dispatch.
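 *
 * Each on-disk record is a 4-byte length followed by that many bytes of
 * serialized change data (the action byte plus the message payload, as
 * written by stream_write_change()); the loop below feeds each record back
 * through apply_dispatch.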
1345 nchanges = 0;
1346 while (true)
1348 int nbytes;
1349 int len;
1351 CHECK_FOR_INTERRUPTS();
1353 /* read length of the on-disk record */
1354 nbytes = BufFileRead(fd, &len, sizeof(len));
1356 /* have we reached end of the file? */
1357 if (nbytes == 0)
1358 break;
1360 /* do we have a correct length? */
1361 if (nbytes != sizeof(len))
1362 ereport(ERROR,
1363 (errcode_for_file_access(),
1364 errmsg("could not read from streaming transaction's changes file \"%s\": %m",
1365 path)));
1367 if (len <= 0)
1368 elog(ERROR, "incorrect length %d in streaming transaction's changes file \"%s\"",
1369 len, path);
1371 /* make sure we have sufficiently large buffer */
1372 buffer = repalloc(buffer, len);
1374 /* and finally read the data into the buffer */
1375 if (BufFileRead(fd, buffer, len) != len)
1376 ereport(ERROR,
1377 (errcode_for_file_access(),
1378 errmsg("could not read from streaming transaction's changes file \"%s\": %m",
1379 path)));
1381 /* copy the buffer to the stringinfo and call apply_dispatch */
1382 resetStringInfo(&s2);
1383 appendBinaryStringInfo(&s2, buffer, len);
1385 /* Ensure we are reading the data into our memory context. */
1386 oldcxt = MemoryContextSwitchTo(ApplyMessageContext);
1388 apply_dispatch(&s2);
1390 MemoryContextReset(ApplyMessageContext);
1392 MemoryContextSwitchTo(oldcxt);
1394 nchanges++;
1396 if (nchanges % 1000 == 0)
1397 elog(DEBUG1, "replayed %d changes from file \"%s\"",
1398 nchanges, path);
1401 BufFileClose(fd);
1403 pfree(buffer);
1404 pfree(s2.data);
1406 elog(DEBUG1, "replayed %d (all) changes from file \"%s\"",
1407 nchanges, path);
1409 return;
1413 * Handle STREAM COMMIT message.
1415 static void
1416 apply_handle_stream_commit(StringInfo s)
1418 TransactionId xid;
1419 LogicalRepCommitData commit_data;
1421 if (in_streamed_transaction)
1422 ereport(ERROR,
1423 (errcode(ERRCODE_PROTOCOL_VIOLATION),
1424 errmsg_internal("STREAM COMMIT message without STREAM STOP")));
1426 xid = logicalrep_read_stream_commit(s, &commit_data);
1427 set_apply_error_context_xact(xid, commit_data.committime);
1429 elog(DEBUG1, "received commit for streamed transaction %u", xid);
1431 apply_spooled_messages(xid, commit_data.commit_lsn);
1433 apply_handle_commit_internal(&commit_data);
1435 /* unlink the files with serialized changes and subxact info */
1436 stream_cleanup_files(MyLogicalRepWorker->subid, xid);
1438 /* Process any tables that are being synchronized in parallel. */
1439 process_syncing_tables(commit_data.end_lsn);
1441 pgstat_report_activity(STATE_IDLE, NULL);
1443 reset_apply_error_context_info();
1447 * Helper function for apply_handle_commit and apply_handle_stream_commit.
1449 static void
1450 apply_handle_commit_internal(LogicalRepCommitData *commit_data)
1452 if (IsTransactionState())
1455 * Update origin state so we can restart streaming from correct
1456 * position in case of crash.
1458 replorigin_session_origin_lsn = commit_data->end_lsn;
1459 replorigin_session_origin_timestamp = commit_data->committime;
1461 CommitTransactionCommand();
1462 pgstat_report_stat(false);
1464 store_flush_position(commit_data->end_lsn);
1466 else
1468 /* Process any invalidation messages that might have accumulated. */
1469 AcceptInvalidationMessages();
1470 maybe_reread_subscription();
1473 in_remote_transaction = false;
1477 * Handle RELATION message.
 * Note we don't do validation against the local schema here. The validation
 * against the local schema is postponed until the first change for the given
 * relation comes, as we only care about it when applying changes for it
 * anyway, and we do less locking this way.
1484 static void
1485 apply_handle_relation(StringInfo s)
1487 LogicalRepRelation *rel;
1489 if (handle_streamed_transaction(LOGICAL_REP_MSG_RELATION, s))
1490 return;
1492 rel = logicalrep_read_rel(s);
1493 logicalrep_relmap_update(rel);
1497 * Handle TYPE message.
1499 * This implementation pays no attention to TYPE messages; we expect the user
1500 * to have set things up so that the incoming data is acceptable to the input
1501 * functions for the locally subscribed tables. Hence, we just read and
1502 * discard the message.
1504 static void
1505 apply_handle_type(StringInfo s)
1507 LogicalRepTyp typ;
1509 if (handle_streamed_transaction(LOGICAL_REP_MSG_TYPE, s))
1510 return;
1512 logicalrep_read_typ(s, &typ);
 * Get the replica identity index, or if that is not defined, the primary key.
 *
 * If neither is defined, returns InvalidOid.
1520 static Oid
1521 GetRelationIdentityOrPK(Relation rel)
1523 Oid idxoid;
1525 idxoid = RelationGetReplicaIndex(rel);
1527 if (!OidIsValid(idxoid))
1528 idxoid = RelationGetPrimaryKeyIndex(rel);
1530 return idxoid;
1534 * Handle INSERT message.
1537 static void
1538 apply_handle_insert(StringInfo s)
1540 LogicalRepRelMapEntry *rel;
1541 LogicalRepTupleData newtup;
1542 LogicalRepRelId relid;
1543 ApplyExecutionData *edata;
1544 EState *estate;
1545 TupleTableSlot *remoteslot;
1546 MemoryContext oldctx;
1548 if (handle_streamed_transaction(LOGICAL_REP_MSG_INSERT, s))
1549 return;
1551 begin_replication_step();
1553 relid = logicalrep_read_insert(s, &newtup);
1554 rel = logicalrep_rel_open(relid, RowExclusiveLock);
1555 if (!should_apply_changes_for_rel(rel))
1558 * The relation can't become interesting in the middle of the
1559 * transaction so it's safe to unlock it.
1561 logicalrep_rel_close(rel, RowExclusiveLock);
1562 end_replication_step();
1563 return;
1566 /* Set relation for error callback */
1567 apply_error_callback_arg.rel = rel;
1569 /* Initialize the executor state. */
1570 edata = create_edata_for_relation(rel);
1571 estate = edata->estate;
1572 remoteslot = ExecInitExtraTupleSlot(estate,
1573 RelationGetDescr(rel->localrel),
1574 &TTSOpsVirtual);
1576 /* Process and store remote tuple in the slot */
1577 oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
1578 slot_store_data(remoteslot, rel, &newtup);
1579 slot_fill_defaults(rel, estate, remoteslot);
1580 MemoryContextSwitchTo(oldctx);
1582 /* For a partitioned table, insert the tuple into a partition. */
1583 if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
1584 apply_handle_tuple_routing(edata,
1585 remoteslot, NULL, CMD_INSERT);
1586 else
1587 apply_handle_insert_internal(edata, edata->targetRelInfo,
1588 remoteslot);
1590 finish_edata(edata);
1592 /* Reset relation for error callback */
1593 apply_error_callback_arg.rel = NULL;
1595 logicalrep_rel_close(rel, NoLock);
1597 end_replication_step();
1601 * Workhorse for apply_handle_insert()
1602 * relinfo is for the relation we're actually inserting into
1603 * (could be a child partition of edata->targetRelInfo)
1605 static void
1606 apply_handle_insert_internal(ApplyExecutionData *edata,
1607 ResultRelInfo *relinfo,
1608 TupleTableSlot *remoteslot)
1610 EState *estate = edata->estate;
1612 /* We must open indexes here. */
1613 ExecOpenIndices(relinfo, false);
1615 /* Do the insert. */
1616 ExecSimpleRelationInsert(relinfo, estate, remoteslot);
1618 /* Cleanup. */
1619 ExecCloseIndices(relinfo);
1623 * Check if the logical replication relation is updatable and throw
1624 * appropriate error if it isn't.
1626 static void
1627 check_relation_updatable(LogicalRepRelMapEntry *rel)
1629 /* Updatable, no error. */
1630 if (rel->updatable)
1631 return;
 * We are in error mode, so it's fine if this is somewhat slow. It's better
 * to give the user a correct error.
1637 if (OidIsValid(GetRelationIdentityOrPK(rel->localrel)))
1639 ereport(ERROR,
1640 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1641 errmsg("publisher did not send replica identity column "
1642 "expected by the logical replication target relation \"%s.%s\"",
1643 rel->remoterel.nspname, rel->remoterel.relname)));
1646 ereport(ERROR,
1647 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1648 errmsg("logical replication target relation \"%s.%s\" has "
1649 "neither REPLICA IDENTITY index nor PRIMARY "
1650 "KEY and published relation does not have "
1651 "REPLICA IDENTITY FULL",
1652 rel->remoterel.nspname, rel->remoterel.relname)));
1656 * Handle UPDATE message.
1658 * TODO: FDW support
1660 static void
1661 apply_handle_update(StringInfo s)
1663 LogicalRepRelMapEntry *rel;
1664 LogicalRepRelId relid;
1665 ApplyExecutionData *edata;
1666 EState *estate;
1667 LogicalRepTupleData oldtup;
1668 LogicalRepTupleData newtup;
1669 bool has_oldtup;
1670 TupleTableSlot *remoteslot;
1671 RangeTblEntry *target_rte;
1672 MemoryContext oldctx;
1674 if (handle_streamed_transaction(LOGICAL_REP_MSG_UPDATE, s))
1675 return;
1677 begin_replication_step();
1679 relid = logicalrep_read_update(s, &has_oldtup, &oldtup,
1680 &newtup);
1681 rel = logicalrep_rel_open(relid, RowExclusiveLock);
1682 if (!should_apply_changes_for_rel(rel))
1685 * The relation can't become interesting in the middle of the
1686 * transaction so it's safe to unlock it.
1688 logicalrep_rel_close(rel, RowExclusiveLock);
1689 end_replication_step();
1690 return;
1693 /* Set relation for error callback */
1694 apply_error_callback_arg.rel = rel;
1696 /* Check if we can do the update. */
1697 check_relation_updatable(rel);
1699 /* Initialize the executor state. */
1700 edata = create_edata_for_relation(rel);
1701 estate = edata->estate;
1702 remoteslot = ExecInitExtraTupleSlot(estate,
1703 RelationGetDescr(rel->localrel),
1704 &TTSOpsVirtual);
1707 * Populate updatedCols so that per-column triggers can fire, and so
1708 * executor can correctly pass down indexUnchanged hint. This could
1709 * include more columns than were actually changed on the publisher
1710 * because the logical replication protocol doesn't contain that
1711 * information. But it would for example exclude columns that only exist
1712 * on the subscriber, since we are not touching those.
1714 target_rte = list_nth(estate->es_range_table, 0);
1715 for (int i = 0; i < remoteslot->tts_tupleDescriptor->natts; i++)
1717 Form_pg_attribute att = TupleDescAttr(remoteslot->tts_tupleDescriptor, i);
1718 int remoteattnum = rel->attrmap->attnums[i];
1720 if (!att->attisdropped && remoteattnum >= 0)
1722 Assert(remoteattnum < newtup.ncols);
1723 if (newtup.colstatus[remoteattnum] != LOGICALREP_COLUMN_UNCHANGED)
1724 target_rte->updatedCols =
1725 bms_add_member(target_rte->updatedCols,
1726 i + 1 - FirstLowInvalidHeapAttributeNumber);
1730 /* Also populate extraUpdatedCols, in case we have generated columns */
1731 fill_extraUpdatedCols(target_rte, rel->localrel);
1733 /* Build the search tuple. */
1734 oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
1735 slot_store_data(remoteslot, rel,
1736 has_oldtup ? &oldtup : &newtup);
1737 MemoryContextSwitchTo(oldctx);
1739 /* For a partitioned table, apply update to correct partition. */
1740 if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
1741 apply_handle_tuple_routing(edata,
1742 remoteslot, &newtup, CMD_UPDATE);
1743 else
1744 apply_handle_update_internal(edata, edata->targetRelInfo,
1745 remoteslot, &newtup);
1747 finish_edata(edata);
1749 /* Reset relation for error callback */
1750 apply_error_callback_arg.rel = NULL;
1752 logicalrep_rel_close(rel, NoLock);
1754 end_replication_step();
1758 * Workhorse for apply_handle_update()
1759 * relinfo is for the relation we're actually updating in
1760 * (could be a child partition of edata->targetRelInfo)
1762 static void
1763 apply_handle_update_internal(ApplyExecutionData *edata,
1764 ResultRelInfo *relinfo,
1765 TupleTableSlot *remoteslot,
1766 LogicalRepTupleData *newtup)
1768 EState *estate = edata->estate;
1769 LogicalRepRelMapEntry *relmapentry = edata->targetRel;
1770 Relation localrel = relinfo->ri_RelationDesc;
1771 EPQState epqstate;
1772 TupleTableSlot *localslot;
1773 bool found;
1774 MemoryContext oldctx;
1776 EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
1777 ExecOpenIndices(relinfo, false);
1779 found = FindReplTupleInLocalRel(estate, localrel,
1780 &relmapentry->remoterel,
1781 remoteslot, &localslot);
1782 ExecClearTuple(remoteslot);
1785 * Tuple found.
1787 * Note this will fail if there are other conflicting unique indexes.
1789 if (found)
1791 /* Process and store remote tuple in the slot */
1792 oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
1793 slot_modify_data(remoteslot, localslot, relmapentry, newtup);
1794 MemoryContextSwitchTo(oldctx);
1796 EvalPlanQualSetSlot(&epqstate, remoteslot);
1798 /* Do the actual update. */
1799 ExecSimpleRelationUpdate(relinfo, estate, &epqstate, localslot,
1800 remoteslot);
1802 else
1805 * The tuple to be updated could not be found. Do nothing except for
1806 * emitting a log message.
1808 * XXX should this be promoted to ereport(LOG) perhaps?
1810 elog(DEBUG1,
1811 "logical replication did not find row to be updated "
1812 "in replication target relation \"%s\"",
1813 RelationGetRelationName(localrel));
1816 /* Cleanup. */
1817 ExecCloseIndices(relinfo);
1818 EvalPlanQualEnd(&epqstate);
1822 * Handle DELETE message.
1824 * TODO: FDW support
1826 static void
1827 apply_handle_delete(StringInfo s)
1829 LogicalRepRelMapEntry *rel;
1830 LogicalRepTupleData oldtup;
1831 LogicalRepRelId relid;
1832 ApplyExecutionData *edata;
1833 EState *estate;
1834 TupleTableSlot *remoteslot;
1835 MemoryContext oldctx;
1837 if (handle_streamed_transaction(LOGICAL_REP_MSG_DELETE, s))
1838 return;
1840 begin_replication_step();
1842 relid = logicalrep_read_delete(s, &oldtup);
1843 rel = logicalrep_rel_open(relid, RowExclusiveLock);
1844 if (!should_apply_changes_for_rel(rel))
1847 * The relation can't become interesting in the middle of the
1848 * transaction so it's safe to unlock it.
1850 logicalrep_rel_close(rel, RowExclusiveLock);
1851 end_replication_step();
1852 return;
1855 /* Set relation for error callback */
1856 apply_error_callback_arg.rel = rel;
1858 /* Check if we can do the delete. */
1859 check_relation_updatable(rel);
1861 /* Initialize the executor state. */
1862 edata = create_edata_for_relation(rel);
1863 estate = edata->estate;
1864 remoteslot = ExecInitExtraTupleSlot(estate,
1865 RelationGetDescr(rel->localrel),
1866 &TTSOpsVirtual);
1868 /* Build the search tuple. */
1869 oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
1870 slot_store_data(remoteslot, rel, &oldtup);
1871 MemoryContextSwitchTo(oldctx);
1873 /* For a partitioned table, apply delete to correct partition. */
1874 if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
1875 apply_handle_tuple_routing(edata,
1876 remoteslot, NULL, CMD_DELETE);
1877 else
1878 apply_handle_delete_internal(edata, edata->targetRelInfo,
1879 remoteslot);
1881 finish_edata(edata);
1883 /* Reset relation for error callback */
1884 apply_error_callback_arg.rel = NULL;
1886 logicalrep_rel_close(rel, NoLock);
1888 end_replication_step();
1892 * Workhorse for apply_handle_delete()
1893 * relinfo is for the relation we're actually deleting from
1894 * (could be a child partition of edata->targetRelInfo)
1896 static void
1897 apply_handle_delete_internal(ApplyExecutionData *edata,
1898 ResultRelInfo *relinfo,
1899 TupleTableSlot *remoteslot)
1901 EState *estate = edata->estate;
1902 Relation localrel = relinfo->ri_RelationDesc;
1903 LogicalRepRelation *remoterel = &edata->targetRel->remoterel;
1904 EPQState epqstate;
1905 TupleTableSlot *localslot;
1906 bool found;
1908 EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
1909 ExecOpenIndices(relinfo, false);
1911 found = FindReplTupleInLocalRel(estate, localrel, remoterel,
1912 remoteslot, &localslot);
1914 /* If found delete it. */
1915 if (found)
1917 EvalPlanQualSetSlot(&epqstate, localslot);
1919 /* Do the actual delete. */
1920 ExecSimpleRelationDelete(relinfo, estate, &epqstate, localslot);
1922 else
1925 * The tuple to be deleted could not be found. Do nothing except for
1926 * emitting a log message.
1928 * XXX should this be promoted to ereport(LOG) perhaps?
1930 elog(DEBUG1,
1931 "logical replication did not find row to be deleted "
1932 "in replication target relation \"%s\"",
1933 RelationGetRelationName(localrel));
1936 /* Cleanup. */
1937 ExecCloseIndices(relinfo);
1938 EvalPlanQualEnd(&epqstate);
1942 * Try to find a tuple received from the publication side (in 'remoteslot') in
1943 * the corresponding local relation using either replica identity index,
1944 * primary key or, if needed, a sequential scan.
1946 * Local tuple, if found, is returned in '*localslot'.
1948 static bool
1949 FindReplTupleInLocalRel(EState *estate, Relation localrel,
1950 LogicalRepRelation *remoterel,
1951 TupleTableSlot *remoteslot,
1952 TupleTableSlot **localslot)
1954 Oid idxoid;
1955 bool found;
1957 *localslot = table_slot_create(localrel, &estate->es_tupleTable);
1959 idxoid = GetRelationIdentityOrPK(localrel);
1960 Assert(OidIsValid(idxoid) ||
1961 (remoterel->replident == REPLICA_IDENTITY_FULL));
1963 if (OidIsValid(idxoid))
1964 found = RelationFindReplTupleByIndex(localrel, idxoid,
1965 LockTupleExclusive,
1966 remoteslot, *localslot);
1967 else
1968 found = RelationFindReplTupleSeq(localrel, LockTupleExclusive,
1969 remoteslot, *localslot);
1971 return found;
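/*
 * Descriptive note (added for clarity, not in the original source): when the
 * relation has neither a usable replica identity index nor a primary key,
 * the remote relation must have REPLICA IDENTITY FULL (as the assertion
 * above expects), and RelationFindReplTupleSeq() falls back to scanning the
 * whole table, comparing the remote tuple against each local row.
 */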
1975 * This handles insert, update, delete on a partitioned table.
1977 static void
1978 apply_handle_tuple_routing(ApplyExecutionData *edata,
1979 TupleTableSlot *remoteslot,
1980 LogicalRepTupleData *newtup,
1981 CmdType operation)
1983 EState *estate = edata->estate;
1984 LogicalRepRelMapEntry *relmapentry = edata->targetRel;
1985 ResultRelInfo *relinfo = edata->targetRelInfo;
1986 Relation parentrel = relinfo->ri_RelationDesc;
1987 ModifyTableState *mtstate;
1988 PartitionTupleRouting *proute;
1989 ResultRelInfo *partrelinfo;
1990 Relation partrel;
1991 TupleTableSlot *remoteslot_part;
1992 TupleConversionMap *map;
1993 MemoryContext oldctx;
1995 /* ModifyTableState is needed for ExecFindPartition(). */
1996 edata->mtstate = mtstate = makeNode(ModifyTableState);
1997 mtstate->ps.plan = NULL;
1998 mtstate->ps.state = estate;
1999 mtstate->operation = operation;
2000 mtstate->resultRelInfo = relinfo;
2002 /* ... as is PartitionTupleRouting. */
2003 edata->proute = proute = ExecSetupPartitionTupleRouting(estate, parentrel);
2006 * Find the partition to which the "search tuple" belongs.
2008 Assert(remoteslot != NULL);
2009 oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
2010 partrelinfo = ExecFindPartition(mtstate, relinfo, proute,
2011 remoteslot, estate);
2012 Assert(partrelinfo != NULL);
2013 partrel = partrelinfo->ri_RelationDesc;
2016 * To perform any of the operations below, the tuple must match the
2017 * partition's rowtype. Convert if needed or just copy, using a dedicated
2018 * slot to store the tuple in any case.
2020 remoteslot_part = partrelinfo->ri_PartitionTupleSlot;
2021 if (remoteslot_part == NULL)
2022 remoteslot_part = table_slot_create(partrel, &estate->es_tupleTable);
2023 map = partrelinfo->ri_RootToPartitionMap;
2024 if (map != NULL)
2025 remoteslot_part = execute_attr_map_slot(map->attrMap, remoteslot,
2026 remoteslot_part);
2027 else
2029 remoteslot_part = ExecCopySlot(remoteslot_part, remoteslot);
2030 slot_getallattrs(remoteslot_part);
2032 MemoryContextSwitchTo(oldctx);
2034 switch (operation)
2036 case CMD_INSERT:
2037 apply_handle_insert_internal(edata, partrelinfo,
2038 remoteslot_part);
2039 break;
2041 case CMD_DELETE:
2042 apply_handle_delete_internal(edata, partrelinfo,
2043 remoteslot_part);
2044 break;
2046 case CMD_UPDATE:
2049 * For UPDATE, depending on whether or not the updated tuple
2050 * satisfies the partition's constraint, perform a simple UPDATE
2051 * of the partition or move the updated tuple into a different
2052 * suitable partition.
2055 AttrMap *attrmap = map ? map->attrMap : NULL;
2056 LogicalRepRelMapEntry *part_entry;
2057 TupleTableSlot *localslot;
2058 ResultRelInfo *partrelinfo_new;
2059 bool found;
2061 part_entry = logicalrep_partition_open(relmapentry, partrel,
2062 attrmap);
2064 /* Get the matching local tuple from the partition. */
2065 found = FindReplTupleInLocalRel(estate, partrel,
2066 &part_entry->remoterel,
2067 remoteslot_part, &localslot);
2068 if (!found)
2071 * The tuple to be updated could not be found. Do nothing
2072 * except for emitting a log message.
2074 * XXX should this be promoted to ereport(LOG) perhaps?
2076 elog(DEBUG1,
2077 "logical replication did not find row to be updated "
2078 "in replication target relation's partition \"%s\"",
2079 RelationGetRelationName(partrel));
2080 return;
2084 * Apply the update to the local tuple, putting the result in
2085 * remoteslot_part.
2087 oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
2088 slot_modify_data(remoteslot_part, localslot, part_entry,
2089 newtup);
2090 MemoryContextSwitchTo(oldctx);
2093 * Does the updated tuple still satisfy the current
2094 * partition's constraint?
2096 if (!partrel->rd_rel->relispartition ||
2097 ExecPartitionCheck(partrelinfo, remoteslot_part, estate,
2098 false))
2101 * Yes, so simply UPDATE the partition. We don't call
2102 * apply_handle_update_internal() here, which would
2103 * normally do this work, to avoid repeating the lookup
2104 * of the local tuple in the partition that was already
2105 * done above.
2107 EPQState epqstate;
2109 EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
2110 ExecOpenIndices(partrelinfo, false);
2112 EvalPlanQualSetSlot(&epqstate, remoteslot_part);
2113 ExecSimpleRelationUpdate(partrelinfo, estate, &epqstate,
2114 localslot, remoteslot_part);
2115 ExecCloseIndices(partrelinfo);
2116 EvalPlanQualEnd(&epqstate);
2118 else
2120 /* Move the tuple into the new partition. */
2123 * New partition will be found using tuple routing, which
2124 * can only occur via the parent table. We might need to
2125 * convert the tuple to the parent's rowtype. Note that
2126 * this is the tuple found in the partition, not the
2127 * original search tuple received by this function.
2129 if (map)
2131 TupleConversionMap *PartitionToRootMap =
2132 convert_tuples_by_name(RelationGetDescr(partrel),
2133 RelationGetDescr(parentrel));
2135 remoteslot =
2136 execute_attr_map_slot(PartitionToRootMap->attrMap,
2137 remoteslot_part, remoteslot);
2139 else
2141 remoteslot = ExecCopySlot(remoteslot, remoteslot_part);
2142 slot_getallattrs(remoteslot);
2146 /* Find the new partition. */
2147 oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
2148 partrelinfo_new = ExecFindPartition(mtstate, relinfo,
2149 proute, remoteslot,
2150 estate);
2151 MemoryContextSwitchTo(oldctx);
2152 Assert(partrelinfo_new != partrelinfo);
2154 /* DELETE old tuple found in the old partition. */
2155 apply_handle_delete_internal(edata, partrelinfo,
2156 localslot);
2158 /* INSERT new tuple into the new partition. */
2161 * Convert the replacement tuple to match the destination
2162 * partition rowtype.
2164 oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
2165 partrel = partrelinfo_new->ri_RelationDesc;
2166 remoteslot_part = partrelinfo_new->ri_PartitionTupleSlot;
2167 if (remoteslot_part == NULL)
2168 remoteslot_part = table_slot_create(partrel,
2169 &estate->es_tupleTable);
2170 map = partrelinfo_new->ri_RootToPartitionMap;
2171 if (map != NULL)
2173 remoteslot_part = execute_attr_map_slot(map->attrMap,
2174 remoteslot,
2175 remoteslot_part);
2177 else
2179 remoteslot_part = ExecCopySlot(remoteslot_part,
2180 remoteslot);
2181 slot_getallattrs(remoteslot_part);
2183 MemoryContextSwitchTo(oldctx);
2184 apply_handle_insert_internal(edata, partrelinfo_new,
2185 remoteslot_part);
2188 break;
2190 default:
2191 elog(ERROR, "unrecognized CmdType: %d", (int) operation);
2192 break;
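/*
 * Recap of the UPDATE routing above (descriptive comment, not part of the
 * original source): the change is first applied in the partition that
 * currently holds the row; if the updated tuple no longer satisfies that
 * partition's constraint, the row is moved by deleting it from the old
 * partition, converting the updated tuple back to the parent's rowtype,
 * re-routing it through the parent, and inserting it into the newly found
 * partition.
 */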
2197 * Handle TRUNCATE message.
2199 * TODO: FDW support
2201 static void
2202 apply_handle_truncate(StringInfo s)
2204 bool cascade = false;
2205 bool restart_seqs = false;
2206 List *remote_relids = NIL;
2207 List *remote_rels = NIL;
2208 List *rels = NIL;
2209 List *part_rels = NIL;
2210 List *relids = NIL;
2211 List *relids_logged = NIL;
2212 ListCell *lc;
2213 LOCKMODE lockmode = AccessExclusiveLock;
2215 if (handle_streamed_transaction(LOGICAL_REP_MSG_TRUNCATE, s))
2216 return;
2218 begin_replication_step();
2220 remote_relids = logicalrep_read_truncate(s, &cascade, &restart_seqs);
2222 foreach(lc, remote_relids)
2224 LogicalRepRelId relid = lfirst_oid(lc);
2225 LogicalRepRelMapEntry *rel;
2227 rel = logicalrep_rel_open(relid, lockmode);
2228 if (!should_apply_changes_for_rel(rel))
2231 * The relation can't become interesting in the middle of the
2232 * transaction so it's safe to unlock it.
2234 logicalrep_rel_close(rel, lockmode);
2235 continue;
2238 remote_rels = lappend(remote_rels, rel);
2239 rels = lappend(rels, rel->localrel);
2240 relids = lappend_oid(relids, rel->localreloid);
2241 if (RelationIsLogicallyLogged(rel->localrel))
2242 relids_logged = lappend_oid(relids_logged, rel->localreloid);
2245 * Truncate partitions if we got a message to truncate a partitioned
2246 * table.
2248 if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2250 ListCell *child;
2251 List *children = find_all_inheritors(rel->localreloid,
2252 lockmode,
2253 NULL);
2255 foreach(child, children)
2257 Oid childrelid = lfirst_oid(child);
2258 Relation childrel;
2260 if (list_member_oid(relids, childrelid))
2261 continue;
2263 /* find_all_inheritors already got lock */
2264 childrel = table_open(childrelid, NoLock);
2267 * Ignore temp tables of other backends. See similar code in
2268 * ExecuteTruncate().
2270 if (RELATION_IS_OTHER_TEMP(childrel))
2272 table_close(childrel, lockmode);
2273 continue;
2276 rels = lappend(rels, childrel);
2277 part_rels = lappend(part_rels, childrel);
2278 relids = lappend_oid(relids, childrelid);
2279 /* Log this relation only if needed for logical decoding */
2280 if (RelationIsLogicallyLogged(childrel))
2281 relids_logged = lappend_oid(relids_logged, childrelid);
2287 * Even if we used CASCADE on the upstream primary we explicitly default
2288 * to replaying changes without further cascading. This might later be
2289 * made configurable with a user-specified option.
2291 ExecuteTruncateGuts(rels,
2292 relids,
2293 relids_logged,
2294 DROP_RESTRICT,
2295 restart_seqs);
2296 foreach(lc, remote_rels)
2298 LogicalRepRelMapEntry *rel = lfirst(lc);
2300 logicalrep_rel_close(rel, NoLock);
2302 foreach(lc, part_rels)
2304 Relation rel = lfirst(lc);
2306 table_close(rel, NoLock);
2309 end_replication_step();
2314 * Logical replication protocol message dispatcher.
2316 static void
2317 apply_dispatch(StringInfo s)
2319 LogicalRepMsgType action = pq_getmsgbyte(s);
2320 LogicalRepMsgType saved_command;
2323 * Set the current command being applied. Since this function can be
2324 * called recursively when applying spooled changes, save the current
2325 * command.
2327 saved_command = apply_error_callback_arg.command;
2328 apply_error_callback_arg.command = action;
2330 switch (action)
2332 case LOGICAL_REP_MSG_BEGIN:
2333 apply_handle_begin(s);
2334 break;
2336 case LOGICAL_REP_MSG_COMMIT:
2337 apply_handle_commit(s);
2338 break;
2340 case LOGICAL_REP_MSG_INSERT:
2341 apply_handle_insert(s);
2342 break;
2344 case LOGICAL_REP_MSG_UPDATE:
2345 apply_handle_update(s);
2346 break;
2348 case LOGICAL_REP_MSG_DELETE:
2349 apply_handle_delete(s);
2350 break;
2352 case LOGICAL_REP_MSG_TRUNCATE:
2353 apply_handle_truncate(s);
2354 break;
2356 case LOGICAL_REP_MSG_RELATION:
2357 apply_handle_relation(s);
2358 break;
2360 case LOGICAL_REP_MSG_TYPE:
2361 apply_handle_type(s);
2362 break;
2364 case LOGICAL_REP_MSG_ORIGIN:
2365 apply_handle_origin(s);
2366 break;
2368 case LOGICAL_REP_MSG_MESSAGE:
2371 * Logical replication does not use generic logical messages yet,
2372 * although they could be used by other applications that use this
2373 * output plugin.
2375 break;
2377 case LOGICAL_REP_MSG_STREAM_START:
2378 apply_handle_stream_start(s);
2379 break;
2381 case LOGICAL_REP_MSG_STREAM_STOP:
2382 apply_handle_stream_stop(s);
2383 break;
2385 case LOGICAL_REP_MSG_STREAM_ABORT:
2386 apply_handle_stream_abort(s);
2387 break;
2389 case LOGICAL_REP_MSG_STREAM_COMMIT:
2390 apply_handle_stream_commit(s);
2391 break;
2393 case LOGICAL_REP_MSG_BEGIN_PREPARE:
2394 apply_handle_begin_prepare(s);
2395 break;
2397 case LOGICAL_REP_MSG_PREPARE:
2398 apply_handle_prepare(s);
2399 break;
2401 case LOGICAL_REP_MSG_COMMIT_PREPARED:
2402 apply_handle_commit_prepared(s);
2403 break;
2405 case LOGICAL_REP_MSG_ROLLBACK_PREPARED:
2406 apply_handle_rollback_prepared(s);
2407 break;
2409 case LOGICAL_REP_MSG_STREAM_PREPARE:
2410 apply_handle_stream_prepare(s);
2411 break;
2413 default:
2414 ereport(ERROR,
2415 (errcode(ERRCODE_PROTOCOL_VIOLATION),
2416 errmsg("invalid logical replication message type \"%c\"", action)));
2419 /* Reset the current command */
2420 apply_error_callback_arg.command = saved_command;
2424 * Figure out which write/flush positions to report to the walsender process.
2426 * We can't simply report back the last LSN the walsender sent us because the
2427 * local transaction might not yet be flushed to disk locally. Instead we
2428 * build a list that associates local with remote LSNs for every commit. When
2429 * reporting back the flush position to the sender we iterate that list and
2430 * check which entries on it are already locally flushed. Those we can report
2431 * as having been flushed.
2433 * The have_pending_txes is true if there are outstanding transactions that
2434 * need to be flushed.
2436 static void
2437 get_flush_position(XLogRecPtr *write, XLogRecPtr *flush,
2438 bool *have_pending_txes)
2440 dlist_mutable_iter iter;
2441 XLogRecPtr local_flush = GetFlushRecPtr(NULL);
2443 *write = InvalidXLogRecPtr;
2444 *flush = InvalidXLogRecPtr;
2446 dlist_foreach_modify(iter, &lsn_mapping)
2448 FlushPosition *pos =
2449 dlist_container(FlushPosition, node, iter.cur);
2451 *write = pos->remote_end;
2453 if (pos->local_end <= local_flush)
2455 *flush = pos->remote_end;
2456 dlist_delete(iter.cur);
2457 pfree(pos);
2459 else
2462 * Don't want to uselessly iterate over the rest of the list which
2463 * could potentially be long. Instead get the last element and
2464 * grab the write position from there.
2466 pos = dlist_tail_element(FlushPosition, node,
2467 &lsn_mapping);
2468 *write = pos->remote_end;
2469 *have_pending_txes = true;
2470 return;
2474 *have_pending_txes = !dlist_is_empty(&lsn_mapping);
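/*
 * Illustrative worked example (added, not part of the original source):
 * suppose lsn_mapping holds two entries, (local_end 0/1500, remote_end
 * 0/A000) and (local_end 0/2500, remote_end 0/B000), and the local flush
 * pointer is 0/2000.  The first entry is locally flushed, so it is removed
 * and 0/A000 can be reported as flushed; the second is not, so 0/B000 is
 * reported only as the write position and *have_pending_txes is set to true.
 */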
2478 * Store current remote/local lsn pair in the tracking list.
2480 static void
2481 store_flush_position(XLogRecPtr remote_lsn)
2483 FlushPosition *flushpos;
2485 /* Need to do this in permanent context */
2486 MemoryContextSwitchTo(ApplyContext);
2488 /* Track commit lsn */
2489 flushpos = (FlushPosition *) palloc(sizeof(FlushPosition));
2490 flushpos->local_end = XactLastCommitEnd;
2491 flushpos->remote_end = remote_lsn;
2493 dlist_push_tail(&lsn_mapping, &flushpos->node);
2494 MemoryContextSwitchTo(ApplyMessageContext);
2498 /* Update statistics of the worker. */
2499 static void
2500 UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
2502 MyLogicalRepWorker->last_lsn = last_lsn;
2503 MyLogicalRepWorker->last_send_time = send_time;
2504 MyLogicalRepWorker->last_recv_time = GetCurrentTimestamp();
2505 if (reply)
2507 MyLogicalRepWorker->reply_lsn = last_lsn;
2508 MyLogicalRepWorker->reply_time = send_time;
2513 * Apply main loop.
2515 static void
2516 LogicalRepApplyLoop(XLogRecPtr last_received)
2518 TimestampTz last_recv_timestamp = GetCurrentTimestamp();
2519 bool ping_sent = false;
2520 TimeLineID tli;
2521 ErrorContextCallback errcallback;
2524 * Init the ApplyMessageContext which we clean up after each replication
2525 * protocol message.
2527 ApplyMessageContext = AllocSetContextCreate(ApplyContext,
2528 "ApplyMessageContext",
2529 ALLOCSET_DEFAULT_SIZES);
2532 * This memory context is used for per-stream data when the streaming mode
2533 * is enabled. This context is reset on each stream stop.
2535 LogicalStreamingContext = AllocSetContextCreate(ApplyContext,
2536 "LogicalStreamingContext",
2537 ALLOCSET_DEFAULT_SIZES);
2539 /* mark as idle, before starting to loop */
2540 pgstat_report_activity(STATE_IDLE, NULL);
2543 * Push apply error context callback. Fields will be filled during
2544 * applying a change.
2546 errcallback.callback = apply_error_callback;
2547 errcallback.previous = error_context_stack;
2548 error_context_stack = &errcallback;
2550 /* This outer loop iterates once per wait. */
2551 for (;;)
2553 pgsocket fd = PGINVALID_SOCKET;
2554 int rc;
2555 int len;
2556 char *buf = NULL;
2557 bool endofstream = false;
2558 long wait_time;
2560 CHECK_FOR_INTERRUPTS();
2562 MemoryContextSwitchTo(ApplyMessageContext);
2564 len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
2566 if (len != 0)
2568 /* Loop to process all available data (without blocking). */
2569 for (;;)
2571 CHECK_FOR_INTERRUPTS();
2573 if (len == 0)
2575 break;
2577 else if (len < 0)
2579 ereport(LOG,
2580 (errmsg("data stream from publisher has ended")));
2581 endofstream = true;
2582 break;
2584 else
2586 int c;
2587 StringInfoData s;
2589 /* Reset timeout. */
2590 last_recv_timestamp = GetCurrentTimestamp();
2591 ping_sent = false;
2593 /* Ensure we are reading the data into our memory context. */
2594 MemoryContextSwitchTo(ApplyMessageContext);
2596 s.data = buf;
2597 s.len = len;
2598 s.cursor = 0;
2599 s.maxlen = -1;
2601 c = pq_getmsgbyte(&s);
2603 if (c == 'w')
2605 XLogRecPtr start_lsn;
2606 XLogRecPtr end_lsn;
2607 TimestampTz send_time;
2609 start_lsn = pq_getmsgint64(&s);
2610 end_lsn = pq_getmsgint64(&s);
2611 send_time = pq_getmsgint64(&s);
2613 if (last_received < start_lsn)
2614 last_received = start_lsn;
2616 if (last_received < end_lsn)
2617 last_received = end_lsn;
2619 UpdateWorkerStats(last_received, send_time, false);
2621 apply_dispatch(&s);
2623 else if (c == 'k')
2625 XLogRecPtr end_lsn;
2626 TimestampTz timestamp;
2627 bool reply_requested;
2629 end_lsn = pq_getmsgint64(&s);
2630 timestamp = pq_getmsgint64(&s);
2631 reply_requested = pq_getmsgbyte(&s);
2633 if (last_received < end_lsn)
2634 last_received = end_lsn;
2636 send_feedback(last_received, reply_requested, false);
2637 UpdateWorkerStats(last_received, timestamp, true);
2639 /* other message types are purposefully ignored */
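/*
 * Descriptive note (not in the original source): 'w' frames carry WAL data
 * (start LSN, end LSN, send time, followed by the logical replication
 * message handed to apply_dispatch), while 'k' frames are keepalives
 * (end LSN, timestamp, reply-requested flag) that may trigger an immediate
 * feedback message.
 */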
2641 MemoryContextReset(ApplyMessageContext);
2644 len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
2648 /* confirm all writes so far */
2649 send_feedback(last_received, false, false);
2651 if (!in_remote_transaction && !in_streamed_transaction)
2654 * If we didn't get any transactions for a while there might be
2655 * unconsumed invalidation messages in the queue; consume them
2656 * now.
2658 AcceptInvalidationMessages();
2659 maybe_reread_subscription();
2661 /* Process any table synchronization changes. */
2662 process_syncing_tables(last_received);
2665 /* Cleanup the memory. */
2666 MemoryContextResetAndDeleteChildren(ApplyMessageContext);
2667 MemoryContextSwitchTo(TopMemoryContext);
2669 /* Check if we need to exit the streaming loop. */
2670 if (endofstream)
2671 break;
2674 * Wait for more data or latch. If we have unflushed transactions,
2675 * wake up after WalWriterDelay to see if they've been flushed yet (in
2676 * which case we should send a feedback message). Otherwise, there's
2677 * no particular urgency about waking up unless we get data or a
2678 * signal.
2680 if (!dlist_is_empty(&lsn_mapping))
2681 wait_time = WalWriterDelay;
2682 else
2683 wait_time = NAPTIME_PER_CYCLE;
2685 rc = WaitLatchOrSocket(MyLatch,
2686 WL_SOCKET_READABLE | WL_LATCH_SET |
2687 WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
2688 fd, wait_time,
2689 WAIT_EVENT_LOGICAL_APPLY_MAIN);
2691 if (rc & WL_LATCH_SET)
2693 ResetLatch(MyLatch);
2694 CHECK_FOR_INTERRUPTS();
2697 if (ConfigReloadPending)
2699 ConfigReloadPending = false;
2700 ProcessConfigFile(PGC_SIGHUP);
2703 if (rc & WL_TIMEOUT)
2706 * We didn't receive anything new. If we haven't heard anything
2707 * from the server for more than wal_receiver_timeout / 2, ping
2708 * the server. Also, if it's been longer than
2709 * wal_receiver_status_interval since the last update we sent,
2710 * send a status update to the primary anyway, to report any
2711 * progress in applying WAL.
2713 bool requestReply = false;
2716 * Check if time since last receive from primary has reached the
2717 * configured limit.
2719 if (wal_receiver_timeout > 0)
2721 TimestampTz now = GetCurrentTimestamp();
2722 TimestampTz timeout;
2724 timeout =
2725 TimestampTzPlusMilliseconds(last_recv_timestamp,
2726 wal_receiver_timeout);
2728 if (now >= timeout)
2729 ereport(ERROR,
2730 (errcode(ERRCODE_CONNECTION_FAILURE),
2731 errmsg("terminating logical replication worker due to timeout")));
2733 /* Check to see if it's time for a ping. */
2734 if (!ping_sent)
2736 timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
2737 (wal_receiver_timeout / 2));
2738 if (now >= timeout)
2740 requestReply = true;
2741 ping_sent = true;
2746 send_feedback(last_received, requestReply, requestReply);
2750 /* Pop the error context stack */
2751 error_context_stack = errcallback.previous;
2753 /* All done */
2754 walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli);
2758 * Send a Standby Status Update message to the server.
2760 * 'recvpos' is the latest LSN we've received data up to; 'force' is set if
2761 * we need to send a response to avoid timeouts.
2763 static void
2764 send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
2766 static StringInfo reply_message = NULL;
2767 static TimestampTz send_time = 0;
2769 static XLogRecPtr last_recvpos = InvalidXLogRecPtr;
2770 static XLogRecPtr last_writepos = InvalidXLogRecPtr;
2771 static XLogRecPtr last_flushpos = InvalidXLogRecPtr;
2773 XLogRecPtr writepos;
2774 XLogRecPtr flushpos;
2775 TimestampTz now;
2776 bool have_pending_txes;
2779 * If the user doesn't want status to be reported to the publisher, be
2780 * sure to exit before doing anything at all.
2782 if (!force && wal_receiver_status_interval <= 0)
2783 return;
2785 /* It's legal to not pass a recvpos */
2786 if (recvpos < last_recvpos)
2787 recvpos = last_recvpos;
2789 get_flush_position(&writepos, &flushpos, &have_pending_txes);
2792 * No outstanding transactions to flush, we can report the latest received
2793 * position. This is important for synchronous replication.
2795 if (!have_pending_txes)
2796 flushpos = writepos = recvpos;
2798 if (writepos < last_writepos)
2799 writepos = last_writepos;
2801 if (flushpos < last_flushpos)
2802 flushpos = last_flushpos;
2804 now = GetCurrentTimestamp();
2806 /* if we've already reported everything we're good */
2807 if (!force &&
2808 writepos == last_writepos &&
2809 flushpos == last_flushpos &&
2810 !TimestampDifferenceExceeds(send_time, now,
2811 wal_receiver_status_interval * 1000))
2812 return;
2813 send_time = now;
2815 if (!reply_message)
2817 MemoryContext oldctx = MemoryContextSwitchTo(ApplyContext);
2819 reply_message = makeStringInfo();
2820 MemoryContextSwitchTo(oldctx);
2822 else
2823 resetStringInfo(reply_message);
2825 pq_sendbyte(reply_message, 'r');
2826 pq_sendint64(reply_message, recvpos); /* write */
2827 pq_sendint64(reply_message, flushpos); /* flush */
2828 pq_sendint64(reply_message, writepos); /* apply */
2829 pq_sendint64(reply_message, now); /* sendTime */
2830 pq_sendbyte(reply_message, requestReply); /* replyRequested */
2832 elog(DEBUG2, "sending feedback (force %d) to recv %X/%X, write %X/%X, flush %X/%X",
2833 force,
2834 LSN_FORMAT_ARGS(recvpos),
2835 LSN_FORMAT_ARGS(writepos),
2836 LSN_FORMAT_ARGS(flushpos));
2838 walrcv_send(LogRepWorkerWalRcvConn,
2839 reply_message->data, reply_message->len);
2841 if (recvpos > last_recvpos)
2842 last_recvpos = recvpos;
2843 if (writepos > last_writepos)
2844 last_writepos = writepos;
2845 if (flushpos > last_flushpos)
2846 last_flushpos = flushpos;
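/*
 * For illustration only (added comment, not in the original source): the
 * reply assembled above is the standard walreceiver status update, laid
 * out as
 *
 *     byte1    'r'
 *     int64    write position   (recvpos)
 *     int64    flush position   (flushpos)
 *     int64    apply position   (writepos)
 *     int64    sendTime         (current timestamp)
 *     byte1    replyRequested   (0 or 1)
 *
 * which matches the pq_sendbyte/pq_sendint64 calls in send_feedback().
 */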
2850 * Reread subscription info if needed. Most changes will cause the worker to exit.
2852 static void
2853 maybe_reread_subscription(void)
2855 MemoryContext oldctx;
2856 Subscription *newsub;
2857 bool started_tx = false;
2859 /* When cache state is valid there is nothing to do here. */
2860 if (MySubscriptionValid)
2861 return;
2863 /* This function might be called inside or outside of a transaction. */
2864 if (!IsTransactionState())
2866 StartTransactionCommand();
2867 started_tx = true;
2870 /* Ensure allocations in permanent context. */
2871 oldctx = MemoryContextSwitchTo(ApplyContext);
2873 newsub = GetSubscription(MyLogicalRepWorker->subid, true);
2876 * Exit if the subscription was removed. This normally should not happen
2877 * as the worker gets killed during DROP SUBSCRIPTION.
2879 if (!newsub)
2881 ereport(LOG,
2882 (errmsg("logical replication apply worker for subscription \"%s\" will "
2883 "stop because the subscription was removed",
2884 MySubscription->name)));
2886 proc_exit(0);
2890 * Exit if the subscription was disabled. This normally should not happen
2891 * as the worker gets killed during ALTER SUBSCRIPTION ... DISABLE.
2893 if (!newsub->enabled)
2895 ereport(LOG,
2896 (errmsg("logical replication apply worker for subscription \"%s\" will "
2897 "stop because the subscription was disabled",
2898 MySubscription->name)));
2900 proc_exit(0);
2903 /* !slotname should never happen when enabled is true. */
2904 Assert(newsub->slotname);
2906 /* two-phase should not be altered */
2907 Assert(newsub->twophasestate == MySubscription->twophasestate);
2910 * Exit if any parameter that affects the remote connection was changed.
2911 * The launcher will start a new worker.
2913 if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0 ||
2914 strcmp(newsub->name, MySubscription->name) != 0 ||
2915 strcmp(newsub->slotname, MySubscription->slotname) != 0 ||
2916 newsub->binary != MySubscription->binary ||
2917 newsub->stream != MySubscription->stream ||
2918 !equal(newsub->publications, MySubscription->publications))
2920 ereport(LOG,
2921 (errmsg("logical replication apply worker for subscription \"%s\" will restart because of a parameter change",
2922 MySubscription->name)));
2924 proc_exit(0);
2927 /* Check for other changes that should never happen too. */
2928 if (newsub->dbid != MySubscription->dbid)
2930 elog(ERROR, "subscription %u changed unexpectedly",
2931 MyLogicalRepWorker->subid);
2934 /* Clean old subscription info and switch to new one. */
2935 FreeSubscription(MySubscription);
2936 MySubscription = newsub;
2938 MemoryContextSwitchTo(oldctx);
2940 /* Change synchronous commit according to the user's wishes */
2941 SetConfigOption("synchronous_commit", MySubscription->synccommit,
2942 PGC_BACKEND, PGC_S_OVERRIDE);
2944 if (started_tx)
2945 CommitTransactionCommand();
2947 MySubscriptionValid = true;
2951 * Callback from subscription syscache invalidation.
2953 static void
2954 subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
2956 MySubscriptionValid = false;
2960 * subxact_info_write
2961 * Store information about subxacts for a toplevel transaction.
2963 * For each subxact we store the offset of its first change in the main file.
2964 * The file is always over-written as a whole.
2966 * XXX We should only store subxacts that were not aborted yet.
2968 static void
2969 subxact_info_write(Oid subid, TransactionId xid)
2971 char path[MAXPGPATH];
2972 Size len;
2973 BufFile *fd;
2975 Assert(TransactionIdIsValid(xid));
2977 /* construct the subxact filename */
2978 subxact_filename(path, subid, xid);
2980 /* Delete the subxacts file, if exists. */
2981 if (subxact_data.nsubxacts == 0)
2983 cleanup_subxact_info();
2984 BufFileDeleteFileSet(MyLogicalRepWorker->stream_fileset, path, true);
2986 return;
2990 * Create the subxact file if it does not already exist, otherwise open the
2991 * existing file.
2993 fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path, O_RDWR,
2994 true);
2995 if (fd == NULL)
2996 fd = BufFileCreateFileSet(MyLogicalRepWorker->stream_fileset, path);
2998 len = sizeof(SubXactInfo) * subxact_data.nsubxacts;
3000 /* Write the subxact count and subxact info */
3001 BufFileWrite(fd, &subxact_data.nsubxacts, sizeof(subxact_data.nsubxacts));
3002 BufFileWrite(fd, subxact_data.subxacts, len);
3004 BufFileClose(fd);
3006 /* free the memory allocated for subxact info */
3007 cleanup_subxact_info();
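/*
 * Illustrative sketch (added, not part of the original source): the subxact
 * file written above is simply the subxact count (nsubxacts) followed
 * immediately by the array of SubXactInfo entries; each entry records a
 * subxact's XID and the fileno/offset of its first change in the changes
 * file, per the SubXactInfo struct defined earlier in this file.
 */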
3011 * subxact_info_read
3012 * Restore information about subxacts of a streamed transaction.
3014 * Read information about subxacts into the structure subxact_data that can be
3015 * used later.
3017 static void
3018 subxact_info_read(Oid subid, TransactionId xid)
3020 char path[MAXPGPATH];
3021 Size len;
3022 BufFile *fd;
3023 MemoryContext oldctx;
3025 Assert(!subxact_data.subxacts);
3026 Assert(subxact_data.nsubxacts == 0);
3027 Assert(subxact_data.nsubxacts_max == 0);
3030 * If the subxact file doesn't exist that means we don't have any subxact
3031 * info.
3033 subxact_filename(path, subid, xid);
3034 fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path, O_RDONLY,
3035 true);
3036 if (fd == NULL)
3037 return;
3039 /* read number of subxact items */
3040 if (BufFileRead(fd, &subxact_data.nsubxacts,
3041 sizeof(subxact_data.nsubxacts)) !=
3042 sizeof(subxact_data.nsubxacts))
3043 ereport(ERROR,
3044 (errcode_for_file_access(),
3045 errmsg("could not read from streaming transaction's subxact file \"%s\": %m",
3046 path)));
3048 len = sizeof(SubXactInfo) * subxact_data.nsubxacts;
3050 /* we keep the maximum as a power of 2 */
3051 subxact_data.nsubxacts_max = 1 << my_log2(subxact_data.nsubxacts);
3054 * Allocate subxact information in the logical streaming context. We need
3055 * this information for the whole duration of the stream so that we can
3056 * add subtransaction info to it. On stream stop we will flush this
3057 * information to the subxact file and reset the logical streaming context.
3059 oldctx = MemoryContextSwitchTo(LogicalStreamingContext);
3060 subxact_data.subxacts = palloc(subxact_data.nsubxacts_max *
3061 sizeof(SubXactInfo));
3062 MemoryContextSwitchTo(oldctx);
3064 if ((len > 0) && ((BufFileRead(fd, subxact_data.subxacts, len)) != len))
3065 ereport(ERROR,
3066 (errcode_for_file_access(),
3067 errmsg("could not read from streaming transaction's subxact file \"%s\": %m",
3068 path)));
3070 BufFileClose(fd);
3074 * subxact_info_add
3075 * Add information about a subxact (offset in the main file).
3077 static void
3078 subxact_info_add(TransactionId xid)
3080 SubXactInfo *subxacts = subxact_data.subxacts;
3081 int64 i;
3083 /* We must have a valid top level stream xid and a stream fd. */
3084 Assert(TransactionIdIsValid(stream_xid));
3085 Assert(stream_fd != NULL);
3088 * If the XID matches the toplevel transaction, we don't want to add it.
3090 if (stream_xid == xid)
3091 return;
3094 * In most cases we're checking the same subxact as we've already seen in
3095 * the last call, so ignore it (we only record each subxact's first change).
3097 if (subxact_data.subxact_last == xid)
3098 return;
3100 /* OK, remember we're processing this XID. */
3101 subxact_data.subxact_last = xid;
3104 * Check if the transaction is already present in the array of subxact. We
3105 * intentionally scan the array from the tail, because we're likely adding
3106 * a change for the most recent subtransactions.
3108 * XXX Can we rely on the subxact XIDs arriving in sorted order? That
3109 * would allow us to use binary search here.
3111 for (i = subxact_data.nsubxacts; i > 0; i--)
3113 /* found, so we're done */
3114 if (subxacts[i - 1].xid == xid)
3115 return;
3118 /* This is a new subxact, so we need to add it to the array. */
3119 if (subxact_data.nsubxacts == 0)
3121 MemoryContext oldctx;
3123 subxact_data.nsubxacts_max = 128;
3126 * Allocate this memory for subxacts in per-stream context, see
3127 * subxact_info_read.
3129 oldctx = MemoryContextSwitchTo(LogicalStreamingContext);
3130 subxacts = palloc(subxact_data.nsubxacts_max * sizeof(SubXactInfo));
3131 MemoryContextSwitchTo(oldctx);
3133 else if (subxact_data.nsubxacts == subxact_data.nsubxacts_max)
3135 subxact_data.nsubxacts_max *= 2;
3136 subxacts = repalloc(subxacts,
3137 subxact_data.nsubxacts_max * sizeof(SubXactInfo));
3140 subxacts[subxact_data.nsubxacts].xid = xid;
3143 * Get the current offset of the stream file and store it as offset of
3144 * this subxact.
3146 BufFileTell(stream_fd,
3147 &subxacts[subxact_data.nsubxacts].fileno,
3148 &subxacts[subxact_data.nsubxacts].offset);
3150 subxact_data.nsubxacts++;
3151 subxact_data.subxacts = subxacts;
3154 /* format filename for file containing the info about subxacts */
3155 static inline void
3156 subxact_filename(char *path, Oid subid, TransactionId xid)
3158 snprintf(path, MAXPGPATH, "%u-%u.subxacts", subid, xid);
3161 /* format filename for file containing serialized changes */
3162 static inline void
3163 changes_filename(char *path, Oid subid, TransactionId xid)
3165 snprintf(path, MAXPGPATH, "%u-%u.changes", subid, xid);
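/*
 * Example (illustrative values only, added comment): for subscription OID
 * 16398 and toplevel transaction XID 740, these helpers produce
 * "16398-740.subxacts" and "16398-740.changes" within the worker's stream
 * FileSet.
 */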
3169 * stream_cleanup_files
3170 * Cleanup files for a subscription / toplevel transaction.
3172 * Remove files with serialized changes and subxact info for a particular
3173 * toplevel transaction. Each subscription has a separate set of files
3174 * for any toplevel transaction.
3176 static void
3177 stream_cleanup_files(Oid subid, TransactionId xid)
3179 char path[MAXPGPATH];
3181 /* Delete the changes file. */
3182 changes_filename(path, subid, xid);
3183 BufFileDeleteFileSet(MyLogicalRepWorker->stream_fileset, path, false);
3185 /* Delete the subxact file, if it exists. */
3186 subxact_filename(path, subid, xid);
3187 BufFileDeleteFileSet(MyLogicalRepWorker->stream_fileset, path, true);
3191 * stream_open_file
3192 * Open a file that we'll use to serialize changes for a toplevel
3193 * transaction.
3195 * Open a file for streamed changes from a toplevel transaction identified
3196 * by stream_xid (global variable). If it's the first chunk of streamed
3197 * changes for this transaction, create the buffile, otherwise open the
3198 * previously created file.
3200 * This can only be called at the beginning of a "streaming" block, i.e.
3201 * between stream_start/stream_stop messages from the upstream.
3203 static void
3204 stream_open_file(Oid subid, TransactionId xid, bool first_segment)
3206 char path[MAXPGPATH];
3207 MemoryContext oldcxt;
3209 Assert(in_streamed_transaction);
3210 Assert(OidIsValid(subid));
3211 Assert(TransactionIdIsValid(xid));
3212 Assert(stream_fd == NULL);
3215 changes_filename(path, subid, xid);
3216 elog(DEBUG1, "opening file \"%s\" for streamed changes", path);
3219 * Create/open the buffiles under the logical streaming context so that we
3220 * have those files until stream stop.
3222 oldcxt = MemoryContextSwitchTo(LogicalStreamingContext);
3225 * If this is the first streamed segment, create the changes file.
3226 * Otherwise, just open the file for writing, in append mode.
3228 if (first_segment)
3229 stream_fd = BufFileCreateFileSet(MyLogicalRepWorker->stream_fileset,
3230 path);
3231 else
3234 * Open the file and seek to the end of the file because we always
3235 * append to the changes file.
3237 stream_fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset,
3238 path, O_RDWR, false);
3239 BufFileSeek(stream_fd, 0, 0, SEEK_END);
3242 MemoryContextSwitchTo(oldcxt);
3246 * stream_close_file
3247 * Close the currently open file with streamed changes.
3249 * This can only be called at the end of a streaming block, i.e. at stream_stop
3250 * message from the upstream.
3252 static void
3253 stream_close_file(void)
3255 Assert(in_streamed_transaction);
3256 Assert(TransactionIdIsValid(stream_xid));
3257 Assert(stream_fd != NULL);
3259 BufFileClose(stream_fd);
3261 stream_xid = InvalidTransactionId;
3262 stream_fd = NULL;
3266 * stream_write_change
3267 * Serialize a change to a file for the current toplevel transaction.
3269 * The change is serialized in a simple format: length (not including the
3270 * length field itself), action code (identifying the message type) and the
3271 * message contents (without the subxact TransactionId value).
3273 static void
3274 stream_write_change(char action, StringInfo s)
3276 int len;
3278 Assert(in_streamed_transaction);
3279 Assert(TransactionIdIsValid(stream_xid));
3280 Assert(stream_fd != NULL);
3282 /* total on-disk size, including the action type character */
3283 len = (s->len - s->cursor) + sizeof(char);
3285 /* first write the size */
3286 BufFileWrite(stream_fd, &len, sizeof(len));
3288 /* then the action */
3289 BufFileWrite(stream_fd, &action, sizeof(action));
3291 /* and finally the remaining part of the buffer (after the XID) */
3292 len = (s->len - s->cursor);
3294 BufFileWrite(stream_fd, &s->data[s->cursor], len);
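/*
 * Illustrative sketch (added, not part of the original source): each record
 * written above therefore looks like
 *
 *     int      len      -- size of the action byte plus the payload
 *     char     action   -- logical replication message type
 *     char[]   payload  -- remaining message contents (XID already consumed)
 *
 * and is read back in the same order when the spooled changes are replayed.
 */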
3298 * Cleanup the memory for subxacts and reset the related variables.
3300 static inline void
3301 cleanup_subxact_info()
3303 if (subxact_data.subxacts)
3304 pfree(subxact_data.subxacts);
3306 subxact_data.subxacts = NULL;
3307 subxact_data.subxact_last = InvalidTransactionId;
3308 subxact_data.nsubxacts = 0;
3309 subxact_data.nsubxacts_max = 0;
3313 * Form the prepared transaction GID for two_phase transactions.
3315 * Return the GID in the supplied buffer.
3317 static void
3318 TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int szgid)
3320 Assert(OidIsValid(subid));
3322 if (!TransactionIdIsValid(xid))
3323 ereport(ERROR,
3324 (errcode(ERRCODE_PROTOCOL_VIOLATION),
3325 errmsg_internal("invalid two-phase transaction ID")));
3327 snprintf(gid, szgid, "pg_gid_%u_%u", subid, xid);
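/*
 * Example (illustrative values only, added comment): subscription OID 16398
 * preparing remote transaction 740 uses the GID "pg_gid_16398_740", which
 * keeps GIDs distinct across subscriptions applying the same remote XID.
 */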
3330 /* Logical Replication Apply worker entry point */
3331 void
3332 ApplyWorkerMain(Datum main_arg)
3334 int worker_slot = DatumGetInt32(main_arg);
3335 MemoryContext cctx = CurrentMemoryContext;
3336 MemoryContext oldctx;
3337 char originname[NAMEDATALEN];
3338 XLogRecPtr origin_startpos;
3339 char *myslotname;
3340 WalRcvStreamOptions options;
3341 int server_version;
3343 /* Attach to slot */
3344 logicalrep_worker_attach(worker_slot);
3346 /* Setup signal handling */
3347 pqsignal(SIGHUP, SignalHandlerForConfigReload);
3348 pqsignal(SIGTERM, die);
3349 BackgroundWorkerUnblockSignals();
3352 * We don't currently need any ResourceOwner in a walreceiver process, but
3353 * if we did, we could call CreateAuxProcessResourceOwner here.
3356 /* Initialise stats to a sanish value */
3357 MyLogicalRepWorker->last_send_time = MyLogicalRepWorker->last_recv_time =
3358 MyLogicalRepWorker->reply_time = GetCurrentTimestamp();
3360 /* Load the libpq-specific functions */
3361 load_file("libpqwalreceiver", false);
3363 /* Run as replica session replication role. */
3364 SetConfigOption("session_replication_role", "replica",
3365 PGC_SUSET, PGC_S_OVERRIDE);
3367 /* Connect to our database. */
3368 BackgroundWorkerInitializeConnectionByOid(MyLogicalRepWorker->dbid,
3369 MyLogicalRepWorker->userid,
3373 * Set always-secure search path, so malicious users can't redirect user
3374 * code (e.g. pg_index.indexprs).
3376 SetConfigOption("search_path", "", PGC_SUSET, PGC_S_OVERRIDE);
3378 /* Load the subscription into persistent memory context. */
3379 ApplyContext = AllocSetContextCreate(TopMemoryContext,
3380 "ApplyContext",
3381 ALLOCSET_DEFAULT_SIZES);
3382 StartTransactionCommand();
3383 oldctx = MemoryContextSwitchTo(ApplyContext);
3385 MySubscription = GetSubscription(MyLogicalRepWorker->subid, true);
3386 if (!MySubscription)
3388 ereport(LOG,
3389 (errmsg("logical replication apply worker for subscription %u will not "
3390 "start because the subscription was removed during startup",
3391 MyLogicalRepWorker->subid)));
3392 proc_exit(0);
3395 MySubscriptionValid = true;
3396 MemoryContextSwitchTo(oldctx);
3398 if (!MySubscription->enabled)
3400 ereport(LOG,
3401 (errmsg("logical replication apply worker for subscription \"%s\" will not "
3402 "start because the subscription was disabled during startup",
3403 MySubscription->name)));
3405 proc_exit(0);
3408 /* Setup synchronous commit according to the user's wishes */
3409 SetConfigOption("synchronous_commit", MySubscription->synccommit,
3410 PGC_BACKEND, PGC_S_OVERRIDE);
3412 /* Keep us informed about subscription changes. */
3413 CacheRegisterSyscacheCallback(SUBSCRIPTIONOID,
3414 subscription_change_cb,
3415 (Datum) 0);
3417 if (am_tablesync_worker())
3418 ereport(LOG,
3419 (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
3420 MySubscription->name, get_rel_name(MyLogicalRepWorker->relid))));
3421 else
3422 ereport(LOG,
3423 (errmsg("logical replication apply worker for subscription \"%s\" has started",
3424 MySubscription->name)));
3426 CommitTransactionCommand();
3428 /* Connect to the origin and start the replication. */
3429 elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
3430 MySubscription->conninfo);
3432 if (am_tablesync_worker())
3434 char *syncslotname;
3436 PG_TRY();
3438 /* This is table synchronization worker, call initial sync. */
3439 syncslotname = LogicalRepSyncTableStart(&origin_startpos);
3441 PG_CATCH();
3443 MemoryContext ecxt = MemoryContextSwitchTo(cctx);
3444 ErrorData *errdata = CopyErrorData();
3447 * Report the table sync error. There is no corresponding message
3448 * type for table synchronization.
3450 pgstat_report_subworker_error(MyLogicalRepWorker->subid,
3451 MyLogicalRepWorker->relid,
3452 MyLogicalRepWorker->relid,
3453 0, /* message type */
3454 InvalidTransactionId,
3455 errdata->message);
3456 MemoryContextSwitchTo(ecxt);
3457 PG_RE_THROW();
3459 PG_END_TRY();
3461 /* allocate slot name in long-lived context */
3462 myslotname = MemoryContextStrdup(ApplyContext, syncslotname);
3464 pfree(syncslotname);
3466 else
3468 /* This is main apply worker */
3469 RepOriginId originid;
3470 TimeLineID startpointTLI;
3471 char *err;
3473 myslotname = MySubscription->slotname;
3476 * This shouldn't happen if the subscription is enabled, but guard
3477 * against DDL bugs or manual catalog changes. (libpqwalreceiver will
3478 * crash if slot is NULL.)
3480 if (!myslotname)
3481 ereport(ERROR,
3482 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
3483 errmsg("subscription has no replication slot set")));
3485 /* Setup replication origin tracking. */
3486 StartTransactionCommand();
3487 snprintf(originname, sizeof(originname), "pg_%u", MySubscription->oid);
3488 originid = replorigin_by_name(originname, true);
3489 if (!OidIsValid(originid))
3490 originid = replorigin_create(originname);
3491 replorigin_session_setup(originid);
3492 replorigin_session_origin = originid;
3493 origin_startpos = replorigin_session_get_progress(false);
3494 CommitTransactionCommand();
3496 LogRepWorkerWalRcvConn = walrcv_connect(MySubscription->conninfo, true,
3497 MySubscription->name, &err);
3498 if (LogRepWorkerWalRcvConn == NULL)
3499 ereport(ERROR,
3500 (errcode(ERRCODE_CONNECTION_FAILURE),
3501 errmsg("could not connect to the publisher: %s", err)));
3504 * We don't really use the output of walrcv_identify_system() for anything,
3505 * but it does some initializations on the upstream so let's still call it.
3507 (void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI);
3511 * Setup callback for syscache so that we know when something changes in
3512 * the subscription relation state.
3514 CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP,
3515 invalidate_syncing_table_states,
3516 (Datum) 0);
3518 /* Build logical replication streaming options. */
3519 options.logical = true;
3520 options.startpoint = origin_startpos;
3521 options.slotname = myslotname;
3523 server_version = walrcv_server_version(LogRepWorkerWalRcvConn);
3524 options.proto.logical.proto_version =
3525 server_version >= 150000 ? LOGICALREP_PROTO_TWOPHASE_VERSION_NUM :
3526 server_version >= 140000 ? LOGICALREP_PROTO_STREAM_VERSION_NUM :
3527 LOGICALREP_PROTO_VERSION_NUM;
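/*
 * Summary of the selection above (descriptive comment, not in the original
 * source): publishers on version 15 or later get the two-phase-capable
 * protocol, version 14 gets the streaming-capable protocol, and older
 * servers fall back to the base protocol version.
 */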
3529 options.proto.logical.publication_names = MySubscription->publications;
3530 options.proto.logical.binary = MySubscription->binary;
3531 options.proto.logical.streaming = MySubscription->stream;
3532 options.proto.logical.twophase = false;
3534 if (!am_tablesync_worker())
3537 * Even when the two_phase mode is requested by the user, it remains
3538 * as the tri-state PENDING until all tablesyncs have reached READY
3539 * state. Only then can it become ENABLED.
3541 * Note: If the subscription has no tables then leave the state as
3542 * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
3543 * work.
3545 if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING &&
3546 AllTablesyncsReady())
3548 /* Start streaming with two_phase enabled */
3549 options.proto.logical.twophase = true;
3550 walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
3552 StartTransactionCommand();
3553 UpdateTwoPhaseState(MySubscription->oid, LOGICALREP_TWOPHASE_STATE_ENABLED);
3554 MySubscription->twophasestate = LOGICALREP_TWOPHASE_STATE_ENABLED;
3555 CommitTransactionCommand();
3557 else
3559 walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
3562 ereport(DEBUG1,
3563 (errmsg("logical replication apply worker for subscription \"%s\" two_phase is %s.",
3564 MySubscription->name,
3565 MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_DISABLED ? "DISABLED" :
3566 MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING ? "PENDING" :
3567 MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED ? "ENABLED" :
3568 "?")));
3570 else
3572 /* Start normal logical streaming replication. */
3573 walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
3576 /* Run the main loop. */
3577 PG_TRY();
3579 LogicalRepApplyLoop(origin_startpos);
3581 PG_CATCH();
3583 /* report the apply error */
3584 if (apply_error_callback_arg.command != 0)
3586 MemoryContext ecxt = MemoryContextSwitchTo(cctx);
3587 ErrorData *errdata = CopyErrorData();
3589 pgstat_report_subworker_error(MyLogicalRepWorker->subid,
3590 MyLogicalRepWorker->relid,
3591 apply_error_callback_arg.rel != NULL
3592 ? apply_error_callback_arg.rel->localreloid
3593 : InvalidOid,
3594 apply_error_callback_arg.command,
3595 apply_error_callback_arg.remote_xid,
3596 errdata->message);
3597 MemoryContextSwitchTo(ecxt);
3600 PG_RE_THROW();
3602 PG_END_TRY();
3604 proc_exit(0);
3608 * Is current process a logical replication worker?
3610 bool
3611 IsLogicalWorker(void)
3613 return MyLogicalRepWorker != NULL;
3616 /* Error callback to give more context info about the change being applied */
3617 static void
3618 apply_error_callback(void *arg)
3620 StringInfoData buf;
3621 ApplyErrorCallbackArg *errarg = &apply_error_callback_arg;
3623 if (apply_error_callback_arg.command == 0)
3624 return;
3626 initStringInfo(&buf);
3627 appendStringInfo(&buf, _("processing remote data during \"%s\""),
3628 logicalrep_message_type(errarg->command));
3630 /* append relation information */
3631 if (errarg->rel)
3633 appendStringInfo(&buf, _(" for replication target relation \"%s.%s\""),
3634 errarg->rel->remoterel.nspname,
3635 errarg->rel->remoterel.relname);
3636 if (errarg->remote_attnum >= 0)
3637 appendStringInfo(&buf, _(" column \"%s\""),
3638 errarg->rel->remoterel.attnames[errarg->remote_attnum]);
3641 /* append transaction information */
3642 if (TransactionIdIsNormal(errarg->remote_xid))
3644 appendStringInfo(&buf, _(" in transaction %u"), errarg->remote_xid);
3645 if (errarg->ts != 0)
3646 appendStringInfo(&buf, _(" at %s"),
3647 timestamptz_to_str(errarg->ts));
3650 errcontext("%s", buf.data);
3651 pfree(buf.data);
3654 /* Set transaction information of apply error callback */
3655 static inline void
3656 set_apply_error_context_xact(TransactionId xid, TimestampTz ts)
3658 apply_error_callback_arg.remote_xid = xid;
3659 apply_error_callback_arg.ts = ts;
3662 /* Reset all information of apply error callback */
3663 static inline void
3664 reset_apply_error_context_info(void)
3666 apply_error_callback_arg.command = 0;
3667 apply_error_callback_arg.rel = NULL;
3668 apply_error_callback_arg.remote_attnum = -1;
3669 set_apply_error_context_xact(InvalidTransactionId, 0);