/*-------------------------------------------------------------------------
 *
 * pgoutput.c
 *		Logical Replication output plugin
 *
 * Copyright (c) 2012-2022, PostgreSQL Global Development Group
 *
 * IDENTIFICATION
 *		  src/backend/replication/pgoutput/pgoutput.c
 *
 *-------------------------------------------------------------------------
 */
15 #include "access/tupconvert.h"
16 #include "catalog/partition.h"
17 #include "catalog/pg_publication.h"
18 #include "commands/defrem.h"
20 #include "replication/logical.h"
21 #include "replication/logicalproto.h"
22 #include "replication/origin.h"
23 #include "replication/pgoutput.h"
24 #include "utils/int8.h"
25 #include "utils/inval.h"
26 #include "utils/lsyscache.h"
27 #include "utils/memutils.h"
28 #include "utils/syscache.h"
29 #include "utils/varlena.h"
33 extern void _PG_output_plugin_init(OutputPluginCallbacks
*cb
);
35 static void pgoutput_startup(LogicalDecodingContext
*ctx
,
36 OutputPluginOptions
*opt
, bool is_init
);
37 static void pgoutput_shutdown(LogicalDecodingContext
*ctx
);
38 static void pgoutput_begin_txn(LogicalDecodingContext
*ctx
,
39 ReorderBufferTXN
*txn
);
40 static void pgoutput_commit_txn(LogicalDecodingContext
*ctx
,
41 ReorderBufferTXN
*txn
, XLogRecPtr commit_lsn
);
42 static void pgoutput_change(LogicalDecodingContext
*ctx
,
43 ReorderBufferTXN
*txn
, Relation rel
,
44 ReorderBufferChange
*change
);
45 static void pgoutput_truncate(LogicalDecodingContext
*ctx
,
46 ReorderBufferTXN
*txn
, int nrelations
, Relation relations
[],
47 ReorderBufferChange
*change
);
48 static void pgoutput_message(LogicalDecodingContext
*ctx
,
49 ReorderBufferTXN
*txn
, XLogRecPtr message_lsn
,
50 bool transactional
, const char *prefix
,
51 Size sz
, const char *message
);
52 static bool pgoutput_origin_filter(LogicalDecodingContext
*ctx
,
53 RepOriginId origin_id
);
54 static void pgoutput_begin_prepare_txn(LogicalDecodingContext
*ctx
,
55 ReorderBufferTXN
*txn
);
56 static void pgoutput_prepare_txn(LogicalDecodingContext
*ctx
,
57 ReorderBufferTXN
*txn
, XLogRecPtr prepare_lsn
);
58 static void pgoutput_commit_prepared_txn(LogicalDecodingContext
*ctx
,
59 ReorderBufferTXN
*txn
, XLogRecPtr commit_lsn
);
60 static void pgoutput_rollback_prepared_txn(LogicalDecodingContext
*ctx
,
61 ReorderBufferTXN
*txn
,
62 XLogRecPtr prepare_end_lsn
,
63 TimestampTz prepare_time
);
64 static void pgoutput_stream_start(struct LogicalDecodingContext
*ctx
,
65 ReorderBufferTXN
*txn
);
66 static void pgoutput_stream_stop(struct LogicalDecodingContext
*ctx
,
67 ReorderBufferTXN
*txn
);
68 static void pgoutput_stream_abort(struct LogicalDecodingContext
*ctx
,
69 ReorderBufferTXN
*txn
,
70 XLogRecPtr abort_lsn
);
71 static void pgoutput_stream_commit(struct LogicalDecodingContext
*ctx
,
72 ReorderBufferTXN
*txn
,
73 XLogRecPtr commit_lsn
);
74 static void pgoutput_stream_prepare_txn(LogicalDecodingContext
*ctx
,
75 ReorderBufferTXN
*txn
, XLogRecPtr prepare_lsn
);
77 static bool publications_valid
;
78 static bool in_streaming
;
80 static List
*LoadPublications(List
*pubnames
);
81 static void publication_invalidation_cb(Datum arg
, int cacheid
,
83 static void send_relation_and_attrs(Relation relation
, TransactionId xid
,
84 LogicalDecodingContext
*ctx
);
85 static void send_repl_origin(LogicalDecodingContext
*ctx
,
86 RepOriginId origin_id
, XLogRecPtr origin_lsn
,
90 * Entry in the map used to remember which relation schemas we sent.
92 * The schema_sent flag determines if the current schema record for the
93 * relation (and for its ancestor if publish_as_relid is set) was already
94 * sent to the subscriber (in which case we don't need to send it again).
96 * The schema cache on downstream is however updated only at commit time,
97 * and with streamed transactions the commit order may be different from
98 * the order the transactions are sent in. Also, the (sub) transactions
99 * might get aborted so we need to send the schema for each (sub) transaction
100 * so that we don't lose the schema information on abort. For handling this,
101 * we maintain the list of xids (streamed_txns) for those we have already sent
104 * For partitions, 'pubactions' considers not only the table's own
105 * publications, but also those of all of its ancestors.
107 typedef struct RelationSyncEntry
109 Oid relid
; /* relation oid */
112 List
*streamed_txns
; /* streamed toplevel transactions with this
115 bool replicate_valid
;
116 PublicationActions pubactions
;
119 * OID of the relation to publish changes as. For a partition, this may
120 * be set to one of its ancestors whose schema will be used when
121 * replicating changes, if publish_via_partition_root is set for the
124 Oid publish_as_relid
;
127 * Map used when replicating using an ancestor's schema to convert tuples
128 * from partition's type to the ancestor's; NULL if publish_as_relid is
129 * same as 'relid' or if unnecessary due to partition and the ancestor
130 * having identical TupleDesc.
132 TupleConversionMap
*map
;
135 /* Map used to remember which relation schemas we sent. */
136 static HTAB
*RelationSyncCache
= NULL
;
138 static void init_rel_sync_cache(MemoryContext decoding_context
);
139 static void cleanup_rel_sync_cache(TransactionId xid
, bool is_commit
);
140 static RelationSyncEntry
*get_rel_sync_entry(PGOutputData
*data
, Oid relid
);
141 static void rel_sync_cache_relation_cb(Datum arg
, Oid relid
);
142 static void rel_sync_cache_publication_cb(Datum arg
, int cacheid
,
144 static void set_schema_sent_in_streamed_txn(RelationSyncEntry
*entry
,
146 static bool get_schema_sent_in_streamed_txn(RelationSyncEntry
*entry
,
150 * Specify output plugin callbacks
153 _PG_output_plugin_init(OutputPluginCallbacks
*cb
)
155 AssertVariableIsOfType(&_PG_output_plugin_init
, LogicalOutputPluginInit
);
157 cb
->startup_cb
= pgoutput_startup
;
158 cb
->begin_cb
= pgoutput_begin_txn
;
159 cb
->change_cb
= pgoutput_change
;
160 cb
->truncate_cb
= pgoutput_truncate
;
161 cb
->message_cb
= pgoutput_message
;
162 cb
->commit_cb
= pgoutput_commit_txn
;
164 cb
->begin_prepare_cb
= pgoutput_begin_prepare_txn
;
165 cb
->prepare_cb
= pgoutput_prepare_txn
;
166 cb
->commit_prepared_cb
= pgoutput_commit_prepared_txn
;
167 cb
->rollback_prepared_cb
= pgoutput_rollback_prepared_txn
;
168 cb
->filter_by_origin_cb
= pgoutput_origin_filter
;
169 cb
->shutdown_cb
= pgoutput_shutdown
;
171 /* transaction streaming */
172 cb
->stream_start_cb
= pgoutput_stream_start
;
173 cb
->stream_stop_cb
= pgoutput_stream_stop
;
174 cb
->stream_abort_cb
= pgoutput_stream_abort
;
175 cb
->stream_commit_cb
= pgoutput_stream_commit
;
176 cb
->stream_change_cb
= pgoutput_change
;
177 cb
->stream_message_cb
= pgoutput_message
;
178 cb
->stream_truncate_cb
= pgoutput_truncate
;
179 /* transaction streaming - two-phase commit */
180 cb
->stream_prepare_cb
= pgoutput_stream_prepare_txn
;
184 parse_output_parameters(List
*options
, PGOutputData
*data
)
187 bool protocol_version_given
= false;
188 bool publication_names_given
= false;
189 bool binary_option_given
= false;
190 bool messages_option_given
= false;
191 bool streaming_given
= false;
192 bool two_phase_option_given
= false;
194 data
->binary
= false;
195 data
->streaming
= false;
196 data
->messages
= false;
197 data
->two_phase
= false;
201 DefElem
*defel
= (DefElem
*) lfirst(lc
);
203 Assert(defel
->arg
== NULL
|| IsA(defel
->arg
, String
));
205 /* Check each param, whether or not we recognize it */
206 if (strcmp(defel
->defname
, "proto_version") == 0)
210 if (protocol_version_given
)
212 (errcode(ERRCODE_SYNTAX_ERROR
),
213 errmsg("conflicting or redundant options")));
214 protocol_version_given
= true;
216 if (!scanint8(strVal(defel
->arg
), true, &parsed
))
218 (errcode(ERRCODE_INVALID_PARAMETER_VALUE
),
219 errmsg("invalid proto_version")));
221 if (parsed
> PG_UINT32_MAX
|| parsed
< 0)
223 (errcode(ERRCODE_INVALID_PARAMETER_VALUE
),
224 errmsg("proto_version \"%s\" out of range",
225 strVal(defel
->arg
))));
227 data
->protocol_version
= (uint32
) parsed
;
229 else if (strcmp(defel
->defname
, "publication_names") == 0)
231 if (publication_names_given
)
233 (errcode(ERRCODE_SYNTAX_ERROR
),
234 errmsg("conflicting or redundant options")));
235 publication_names_given
= true;
237 if (!SplitIdentifierString(strVal(defel
->arg
), ',',
238 &data
->publication_names
))
240 (errcode(ERRCODE_INVALID_NAME
),
241 errmsg("invalid publication_names syntax")));
243 else if (strcmp(defel
->defname
, "binary") == 0)
245 if (binary_option_given
)
247 (errcode(ERRCODE_SYNTAX_ERROR
),
248 errmsg("conflicting or redundant options")));
249 binary_option_given
= true;
251 data
->binary
= defGetBoolean(defel
);
253 else if (strcmp(defel
->defname
, "messages") == 0)
255 if (messages_option_given
)
257 (errcode(ERRCODE_SYNTAX_ERROR
),
258 errmsg("conflicting or redundant options")));
259 messages_option_given
= true;
261 data
->messages
= defGetBoolean(defel
);
263 else if (strcmp(defel
->defname
, "streaming") == 0)
267 (errcode(ERRCODE_SYNTAX_ERROR
),
268 errmsg("conflicting or redundant options")));
269 streaming_given
= true;
271 data
->streaming
= defGetBoolean(defel
);
273 else if (strcmp(defel
->defname
, "two_phase") == 0)
275 if (two_phase_option_given
)
277 (errcode(ERRCODE_SYNTAX_ERROR
),
278 errmsg("conflicting or redundant options")));
279 two_phase_option_given
= true;
281 data
->two_phase
= defGetBoolean(defel
);
284 elog(ERROR
, "unrecognized pgoutput option: %s", defel
->defname
);
289 * Initialize this plugin
292 pgoutput_startup(LogicalDecodingContext
*ctx
, OutputPluginOptions
*opt
,
295 PGOutputData
*data
= palloc0(sizeof(PGOutputData
));
297 /* Create our memory context for private allocations. */
298 data
->context
= AllocSetContextCreate(ctx
->context
,
299 "logical replication output context",
300 ALLOCSET_DEFAULT_SIZES
);
302 ctx
->output_plugin_private
= data
;
304 /* This plugin uses binary protocol. */
305 opt
->output_type
= OUTPUT_PLUGIN_BINARY_OUTPUT
;
308 * This is replication start and not slot initialization.
310 * Parse and validate options passed by the client.
314 /* Parse the params and ERROR if we see any we don't recognize */
315 parse_output_parameters(ctx
->output_plugin_options
, data
);
317 /* Check if we support requested protocol */
318 if (data
->protocol_version
> LOGICALREP_PROTO_MAX_VERSION_NUM
)
320 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED
),
321 errmsg("client sent proto_version=%d but we only support protocol %d or lower",
322 data
->protocol_version
, LOGICALREP_PROTO_MAX_VERSION_NUM
)));
324 if (data
->protocol_version
< LOGICALREP_PROTO_MIN_VERSION_NUM
)
326 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED
),
327 errmsg("client sent proto_version=%d but we only support protocol %d or higher",
328 data
->protocol_version
, LOGICALREP_PROTO_MIN_VERSION_NUM
)));
330 if (list_length(data
->publication_names
) < 1)
332 (errcode(ERRCODE_INVALID_PARAMETER_VALUE
),
333 errmsg("publication_names parameter missing")));
336 * Decide whether to enable streaming. It is disabled by default, in
337 * which case we just update the flag in decoding context. Otherwise
338 * we only allow it with sufficient version of the protocol, and when
339 * the output plugin supports it.
341 if (!data
->streaming
)
342 ctx
->streaming
= false;
343 else if (data
->protocol_version
< LOGICALREP_PROTO_STREAM_VERSION_NUM
)
345 (errcode(ERRCODE_INVALID_PARAMETER_VALUE
),
346 errmsg("requested proto_version=%d does not support streaming, need %d or higher",
347 data
->protocol_version
, LOGICALREP_PROTO_STREAM_VERSION_NUM
)));
348 else if (!ctx
->streaming
)
350 (errcode(ERRCODE_INVALID_PARAMETER_VALUE
),
351 errmsg("streaming requested, but not supported by output plugin")));
353 /* Also remember we're currently not streaming any transaction. */
354 in_streaming
= false;
357 * Here, we just check whether the two-phase option is passed by
358 * plugin and decide whether to enable it at later point of time. It
359 * remains enabled if the previous start-up has done so. But we only
360 * allow the option to be passed in with sufficient version of the
361 * protocol, and when the output plugin supports it.
363 if (!data
->two_phase
)
364 ctx
->twophase_opt_given
= false;
365 else if (data
->protocol_version
< LOGICALREP_PROTO_TWOPHASE_VERSION_NUM
)
367 (errcode(ERRCODE_INVALID_PARAMETER_VALUE
),
368 errmsg("requested proto_version=%d does not support two-phase commit, need %d or higher",
369 data
->protocol_version
, LOGICALREP_PROTO_TWOPHASE_VERSION_NUM
)));
370 else if (!ctx
->twophase
)
372 (errcode(ERRCODE_INVALID_PARAMETER_VALUE
),
373 errmsg("two-phase commit requested, but not supported by output plugin")));
375 ctx
->twophase_opt_given
= true;
377 /* Init publication state. */
378 data
->publications
= NIL
;
379 publications_valid
= false;
380 CacheRegisterSyscacheCallback(PUBLICATIONOID
,
381 publication_invalidation_cb
,
384 /* Initialize relation schema cache. */
385 init_rel_sync_cache(CacheMemoryContext
);
390 * Disable the streaming and prepared transactions during the slot
391 * initialization mode.
393 ctx
->streaming
= false;
394 ctx
->twophase
= false;
402 pgoutput_begin_txn(LogicalDecodingContext
*ctx
, ReorderBufferTXN
*txn
)
404 bool send_replication_origin
= txn
->origin_id
!= InvalidRepOriginId
;
406 OutputPluginPrepareWrite(ctx
, !send_replication_origin
);
407 logicalrep_write_begin(ctx
->out
, txn
);
409 send_repl_origin(ctx
, txn
->origin_id
, txn
->origin_lsn
,
410 send_replication_origin
);
412 OutputPluginWrite(ctx
, true);
419 pgoutput_commit_txn(LogicalDecodingContext
*ctx
, ReorderBufferTXN
*txn
,
420 XLogRecPtr commit_lsn
)
422 OutputPluginUpdateProgress(ctx
);
424 OutputPluginPrepareWrite(ctx
, true);
425 logicalrep_write_commit(ctx
->out
, txn
, commit_lsn
);
426 OutputPluginWrite(ctx
, true);
430 * BEGIN PREPARE callback
433 pgoutput_begin_prepare_txn(LogicalDecodingContext
*ctx
, ReorderBufferTXN
*txn
)
435 bool send_replication_origin
= txn
->origin_id
!= InvalidRepOriginId
;
437 OutputPluginPrepareWrite(ctx
, !send_replication_origin
);
438 logicalrep_write_begin_prepare(ctx
->out
, txn
);
440 send_repl_origin(ctx
, txn
->origin_id
, txn
->origin_lsn
,
441 send_replication_origin
);
443 OutputPluginWrite(ctx
, true);
450 pgoutput_prepare_txn(LogicalDecodingContext
*ctx
, ReorderBufferTXN
*txn
,
451 XLogRecPtr prepare_lsn
)
453 OutputPluginUpdateProgress(ctx
);
455 OutputPluginPrepareWrite(ctx
, true);
456 logicalrep_write_prepare(ctx
->out
, txn
, prepare_lsn
);
457 OutputPluginWrite(ctx
, true);
461 * COMMIT PREPARED callback
464 pgoutput_commit_prepared_txn(LogicalDecodingContext
*ctx
, ReorderBufferTXN
*txn
,
465 XLogRecPtr commit_lsn
)
467 OutputPluginUpdateProgress(ctx
);
469 OutputPluginPrepareWrite(ctx
, true);
470 logicalrep_write_commit_prepared(ctx
->out
, txn
, commit_lsn
);
471 OutputPluginWrite(ctx
, true);
475 * ROLLBACK PREPARED callback
478 pgoutput_rollback_prepared_txn(LogicalDecodingContext
*ctx
,
479 ReorderBufferTXN
*txn
,
480 XLogRecPtr prepare_end_lsn
,
481 TimestampTz prepare_time
)
483 OutputPluginUpdateProgress(ctx
);
485 OutputPluginPrepareWrite(ctx
, true);
486 logicalrep_write_rollback_prepared(ctx
->out
, txn
, prepare_end_lsn
,
488 OutputPluginWrite(ctx
, true);
492 * Write the current schema of the relation and its ancestor (if any) if not
496 maybe_send_schema(LogicalDecodingContext
*ctx
,
497 ReorderBufferChange
*change
,
498 Relation relation
, RelationSyncEntry
*relentry
)
501 TransactionId xid
= InvalidTransactionId
;
502 TransactionId topxid
= InvalidTransactionId
;
505 * Remember XID of the (sub)transaction for the change. We don't care if
506 * it's top-level transaction or not (we have already sent that XID in
507 * start of the current streaming block).
509 * If we're not in a streaming block, just use InvalidTransactionId and
510 * the write methods will not include it.
513 xid
= change
->txn
->xid
;
515 if (change
->txn
->toptxn
)
516 topxid
= change
->txn
->toptxn
->xid
;
521 * Do we need to send the schema? We do track streamed transactions
522 * separately, because those may be applied later (and the regular
523 * transactions won't see their effects until then) and in an order that
524 * we don't know at this point.
526 * XXX There is a scope of optimization here. Currently, we always send
527 * the schema first time in a streaming transaction but we can probably
528 * avoid that by checking 'relentry->schema_sent' flag. However, before
529 * doing that we need to study its impact on the case where we have a mix
530 * of streaming and non-streaming transactions.
533 schema_sent
= get_schema_sent_in_streamed_txn(relentry
, topxid
);
535 schema_sent
= relentry
->schema_sent
;
537 /* Nothing to do if we already sent the schema. */
542 * Nope, so send the schema. If the changes will be published using an
543 * ancestor's schema, not the relation's own, send that ancestor's schema
544 * before sending relation's own (XXX - maybe sending only the former
545 * suffices?). This is also a good place to set the map that will be used
546 * to convert the relation's tuples into the ancestor's format, if needed.
548 if (relentry
->publish_as_relid
!= RelationGetRelid(relation
))
550 Relation ancestor
= RelationIdGetRelation(relentry
->publish_as_relid
);
551 TupleDesc indesc
= RelationGetDescr(relation
);
552 TupleDesc outdesc
= RelationGetDescr(ancestor
);
553 MemoryContext oldctx
;
555 /* Map must live as long as the session does. */
556 oldctx
= MemoryContextSwitchTo(CacheMemoryContext
);
559 * Make copies of the TupleDescs that will live as long as the map
560 * does before putting into the map.
562 indesc
= CreateTupleDescCopy(indesc
);
563 outdesc
= CreateTupleDescCopy(outdesc
);
564 relentry
->map
= convert_tuples_by_name(indesc
, outdesc
);
565 if (relentry
->map
== NULL
)
567 /* Map not necessary, so free the TupleDescs too. */
568 FreeTupleDesc(indesc
);
569 FreeTupleDesc(outdesc
);
572 MemoryContextSwitchTo(oldctx
);
573 send_relation_and_attrs(ancestor
, xid
, ctx
);
574 RelationClose(ancestor
);
577 send_relation_and_attrs(relation
, xid
, ctx
);
580 set_schema_sent_in_streamed_txn(relentry
, topxid
);
582 relentry
->schema_sent
= true;
589 send_relation_and_attrs(Relation relation
, TransactionId xid
,
590 LogicalDecodingContext
*ctx
)
592 TupleDesc desc
= RelationGetDescr(relation
);
596 * Write out type info if needed. We do that only for user-created types.
597 * We use FirstGenbkiObjectId as the cutoff, so that we only consider
598 * objects with hand-assigned OIDs to be "built in", not for instance any
599 * function or type defined in the information_schema. This is important
600 * because only hand-assigned OIDs can be expected to remain stable across
603 for (i
= 0; i
< desc
->natts
; i
++)
605 Form_pg_attribute att
= TupleDescAttr(desc
, i
);
607 if (att
->attisdropped
|| att
->attgenerated
)
610 if (att
->atttypid
< FirstGenbkiObjectId
)
613 OutputPluginPrepareWrite(ctx
, false);
614 logicalrep_write_typ(ctx
->out
, xid
, att
->atttypid
);
615 OutputPluginWrite(ctx
, false);
618 OutputPluginPrepareWrite(ctx
, false);
619 logicalrep_write_rel(ctx
->out
, xid
, relation
);
620 OutputPluginWrite(ctx
, false);
624 * Sends the decoded DML over wire.
626 * This is called both in streaming and non-streaming modes.
629 pgoutput_change(LogicalDecodingContext
*ctx
, ReorderBufferTXN
*txn
,
630 Relation relation
, ReorderBufferChange
*change
)
632 PGOutputData
*data
= (PGOutputData
*) ctx
->output_plugin_private
;
634 RelationSyncEntry
*relentry
;
635 TransactionId xid
= InvalidTransactionId
;
636 Relation ancestor
= NULL
;
638 if (!is_publishable_relation(relation
))
642 * Remember the xid for the change in streaming mode. We need to send xid
643 * with each change in the streaming mode so that subscriber can make
644 * their association and on aborts, it can discard the corresponding
648 xid
= change
->txn
->xid
;
650 relentry
= get_rel_sync_entry(data
, RelationGetRelid(relation
));
652 /* First check the table filter */
653 switch (change
->action
)
655 case REORDER_BUFFER_CHANGE_INSERT
:
656 if (!relentry
->pubactions
.pubinsert
)
659 case REORDER_BUFFER_CHANGE_UPDATE
:
660 if (!relentry
->pubactions
.pubupdate
)
663 case REORDER_BUFFER_CHANGE_DELETE
:
664 if (!relentry
->pubactions
.pubdelete
)
671 /* Avoid leaking memory by using and resetting our own context */
672 old
= MemoryContextSwitchTo(data
->context
);
674 maybe_send_schema(ctx
, change
, relation
, relentry
);
677 switch (change
->action
)
679 case REORDER_BUFFER_CHANGE_INSERT
:
681 HeapTuple tuple
= &change
->data
.tp
.newtuple
->tuple
;
683 /* Switch relation if publishing via root. */
684 if (relentry
->publish_as_relid
!= RelationGetRelid(relation
))
686 Assert(relation
->rd_rel
->relispartition
);
687 ancestor
= RelationIdGetRelation(relentry
->publish_as_relid
);
689 /* Convert tuple if needed. */
691 tuple
= execute_attr_map_tuple(tuple
, relentry
->map
);
694 OutputPluginPrepareWrite(ctx
, true);
695 logicalrep_write_insert(ctx
->out
, xid
, relation
, tuple
,
697 OutputPluginWrite(ctx
, true);
700 case REORDER_BUFFER_CHANGE_UPDATE
:
702 HeapTuple oldtuple
= change
->data
.tp
.oldtuple
?
703 &change
->data
.tp
.oldtuple
->tuple
: NULL
;
704 HeapTuple newtuple
= &change
->data
.tp
.newtuple
->tuple
;
706 /* Switch relation if publishing via root. */
707 if (relentry
->publish_as_relid
!= RelationGetRelid(relation
))
709 Assert(relation
->rd_rel
->relispartition
);
710 ancestor
= RelationIdGetRelation(relentry
->publish_as_relid
);
712 /* Convert tuples if needed. */
716 oldtuple
= execute_attr_map_tuple(oldtuple
,
718 newtuple
= execute_attr_map_tuple(newtuple
,
723 OutputPluginPrepareWrite(ctx
, true);
724 logicalrep_write_update(ctx
->out
, xid
, relation
, oldtuple
,
725 newtuple
, data
->binary
);
726 OutputPluginWrite(ctx
, true);
729 case REORDER_BUFFER_CHANGE_DELETE
:
730 if (change
->data
.tp
.oldtuple
)
732 HeapTuple oldtuple
= &change
->data
.tp
.oldtuple
->tuple
;
734 /* Switch relation if publishing via root. */
735 if (relentry
->publish_as_relid
!= RelationGetRelid(relation
))
737 Assert(relation
->rd_rel
->relispartition
);
738 ancestor
= RelationIdGetRelation(relentry
->publish_as_relid
);
740 /* Convert tuple if needed. */
742 oldtuple
= execute_attr_map_tuple(oldtuple
, relentry
->map
);
745 OutputPluginPrepareWrite(ctx
, true);
746 logicalrep_write_delete(ctx
->out
, xid
, relation
, oldtuple
,
748 OutputPluginWrite(ctx
, true);
751 elog(DEBUG1
, "didn't send DELETE change because of missing oldtuple");
757 if (RelationIsValid(ancestor
))
759 RelationClose(ancestor
);
764 MemoryContextSwitchTo(old
);
765 MemoryContextReset(data
->context
);
769 pgoutput_truncate(LogicalDecodingContext
*ctx
, ReorderBufferTXN
*txn
,
770 int nrelations
, Relation relations
[], ReorderBufferChange
*change
)
772 PGOutputData
*data
= (PGOutputData
*) ctx
->output_plugin_private
;
774 RelationSyncEntry
*relentry
;
778 TransactionId xid
= InvalidTransactionId
;
780 /* Remember the xid for the change in streaming mode. See pgoutput_change. */
782 xid
= change
->txn
->xid
;
784 old
= MemoryContextSwitchTo(data
->context
);
786 relids
= palloc0(nrelations
* sizeof(Oid
));
789 for (i
= 0; i
< nrelations
; i
++)
791 Relation relation
= relations
[i
];
792 Oid relid
= RelationGetRelid(relation
);
794 if (!is_publishable_relation(relation
))
797 relentry
= get_rel_sync_entry(data
, relid
);
799 if (!relentry
->pubactions
.pubtruncate
)
803 * Don't send partitions if the publication wants to send only the
804 * root tables through it.
806 if (relation
->rd_rel
->relispartition
&&
807 relentry
->publish_as_relid
!= relid
)
810 relids
[nrelids
++] = relid
;
811 maybe_send_schema(ctx
, change
, relation
, relentry
);
816 OutputPluginPrepareWrite(ctx
, true);
817 logicalrep_write_truncate(ctx
->out
,
821 change
->data
.truncate
.cascade
,
822 change
->data
.truncate
.restart_seqs
);
823 OutputPluginWrite(ctx
, true);
826 MemoryContextSwitchTo(old
);
827 MemoryContextReset(data
->context
);
831 pgoutput_message(LogicalDecodingContext
*ctx
, ReorderBufferTXN
*txn
,
832 XLogRecPtr message_lsn
, bool transactional
, const char *prefix
, Size sz
,
835 PGOutputData
*data
= (PGOutputData
*) ctx
->output_plugin_private
;
836 TransactionId xid
= InvalidTransactionId
;
842 * Remember the xid for the message in streaming mode. See
848 OutputPluginPrepareWrite(ctx
, true);
849 logicalrep_write_message(ctx
->out
,
856 OutputPluginWrite(ctx
, true);
860 * Currently we always forward.
863 pgoutput_origin_filter(LogicalDecodingContext
*ctx
,
864 RepOriginId origin_id
)
870 * Shutdown the output plugin.
872 * Note, we don't need to clean the data->context as it's child context
873 * of the ctx->context so it will be cleaned up by logical decoding machinery.
876 pgoutput_shutdown(LogicalDecodingContext
*ctx
)
878 if (RelationSyncCache
)
880 hash_destroy(RelationSyncCache
);
881 RelationSyncCache
= NULL
;
886 * Load publications from the list of publication names.
889 LoadPublications(List
*pubnames
)
894 foreach(lc
, pubnames
)
896 char *pubname
= (char *) lfirst(lc
);
897 Publication
*pub
= GetPublicationByName(pubname
, false);
899 result
= lappend(result
, pub
);
906 * Publication cache invalidation callback.
909 publication_invalidation_cb(Datum arg
, int cacheid
, uint32 hashvalue
)
911 publications_valid
= false;
914 * Also invalidate per-relation cache so that next time the filtering info
915 * is checked it will be updated with the new publication settings.
917 rel_sync_cache_publication_cb(arg
, cacheid
, hashvalue
);
921 * START STREAM callback
924 pgoutput_stream_start(struct LogicalDecodingContext
*ctx
,
925 ReorderBufferTXN
*txn
)
927 bool send_replication_origin
= txn
->origin_id
!= InvalidRepOriginId
;
929 /* we can't nest streaming of transactions */
930 Assert(!in_streaming
);
933 * If we already sent the first stream for this transaction then don't
934 * send the origin id in the subsequent streams.
936 if (rbtxn_is_streamed(txn
))
937 send_replication_origin
= false;
939 OutputPluginPrepareWrite(ctx
, !send_replication_origin
);
940 logicalrep_write_stream_start(ctx
->out
, txn
->xid
, !rbtxn_is_streamed(txn
));
942 send_repl_origin(ctx
, txn
->origin_id
, InvalidXLogRecPtr
,
943 send_replication_origin
);
945 OutputPluginWrite(ctx
, true);
947 /* we're streaming a chunk of transaction now */
952 * STOP STREAM callback
955 pgoutput_stream_stop(struct LogicalDecodingContext
*ctx
,
956 ReorderBufferTXN
*txn
)
958 /* we should be streaming a trasanction */
959 Assert(in_streaming
);
961 OutputPluginPrepareWrite(ctx
, true);
962 logicalrep_write_stream_stop(ctx
->out
);
963 OutputPluginWrite(ctx
, true);
965 /* we've stopped streaming a transaction */
966 in_streaming
= false;
970 * Notify downstream to discard the streamed transaction (along with all
971 * it's subtransactions, if it's a toplevel transaction).
974 pgoutput_stream_abort(struct LogicalDecodingContext
*ctx
,
975 ReorderBufferTXN
*txn
,
976 XLogRecPtr abort_lsn
)
978 ReorderBufferTXN
*toptxn
;
981 * The abort should happen outside streaming block, even for streamed
982 * transactions. The transaction has to be marked as streamed, though.
984 Assert(!in_streaming
);
986 /* determine the toplevel transaction */
987 toptxn
= (txn
->toptxn
) ? txn
->toptxn
: txn
;
989 Assert(rbtxn_is_streamed(toptxn
));
991 OutputPluginPrepareWrite(ctx
, true);
992 logicalrep_write_stream_abort(ctx
->out
, toptxn
->xid
, txn
->xid
);
993 OutputPluginWrite(ctx
, true);
995 cleanup_rel_sync_cache(toptxn
->xid
, false);
999 * Notify downstream to apply the streamed transaction (along with all
1000 * it's subtransactions).
1003 pgoutput_stream_commit(struct LogicalDecodingContext
*ctx
,
1004 ReorderBufferTXN
*txn
,
1005 XLogRecPtr commit_lsn
)
1008 * The commit should happen outside streaming block, even for streamed
1009 * transactions. The transaction has to be marked as streamed, though.
1011 Assert(!in_streaming
);
1012 Assert(rbtxn_is_streamed(txn
));
1014 OutputPluginUpdateProgress(ctx
);
1016 OutputPluginPrepareWrite(ctx
, true);
1017 logicalrep_write_stream_commit(ctx
->out
, txn
, commit_lsn
);
1018 OutputPluginWrite(ctx
, true);
1020 cleanup_rel_sync_cache(txn
->xid
, true);
1024 * PREPARE callback (for streaming two-phase commit).
1026 * Notify the downstream to prepare the transaction.
1029 pgoutput_stream_prepare_txn(LogicalDecodingContext
*ctx
,
1030 ReorderBufferTXN
*txn
,
1031 XLogRecPtr prepare_lsn
)
1033 Assert(rbtxn_is_streamed(txn
));
1035 OutputPluginUpdateProgress(ctx
);
1036 OutputPluginPrepareWrite(ctx
, true);
1037 logicalrep_write_stream_prepare(ctx
->out
, txn
, prepare_lsn
);
1038 OutputPluginWrite(ctx
, true);
1042 * Initialize the relation schema sync cache for a decoding session.
1044 * The hash table is destroyed at the end of a decoding session. While
1045 * relcache invalidations still exist and will still be invoked, they
1046 * will just see the null hash table global and take no action.
1049 init_rel_sync_cache(MemoryContext cachectx
)
1053 if (RelationSyncCache
!= NULL
)
1056 /* Make a new hash table for the cache */
1057 ctl
.keysize
= sizeof(Oid
);
1058 ctl
.entrysize
= sizeof(RelationSyncEntry
);
1059 ctl
.hcxt
= cachectx
;
1061 RelationSyncCache
= hash_create("logical replication output relation cache",
1063 HASH_ELEM
| HASH_CONTEXT
| HASH_BLOBS
);
1065 Assert(RelationSyncCache
!= NULL
);
1067 CacheRegisterRelcacheCallback(rel_sync_cache_relation_cb
, (Datum
) 0);
1068 CacheRegisterSyscacheCallback(PUBLICATIONRELMAP
,
1069 rel_sync_cache_publication_cb
,
1071 CacheRegisterSyscacheCallback(PUBLICATIONNAMESPACEMAP
,
1072 rel_sync_cache_publication_cb
,
1077 * We expect relatively small number of streamed transactions.
1080 get_schema_sent_in_streamed_txn(RelationSyncEntry
*entry
, TransactionId xid
)
1084 foreach(lc
, entry
->streamed_txns
)
1086 if (xid
== (uint32
) lfirst_int(lc
))
1094 * Add the xid in the rel sync entry for which we have already sent the schema
1098 set_schema_sent_in_streamed_txn(RelationSyncEntry
*entry
, TransactionId xid
)
1100 MemoryContext oldctx
;
1102 oldctx
= MemoryContextSwitchTo(CacheMemoryContext
);
1104 entry
->streamed_txns
= lappend_int(entry
->streamed_txns
, xid
);
1106 MemoryContextSwitchTo(oldctx
);
1110 * Find or create entry in the relation schema cache.
1112 * This looks up publications that the given relation is directly or
1113 * indirectly part of (the latter if it's really the relation's ancestor that
1114 * is part of a publication) and fills up the found entry with the information
1115 * about which operations to publish and whether to use an ancestor's schema
1118 static RelationSyncEntry
*
1119 get_rel_sync_entry(PGOutputData
*data
, Oid relid
)
1121 RelationSyncEntry
*entry
;
1123 MemoryContext oldctx
;
1125 Assert(RelationSyncCache
!= NULL
);
1127 /* Find cached relation info, creating if not found */
1128 entry
= (RelationSyncEntry
*) hash_search(RelationSyncCache
,
1130 HASH_ENTER
, &found
);
1131 Assert(entry
!= NULL
);
1133 /* Not found means schema wasn't sent */
1136 /* immediately make a new entry valid enough to satisfy callbacks */
1137 entry
->schema_sent
= false;
1138 entry
->streamed_txns
= NIL
;
1139 entry
->replicate_valid
= false;
1140 entry
->pubactions
.pubinsert
= entry
->pubactions
.pubupdate
=
1141 entry
->pubactions
.pubdelete
= entry
->pubactions
.pubtruncate
= false;
1142 entry
->publish_as_relid
= InvalidOid
;
1143 entry
->map
= NULL
; /* will be set by maybe_send_schema() if
1147 /* Validate the entry */
1148 if (!entry
->replicate_valid
)
1150 Oid schemaId
= get_rel_namespace(relid
);
1151 List
*pubids
= GetRelationPublications(relid
);
1154 * We don't acquire a lock on the namespace system table as we build
1155 * the cache entry using a historic snapshot and all the later changes
1156 * are absorbed while decoding WAL.
1158 List
*schemaPubids
= GetSchemaPublications(schemaId
);
1160 Oid publish_as_relid
= relid
;
1161 bool am_partition
= get_rel_relispartition(relid
);
1162 char relkind
= get_rel_relkind(relid
);
1164 /* Reload publications if needed before use. */
1165 if (!publications_valid
)
1167 oldctx
= MemoryContextSwitchTo(CacheMemoryContext
);
1168 if (data
->publications
)
1169 list_free_deep(data
->publications
);
1171 data
->publications
= LoadPublications(data
->publication_names
);
1172 MemoryContextSwitchTo(oldctx
);
1173 publications_valid
= true;
1177 * Build publication cache. We can't use one provided by relcache as
1178 * relcache considers all publications given relation is in, but here
1179 * we only need to consider ones that the subscriber requested.
1181 foreach(lc
, data
->publications
)
1183 Publication
*pub
= lfirst(lc
);
1184 bool publish
= false;
1189 if (pub
->pubviaroot
&& am_partition
)
1190 publish_as_relid
= llast_oid(get_partition_ancestors(relid
));
1195 bool ancestor_published
= false;
1198 * For a partition, check if any of the ancestors are
1199 * published. If so, note down the topmost ancestor that is
1200 * published via this publication, which will be used as the
1201 * relation via which to publish the partition's changes.
1205 List
*ancestors
= get_partition_ancestors(relid
);
1209 * Find the "topmost" ancestor that is in this
1212 foreach(lc2
, ancestors
)
1214 Oid ancestor
= lfirst_oid(lc2
);
1216 if (list_member_oid(GetRelationPublications(ancestor
),
1218 list_member_oid(GetSchemaPublications(get_rel_namespace(ancestor
)),
1221 ancestor_published
= true;
1222 if (pub
->pubviaroot
)
1223 publish_as_relid
= ancestor
;
1228 if (list_member_oid(pubids
, pub
->oid
) ||
1229 list_member_oid(schemaPubids
, pub
->oid
) ||
1235 * Don't publish changes for partitioned tables, because
1236 * publishing those of its partitions suffices, unless partition
1237 * changes won't be published due to pubviaroot being set.
1240 (relkind
!= RELKIND_PARTITIONED_TABLE
|| pub
->pubviaroot
))
1242 entry
->pubactions
.pubinsert
|= pub
->pubactions
.pubinsert
;
1243 entry
->pubactions
.pubupdate
|= pub
->pubactions
.pubupdate
;
1244 entry
->pubactions
.pubdelete
|= pub
->pubactions
.pubdelete
;
1245 entry
->pubactions
.pubtruncate
|= pub
->pubactions
.pubtruncate
;
1248 if (entry
->pubactions
.pubinsert
&& entry
->pubactions
.pubupdate
&&
1249 entry
->pubactions
.pubdelete
&& entry
->pubactions
.pubtruncate
)
1255 entry
->publish_as_relid
= publish_as_relid
;
1256 entry
->replicate_valid
= true;
1263 * Cleanup list of streamed transactions and update the schema_sent flag.
1265 * When a streamed transaction commits or aborts, we need to remove the
1266 * toplevel XID from the schema cache. If the transaction aborted, the
1267 * subscriber will simply throw away the schema records we streamed, so
1268 * we don't need to do anything else.
1270 * If the transaction is committed, the subscriber will update the relation
1271 * cache - so tweak the schema_sent flag accordingly.
1274 cleanup_rel_sync_cache(TransactionId xid
, bool is_commit
)
1276 HASH_SEQ_STATUS hash_seq
;
1277 RelationSyncEntry
*entry
;
1280 Assert(RelationSyncCache
!= NULL
);
1282 hash_seq_init(&hash_seq
, RelationSyncCache
);
1283 while ((entry
= hash_seq_search(&hash_seq
)) != NULL
)
1286 * We can set the schema_sent flag for an entry that has committed xid
1287 * in the list as that ensures that the subscriber would have the
1288 * corresponding schema and we don't need to send it unless there is
1289 * any invalidation for that relation.
1291 foreach(lc
, entry
->streamed_txns
)
1293 if (xid
== (uint32
) lfirst_int(lc
))
1296 entry
->schema_sent
= true;
1298 entry
->streamed_txns
=
1299 foreach_delete_current(entry
->streamed_txns
, lc
);
1307 * Relcache invalidation callback
1310 rel_sync_cache_relation_cb(Datum arg
, Oid relid
)
1312 RelationSyncEntry
*entry
;
1315 * We can get here if the plugin was used in SQL interface as the
1316 * RelSchemaSyncCache is destroyed when the decoding finishes, but there
1317 * is no way to unregister the relcache invalidation callback.
1319 if (RelationSyncCache
== NULL
)
1323 * Nobody keeps pointers to entries in this hash table around outside
1324 * logical decoding callback calls - but invalidation events can come in
1325 * *during* a callback if we access the relcache in the callback. Because
1326 * of that we must mark the cache entry as invalid but not remove it from
1327 * the hash while it could still be referenced, then prune it at a later
1330 * Getting invalidations for relations that aren't in the table is
1331 * entirely normal, since there's no way to unregister for an invalidation
1332 * event. So we don't care if it's found or not.
1334 entry
= (RelationSyncEntry
*) hash_search(RelationSyncCache
, &relid
,
1338 * Reset schema sent status as the relation definition may have changed.
1339 * Also free any objects that depended on the earlier definition.
1343 entry
->schema_sent
= false;
1344 list_free(entry
->streamed_txns
);
1345 entry
->streamed_txns
= NIL
;
1349 * Must free the TupleDescs contained in the map explicitly,
1350 * because free_conversion_map() doesn't.
1352 FreeTupleDesc(entry
->map
->indesc
);
1353 FreeTupleDesc(entry
->map
->outdesc
);
1354 free_conversion_map(entry
->map
);
1361 * Publication relation/schema map syscache invalidation callback
1364 rel_sync_cache_publication_cb(Datum arg
, int cacheid
, uint32 hashvalue
)
1366 HASH_SEQ_STATUS status
;
1367 RelationSyncEntry
*entry
;
1370 * We can get here if the plugin was used in SQL interface as the
1371 * RelSchemaSyncCache is destroyed when the decoding finishes, but there
1372 * is no way to unregister the relcache invalidation callback.
1374 if (RelationSyncCache
== NULL
)
1378 * There is no way to find which entry in our cache the hash belongs to so
1379 * mark the whole cache as invalid.
1381 hash_seq_init(&status
, RelationSyncCache
);
1382 while ((entry
= (RelationSyncEntry
*) hash_seq_search(&status
)) != NULL
)
1384 entry
->replicate_valid
= false;
1387 * There might be some relations dropped from the publication so we
1388 * don't need to publish the changes for them.
1390 entry
->pubactions
.pubinsert
= false;
1391 entry
->pubactions
.pubupdate
= false;
1392 entry
->pubactions
.pubdelete
= false;
1393 entry
->pubactions
.pubtruncate
= false;
1397 /* Send Replication origin */
1399 send_repl_origin(LogicalDecodingContext
*ctx
, RepOriginId origin_id
,
1400 XLogRecPtr origin_lsn
, bool send_origin
)
1407 * XXX: which behaviour do we want here?
1410 * - don't send origin message if origin name not found
1411 * (that's what we do now)
1412 * - throw error - that will break replication, not good
1413 * - send some special "unknown" origin
1416 if (replorigin_by_oid(origin_id
, true, &origin
))
1418 /* Message boundary */
1419 OutputPluginWrite(ctx
, false);
1420 OutputPluginPrepareWrite(ctx
, true);
1422 logicalrep_write_origin(ctx
->out
, origin
, origin_lsn
);