Update copyright for 2022
[pgsql.git] / src / backend / replication / pgoutput / pgoutput.c
blobaf8d51aee990dbbfdce859278b41bc5f919fed59
/*-------------------------------------------------------------------------
 *
 * pgoutput.c
 *		Logical Replication output plugin
 *
 * Copyright (c) 2012-2022, PostgreSQL Global Development Group
 *
 * IDENTIFICATION
 *		  src/backend/replication/pgoutput/pgoutput.c
 *
 *-------------------------------------------------------------------------
 */
13 #include "postgres.h"
15 #include "access/tupconvert.h"
16 #include "catalog/partition.h"
17 #include "catalog/pg_publication.h"
18 #include "commands/defrem.h"
19 #include "fmgr.h"
20 #include "replication/logical.h"
21 #include "replication/logicalproto.h"
22 #include "replication/origin.h"
23 #include "replication/pgoutput.h"
24 #include "utils/int8.h"
25 #include "utils/inval.h"
26 #include "utils/lsyscache.h"
27 #include "utils/memutils.h"
28 #include "utils/syscache.h"
29 #include "utils/varlena.h"
31 PG_MODULE_MAGIC;
33 extern void _PG_output_plugin_init(OutputPluginCallbacks *cb);
35 static void pgoutput_startup(LogicalDecodingContext *ctx,
36 OutputPluginOptions *opt, bool is_init);
37 static void pgoutput_shutdown(LogicalDecodingContext *ctx);
38 static void pgoutput_begin_txn(LogicalDecodingContext *ctx,
39 ReorderBufferTXN *txn);
40 static void pgoutput_commit_txn(LogicalDecodingContext *ctx,
41 ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
42 static void pgoutput_change(LogicalDecodingContext *ctx,
43 ReorderBufferTXN *txn, Relation rel,
44 ReorderBufferChange *change);
45 static void pgoutput_truncate(LogicalDecodingContext *ctx,
46 ReorderBufferTXN *txn, int nrelations, Relation relations[],
47 ReorderBufferChange *change);
48 static void pgoutput_message(LogicalDecodingContext *ctx,
49 ReorderBufferTXN *txn, XLogRecPtr message_lsn,
50 bool transactional, const char *prefix,
51 Size sz, const char *message);
52 static bool pgoutput_origin_filter(LogicalDecodingContext *ctx,
53 RepOriginId origin_id);
54 static void pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx,
55 ReorderBufferTXN *txn);
56 static void pgoutput_prepare_txn(LogicalDecodingContext *ctx,
57 ReorderBufferTXN *txn, XLogRecPtr prepare_lsn);
58 static void pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx,
59 ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
60 static void pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx,
61 ReorderBufferTXN *txn,
62 XLogRecPtr prepare_end_lsn,
63 TimestampTz prepare_time);
64 static void pgoutput_stream_start(struct LogicalDecodingContext *ctx,
65 ReorderBufferTXN *txn);
66 static void pgoutput_stream_stop(struct LogicalDecodingContext *ctx,
67 ReorderBufferTXN *txn);
68 static void pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
69 ReorderBufferTXN *txn,
70 XLogRecPtr abort_lsn);
71 static void pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
72 ReorderBufferTXN *txn,
73 XLogRecPtr commit_lsn);
74 static void pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
75 ReorderBufferTXN *txn, XLogRecPtr prepare_lsn);
77 static bool publications_valid;
78 static bool in_streaming;
80 static List *LoadPublications(List *pubnames);
81 static void publication_invalidation_cb(Datum arg, int cacheid,
82 uint32 hashvalue);
83 static void send_relation_and_attrs(Relation relation, TransactionId xid,
84 LogicalDecodingContext *ctx);
85 static void send_repl_origin(LogicalDecodingContext *ctx,
86 RepOriginId origin_id, XLogRecPtr origin_lsn,
87 bool send_origin);
90 * Entry in the map used to remember which relation schemas we sent.
92 * The schema_sent flag determines if the current schema record for the
93 * relation (and for its ancestor if publish_as_relid is set) was already
94 * sent to the subscriber (in which case we don't need to send it again).
96 * The schema cache on downstream is however updated only at commit time,
97 * and with streamed transactions the commit order may be different from
98 * the order the transactions are sent in. Also, the (sub) transactions
99 * might get aborted so we need to send the schema for each (sub) transaction
100 * so that we don't lose the schema information on abort. For handling this,
101 * we maintain the list of xids (streamed_txns) for those we have already sent
102 * the schema.
104 * For partitions, 'pubactions' considers not only the table's own
105 * publications, but also those of all of its ancestors.
107 typedef struct RelationSyncEntry
109 Oid relid; /* relation oid */
111 bool schema_sent;
112 List *streamed_txns; /* streamed toplevel transactions with this
113 * schema */
115 bool replicate_valid;
116 PublicationActions pubactions;
119 * OID of the relation to publish changes as. For a partition, this may
120 * be set to one of its ancestors whose schema will be used when
121 * replicating changes, if publish_via_partition_root is set for the
122 * publication.
124 Oid publish_as_relid;
127 * Map used when replicating using an ancestor's schema to convert tuples
128 * from partition's type to the ancestor's; NULL if publish_as_relid is
129 * same as 'relid' or if unnecessary due to partition and the ancestor
130 * having identical TupleDesc.
132 TupleConversionMap *map;
133 } RelationSyncEntry;
135 /* Map used to remember which relation schemas we sent. */
136 static HTAB *RelationSyncCache = NULL;
138 static void init_rel_sync_cache(MemoryContext decoding_context);
139 static void cleanup_rel_sync_cache(TransactionId xid, bool is_commit);
140 static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data, Oid relid);
141 static void rel_sync_cache_relation_cb(Datum arg, Oid relid);
142 static void rel_sync_cache_publication_cb(Datum arg, int cacheid,
143 uint32 hashvalue);
144 static void set_schema_sent_in_streamed_txn(RelationSyncEntry *entry,
145 TransactionId xid);
146 static bool get_schema_sent_in_streamed_txn(RelationSyncEntry *entry,
147 TransactionId xid);
150 * Specify output plugin callbacks
152 void
153 _PG_output_plugin_init(OutputPluginCallbacks *cb)
155 AssertVariableIsOfType(&_PG_output_plugin_init, LogicalOutputPluginInit);
157 cb->startup_cb = pgoutput_startup;
158 cb->begin_cb = pgoutput_begin_txn;
159 cb->change_cb = pgoutput_change;
160 cb->truncate_cb = pgoutput_truncate;
161 cb->message_cb = pgoutput_message;
162 cb->commit_cb = pgoutput_commit_txn;
164 cb->begin_prepare_cb = pgoutput_begin_prepare_txn;
165 cb->prepare_cb = pgoutput_prepare_txn;
166 cb->commit_prepared_cb = pgoutput_commit_prepared_txn;
167 cb->rollback_prepared_cb = pgoutput_rollback_prepared_txn;
168 cb->filter_by_origin_cb = pgoutput_origin_filter;
169 cb->shutdown_cb = pgoutput_shutdown;
171 /* transaction streaming */
172 cb->stream_start_cb = pgoutput_stream_start;
173 cb->stream_stop_cb = pgoutput_stream_stop;
174 cb->stream_abort_cb = pgoutput_stream_abort;
175 cb->stream_commit_cb = pgoutput_stream_commit;
176 cb->stream_change_cb = pgoutput_change;
177 cb->stream_message_cb = pgoutput_message;
178 cb->stream_truncate_cb = pgoutput_truncate;
179 /* transaction streaming - two-phase commit */
180 cb->stream_prepare_cb = pgoutput_stream_prepare_txn;
183 static void
184 parse_output_parameters(List *options, PGOutputData *data)
186 ListCell *lc;
187 bool protocol_version_given = false;
188 bool publication_names_given = false;
189 bool binary_option_given = false;
190 bool messages_option_given = false;
191 bool streaming_given = false;
192 bool two_phase_option_given = false;
194 data->binary = false;
195 data->streaming = false;
196 data->messages = false;
197 data->two_phase = false;
199 foreach(lc, options)
201 DefElem *defel = (DefElem *) lfirst(lc);
203 Assert(defel->arg == NULL || IsA(defel->arg, String));
205 /* Check each param, whether or not we recognize it */
206 if (strcmp(defel->defname, "proto_version") == 0)
208 int64 parsed;
210 if (protocol_version_given)
211 ereport(ERROR,
212 (errcode(ERRCODE_SYNTAX_ERROR),
213 errmsg("conflicting or redundant options")));
214 protocol_version_given = true;
216 if (!scanint8(strVal(defel->arg), true, &parsed))
217 ereport(ERROR,
218 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
219 errmsg("invalid proto_version")));
221 if (parsed > PG_UINT32_MAX || parsed < 0)
222 ereport(ERROR,
223 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
224 errmsg("proto_version \"%s\" out of range",
225 strVal(defel->arg))));
227 data->protocol_version = (uint32) parsed;
229 else if (strcmp(defel->defname, "publication_names") == 0)
231 if (publication_names_given)
232 ereport(ERROR,
233 (errcode(ERRCODE_SYNTAX_ERROR),
234 errmsg("conflicting or redundant options")));
235 publication_names_given = true;
237 if (!SplitIdentifierString(strVal(defel->arg), ',',
238 &data->publication_names))
239 ereport(ERROR,
240 (errcode(ERRCODE_INVALID_NAME),
241 errmsg("invalid publication_names syntax")));
243 else if (strcmp(defel->defname, "binary") == 0)
245 if (binary_option_given)
246 ereport(ERROR,
247 (errcode(ERRCODE_SYNTAX_ERROR),
248 errmsg("conflicting or redundant options")));
249 binary_option_given = true;
251 data->binary = defGetBoolean(defel);
253 else if (strcmp(defel->defname, "messages") == 0)
255 if (messages_option_given)
256 ereport(ERROR,
257 (errcode(ERRCODE_SYNTAX_ERROR),
258 errmsg("conflicting or redundant options")));
259 messages_option_given = true;
261 data->messages = defGetBoolean(defel);
263 else if (strcmp(defel->defname, "streaming") == 0)
265 if (streaming_given)
266 ereport(ERROR,
267 (errcode(ERRCODE_SYNTAX_ERROR),
268 errmsg("conflicting or redundant options")));
269 streaming_given = true;
271 data->streaming = defGetBoolean(defel);
273 else if (strcmp(defel->defname, "two_phase") == 0)
275 if (two_phase_option_given)
276 ereport(ERROR,
277 (errcode(ERRCODE_SYNTAX_ERROR),
278 errmsg("conflicting or redundant options")));
279 two_phase_option_given = true;
281 data->two_phase = defGetBoolean(defel);
283 else
284 elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
289 * Initialize this plugin
291 static void
292 pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
293 bool is_init)
295 PGOutputData *data = palloc0(sizeof(PGOutputData));
297 /* Create our memory context for private allocations. */
298 data->context = AllocSetContextCreate(ctx->context,
299 "logical replication output context",
300 ALLOCSET_DEFAULT_SIZES);
302 ctx->output_plugin_private = data;
304 /* This plugin uses binary protocol. */
305 opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
308 * This is replication start and not slot initialization.
310 * Parse and validate options passed by the client.
312 if (!is_init)
314 /* Parse the params and ERROR if we see any we don't recognize */
315 parse_output_parameters(ctx->output_plugin_options, data);
317 /* Check if we support requested protocol */
318 if (data->protocol_version > LOGICALREP_PROTO_MAX_VERSION_NUM)
319 ereport(ERROR,
320 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
321 errmsg("client sent proto_version=%d but we only support protocol %d or lower",
322 data->protocol_version, LOGICALREP_PROTO_MAX_VERSION_NUM)));
324 if (data->protocol_version < LOGICALREP_PROTO_MIN_VERSION_NUM)
325 ereport(ERROR,
326 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
327 errmsg("client sent proto_version=%d but we only support protocol %d or higher",
328 data->protocol_version, LOGICALREP_PROTO_MIN_VERSION_NUM)));
330 if (list_length(data->publication_names) < 1)
331 ereport(ERROR,
332 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
333 errmsg("publication_names parameter missing")));
336 * Decide whether to enable streaming. It is disabled by default, in
337 * which case we just update the flag in decoding context. Otherwise
338 * we only allow it with sufficient version of the protocol, and when
339 * the output plugin supports it.
341 if (!data->streaming)
342 ctx->streaming = false;
343 else if (data->protocol_version < LOGICALREP_PROTO_STREAM_VERSION_NUM)
344 ereport(ERROR,
345 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
346 errmsg("requested proto_version=%d does not support streaming, need %d or higher",
347 data->protocol_version, LOGICALREP_PROTO_STREAM_VERSION_NUM)));
348 else if (!ctx->streaming)
349 ereport(ERROR,
350 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
351 errmsg("streaming requested, but not supported by output plugin")));
353 /* Also remember we're currently not streaming any transaction. */
354 in_streaming = false;
357 * Here, we just check whether the two-phase option is passed by
358 * plugin and decide whether to enable it at later point of time. It
359 * remains enabled if the previous start-up has done so. But we only
360 * allow the option to be passed in with sufficient version of the
361 * protocol, and when the output plugin supports it.
363 if (!data->two_phase)
364 ctx->twophase_opt_given = false;
365 else if (data->protocol_version < LOGICALREP_PROTO_TWOPHASE_VERSION_NUM)
366 ereport(ERROR,
367 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
368 errmsg("requested proto_version=%d does not support two-phase commit, need %d or higher",
369 data->protocol_version, LOGICALREP_PROTO_TWOPHASE_VERSION_NUM)));
370 else if (!ctx->twophase)
371 ereport(ERROR,
372 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
373 errmsg("two-phase commit requested, but not supported by output plugin")));
374 else
375 ctx->twophase_opt_given = true;
377 /* Init publication state. */
378 data->publications = NIL;
379 publications_valid = false;
380 CacheRegisterSyscacheCallback(PUBLICATIONOID,
381 publication_invalidation_cb,
382 (Datum) 0);
384 /* Initialize relation schema cache. */
385 init_rel_sync_cache(CacheMemoryContext);
387 else
390 * Disable the streaming and prepared transactions during the slot
391 * initialization mode.
393 ctx->streaming = false;
394 ctx->twophase = false;
399 * BEGIN callback
401 static void
402 pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
404 bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
406 OutputPluginPrepareWrite(ctx, !send_replication_origin);
407 logicalrep_write_begin(ctx->out, txn);
409 send_repl_origin(ctx, txn->origin_id, txn->origin_lsn,
410 send_replication_origin);
412 OutputPluginWrite(ctx, true);
416 * COMMIT callback
418 static void
419 pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
420 XLogRecPtr commit_lsn)
422 OutputPluginUpdateProgress(ctx);
424 OutputPluginPrepareWrite(ctx, true);
425 logicalrep_write_commit(ctx->out, txn, commit_lsn);
426 OutputPluginWrite(ctx, true);
430 * BEGIN PREPARE callback
432 static void
433 pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
435 bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
437 OutputPluginPrepareWrite(ctx, !send_replication_origin);
438 logicalrep_write_begin_prepare(ctx->out, txn);
440 send_repl_origin(ctx, txn->origin_id, txn->origin_lsn,
441 send_replication_origin);
443 OutputPluginWrite(ctx, true);
447 * PREPARE callback
449 static void
450 pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
451 XLogRecPtr prepare_lsn)
453 OutputPluginUpdateProgress(ctx);
455 OutputPluginPrepareWrite(ctx, true);
456 logicalrep_write_prepare(ctx->out, txn, prepare_lsn);
457 OutputPluginWrite(ctx, true);
461 * COMMIT PREPARED callback
463 static void
464 pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
465 XLogRecPtr commit_lsn)
467 OutputPluginUpdateProgress(ctx);
469 OutputPluginPrepareWrite(ctx, true);
470 logicalrep_write_commit_prepared(ctx->out, txn, commit_lsn);
471 OutputPluginWrite(ctx, true);
475 * ROLLBACK PREPARED callback
477 static void
478 pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx,
479 ReorderBufferTXN *txn,
480 XLogRecPtr prepare_end_lsn,
481 TimestampTz prepare_time)
483 OutputPluginUpdateProgress(ctx);
485 OutputPluginPrepareWrite(ctx, true);
486 logicalrep_write_rollback_prepared(ctx->out, txn, prepare_end_lsn,
487 prepare_time);
488 OutputPluginWrite(ctx, true);
492 * Write the current schema of the relation and its ancestor (if any) if not
493 * done yet.
495 static void
496 maybe_send_schema(LogicalDecodingContext *ctx,
497 ReorderBufferChange *change,
498 Relation relation, RelationSyncEntry *relentry)
500 bool schema_sent;
501 TransactionId xid = InvalidTransactionId;
502 TransactionId topxid = InvalidTransactionId;
505 * Remember XID of the (sub)transaction for the change. We don't care if
506 * it's top-level transaction or not (we have already sent that XID in
507 * start of the current streaming block).
509 * If we're not in a streaming block, just use InvalidTransactionId and
510 * the write methods will not include it.
512 if (in_streaming)
513 xid = change->txn->xid;
515 if (change->txn->toptxn)
516 topxid = change->txn->toptxn->xid;
517 else
518 topxid = xid;
521 * Do we need to send the schema? We do track streamed transactions
522 * separately, because those may be applied later (and the regular
523 * transactions won't see their effects until then) and in an order that
524 * we don't know at this point.
526 * XXX There is a scope of optimization here. Currently, we always send
527 * the schema first time in a streaming transaction but we can probably
528 * avoid that by checking 'relentry->schema_sent' flag. However, before
529 * doing that we need to study its impact on the case where we have a mix
530 * of streaming and non-streaming transactions.
532 if (in_streaming)
533 schema_sent = get_schema_sent_in_streamed_txn(relentry, topxid);
534 else
535 schema_sent = relentry->schema_sent;
537 /* Nothing to do if we already sent the schema. */
538 if (schema_sent)
539 return;
542 * Nope, so send the schema. If the changes will be published using an
543 * ancestor's schema, not the relation's own, send that ancestor's schema
544 * before sending relation's own (XXX - maybe sending only the former
545 * suffices?). This is also a good place to set the map that will be used
546 * to convert the relation's tuples into the ancestor's format, if needed.
548 if (relentry->publish_as_relid != RelationGetRelid(relation))
550 Relation ancestor = RelationIdGetRelation(relentry->publish_as_relid);
551 TupleDesc indesc = RelationGetDescr(relation);
552 TupleDesc outdesc = RelationGetDescr(ancestor);
553 MemoryContext oldctx;
555 /* Map must live as long as the session does. */
556 oldctx = MemoryContextSwitchTo(CacheMemoryContext);
559 * Make copies of the TupleDescs that will live as long as the map
560 * does before putting into the map.
562 indesc = CreateTupleDescCopy(indesc);
563 outdesc = CreateTupleDescCopy(outdesc);
564 relentry->map = convert_tuples_by_name(indesc, outdesc);
565 if (relentry->map == NULL)
567 /* Map not necessary, so free the TupleDescs too. */
568 FreeTupleDesc(indesc);
569 FreeTupleDesc(outdesc);
572 MemoryContextSwitchTo(oldctx);
573 send_relation_and_attrs(ancestor, xid, ctx);
574 RelationClose(ancestor);
577 send_relation_and_attrs(relation, xid, ctx);
579 if (in_streaming)
580 set_schema_sent_in_streamed_txn(relentry, topxid);
581 else
582 relentry->schema_sent = true;
586 * Sends a relation
588 static void
589 send_relation_and_attrs(Relation relation, TransactionId xid,
590 LogicalDecodingContext *ctx)
592 TupleDesc desc = RelationGetDescr(relation);
593 int i;
596 * Write out type info if needed. We do that only for user-created types.
597 * We use FirstGenbkiObjectId as the cutoff, so that we only consider
598 * objects with hand-assigned OIDs to be "built in", not for instance any
599 * function or type defined in the information_schema. This is important
600 * because only hand-assigned OIDs can be expected to remain stable across
601 * major versions.
603 for (i = 0; i < desc->natts; i++)
605 Form_pg_attribute att = TupleDescAttr(desc, i);
607 if (att->attisdropped || att->attgenerated)
608 continue;
610 if (att->atttypid < FirstGenbkiObjectId)
611 continue;
613 OutputPluginPrepareWrite(ctx, false);
614 logicalrep_write_typ(ctx->out, xid, att->atttypid);
615 OutputPluginWrite(ctx, false);
618 OutputPluginPrepareWrite(ctx, false);
619 logicalrep_write_rel(ctx->out, xid, relation);
620 OutputPluginWrite(ctx, false);
624 * Sends the decoded DML over wire.
626 * This is called both in streaming and non-streaming modes.
628 static void
629 pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
630 Relation relation, ReorderBufferChange *change)
632 PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
633 MemoryContext old;
634 RelationSyncEntry *relentry;
635 TransactionId xid = InvalidTransactionId;
636 Relation ancestor = NULL;
638 if (!is_publishable_relation(relation))
639 return;
642 * Remember the xid for the change in streaming mode. We need to send xid
643 * with each change in the streaming mode so that subscriber can make
644 * their association and on aborts, it can discard the corresponding
645 * changes.
647 if (in_streaming)
648 xid = change->txn->xid;
650 relentry = get_rel_sync_entry(data, RelationGetRelid(relation));
652 /* First check the table filter */
653 switch (change->action)
655 case REORDER_BUFFER_CHANGE_INSERT:
656 if (!relentry->pubactions.pubinsert)
657 return;
658 break;
659 case REORDER_BUFFER_CHANGE_UPDATE:
660 if (!relentry->pubactions.pubupdate)
661 return;
662 break;
663 case REORDER_BUFFER_CHANGE_DELETE:
664 if (!relentry->pubactions.pubdelete)
665 return;
666 break;
667 default:
668 Assert(false);
671 /* Avoid leaking memory by using and resetting our own context */
672 old = MemoryContextSwitchTo(data->context);
674 maybe_send_schema(ctx, change, relation, relentry);
676 /* Send the data */
677 switch (change->action)
679 case REORDER_BUFFER_CHANGE_INSERT:
681 HeapTuple tuple = &change->data.tp.newtuple->tuple;
683 /* Switch relation if publishing via root. */
684 if (relentry->publish_as_relid != RelationGetRelid(relation))
686 Assert(relation->rd_rel->relispartition);
687 ancestor = RelationIdGetRelation(relentry->publish_as_relid);
688 relation = ancestor;
689 /* Convert tuple if needed. */
690 if (relentry->map)
691 tuple = execute_attr_map_tuple(tuple, relentry->map);
694 OutputPluginPrepareWrite(ctx, true);
695 logicalrep_write_insert(ctx->out, xid, relation, tuple,
696 data->binary);
697 OutputPluginWrite(ctx, true);
698 break;
700 case REORDER_BUFFER_CHANGE_UPDATE:
702 HeapTuple oldtuple = change->data.tp.oldtuple ?
703 &change->data.tp.oldtuple->tuple : NULL;
704 HeapTuple newtuple = &change->data.tp.newtuple->tuple;
706 /* Switch relation if publishing via root. */
707 if (relentry->publish_as_relid != RelationGetRelid(relation))
709 Assert(relation->rd_rel->relispartition);
710 ancestor = RelationIdGetRelation(relentry->publish_as_relid);
711 relation = ancestor;
712 /* Convert tuples if needed. */
713 if (relentry->map)
715 if (oldtuple)
716 oldtuple = execute_attr_map_tuple(oldtuple,
717 relentry->map);
718 newtuple = execute_attr_map_tuple(newtuple,
719 relentry->map);
723 OutputPluginPrepareWrite(ctx, true);
724 logicalrep_write_update(ctx->out, xid, relation, oldtuple,
725 newtuple, data->binary);
726 OutputPluginWrite(ctx, true);
727 break;
729 case REORDER_BUFFER_CHANGE_DELETE:
730 if (change->data.tp.oldtuple)
732 HeapTuple oldtuple = &change->data.tp.oldtuple->tuple;
734 /* Switch relation if publishing via root. */
735 if (relentry->publish_as_relid != RelationGetRelid(relation))
737 Assert(relation->rd_rel->relispartition);
738 ancestor = RelationIdGetRelation(relentry->publish_as_relid);
739 relation = ancestor;
740 /* Convert tuple if needed. */
741 if (relentry->map)
742 oldtuple = execute_attr_map_tuple(oldtuple, relentry->map);
745 OutputPluginPrepareWrite(ctx, true);
746 logicalrep_write_delete(ctx->out, xid, relation, oldtuple,
747 data->binary);
748 OutputPluginWrite(ctx, true);
750 else
751 elog(DEBUG1, "didn't send DELETE change because of missing oldtuple");
752 break;
753 default:
754 Assert(false);
757 if (RelationIsValid(ancestor))
759 RelationClose(ancestor);
760 ancestor = NULL;
763 /* Cleanup */
764 MemoryContextSwitchTo(old);
765 MemoryContextReset(data->context);
768 static void
769 pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
770 int nrelations, Relation relations[], ReorderBufferChange *change)
772 PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
773 MemoryContext old;
774 RelationSyncEntry *relentry;
775 int i;
776 int nrelids;
777 Oid *relids;
778 TransactionId xid = InvalidTransactionId;
780 /* Remember the xid for the change in streaming mode. See pgoutput_change. */
781 if (in_streaming)
782 xid = change->txn->xid;
784 old = MemoryContextSwitchTo(data->context);
786 relids = palloc0(nrelations * sizeof(Oid));
787 nrelids = 0;
789 for (i = 0; i < nrelations; i++)
791 Relation relation = relations[i];
792 Oid relid = RelationGetRelid(relation);
794 if (!is_publishable_relation(relation))
795 continue;
797 relentry = get_rel_sync_entry(data, relid);
799 if (!relentry->pubactions.pubtruncate)
800 continue;
803 * Don't send partitions if the publication wants to send only the
804 * root tables through it.
806 if (relation->rd_rel->relispartition &&
807 relentry->publish_as_relid != relid)
808 continue;
810 relids[nrelids++] = relid;
811 maybe_send_schema(ctx, change, relation, relentry);
814 if (nrelids > 0)
816 OutputPluginPrepareWrite(ctx, true);
817 logicalrep_write_truncate(ctx->out,
818 xid,
819 nrelids,
820 relids,
821 change->data.truncate.cascade,
822 change->data.truncate.restart_seqs);
823 OutputPluginWrite(ctx, true);
826 MemoryContextSwitchTo(old);
827 MemoryContextReset(data->context);
830 static void
831 pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
832 XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz,
833 const char *message)
835 PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
836 TransactionId xid = InvalidTransactionId;
838 if (!data->messages)
839 return;
842 * Remember the xid for the message in streaming mode. See
843 * pgoutput_change.
845 if (in_streaming)
846 xid = txn->xid;
848 OutputPluginPrepareWrite(ctx, true);
849 logicalrep_write_message(ctx->out,
850 xid,
851 message_lsn,
852 transactional,
853 prefix,
855 message);
856 OutputPluginWrite(ctx, true);
860 * Currently we always forward.
862 static bool
863 pgoutput_origin_filter(LogicalDecodingContext *ctx,
864 RepOriginId origin_id)
866 return false;
870 * Shutdown the output plugin.
872 * Note, we don't need to clean the data->context as it's child context
873 * of the ctx->context so it will be cleaned up by logical decoding machinery.
875 static void
876 pgoutput_shutdown(LogicalDecodingContext *ctx)
878 if (RelationSyncCache)
880 hash_destroy(RelationSyncCache);
881 RelationSyncCache = NULL;
886 * Load publications from the list of publication names.
888 static List *
889 LoadPublications(List *pubnames)
891 List *result = NIL;
892 ListCell *lc;
894 foreach(lc, pubnames)
896 char *pubname = (char *) lfirst(lc);
897 Publication *pub = GetPublicationByName(pubname, false);
899 result = lappend(result, pub);
902 return result;
906 * Publication cache invalidation callback.
908 static void
909 publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue)
911 publications_valid = false;
914 * Also invalidate per-relation cache so that next time the filtering info
915 * is checked it will be updated with the new publication settings.
917 rel_sync_cache_publication_cb(arg, cacheid, hashvalue);
921 * START STREAM callback
923 static void
924 pgoutput_stream_start(struct LogicalDecodingContext *ctx,
925 ReorderBufferTXN *txn)
927 bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
929 /* we can't nest streaming of transactions */
930 Assert(!in_streaming);
933 * If we already sent the first stream for this transaction then don't
934 * send the origin id in the subsequent streams.
936 if (rbtxn_is_streamed(txn))
937 send_replication_origin = false;
939 OutputPluginPrepareWrite(ctx, !send_replication_origin);
940 logicalrep_write_stream_start(ctx->out, txn->xid, !rbtxn_is_streamed(txn));
942 send_repl_origin(ctx, txn->origin_id, InvalidXLogRecPtr,
943 send_replication_origin);
945 OutputPluginWrite(ctx, true);
947 /* we're streaming a chunk of transaction now */
948 in_streaming = true;
952 * STOP STREAM callback
954 static void
955 pgoutput_stream_stop(struct LogicalDecodingContext *ctx,
956 ReorderBufferTXN *txn)
958 /* we should be streaming a trasanction */
959 Assert(in_streaming);
961 OutputPluginPrepareWrite(ctx, true);
962 logicalrep_write_stream_stop(ctx->out);
963 OutputPluginWrite(ctx, true);
965 /* we've stopped streaming a transaction */
966 in_streaming = false;
970 * Notify downstream to discard the streamed transaction (along with all
971 * it's subtransactions, if it's a toplevel transaction).
973 static void
974 pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
975 ReorderBufferTXN *txn,
976 XLogRecPtr abort_lsn)
978 ReorderBufferTXN *toptxn;
981 * The abort should happen outside streaming block, even for streamed
982 * transactions. The transaction has to be marked as streamed, though.
984 Assert(!in_streaming);
986 /* determine the toplevel transaction */
987 toptxn = (txn->toptxn) ? txn->toptxn : txn;
989 Assert(rbtxn_is_streamed(toptxn));
991 OutputPluginPrepareWrite(ctx, true);
992 logicalrep_write_stream_abort(ctx->out, toptxn->xid, txn->xid);
993 OutputPluginWrite(ctx, true);
995 cleanup_rel_sync_cache(toptxn->xid, false);
999 * Notify downstream to apply the streamed transaction (along with all
1000 * it's subtransactions).
1002 static void
1003 pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
1004 ReorderBufferTXN *txn,
1005 XLogRecPtr commit_lsn)
1008 * The commit should happen outside streaming block, even for streamed
1009 * transactions. The transaction has to be marked as streamed, though.
1011 Assert(!in_streaming);
1012 Assert(rbtxn_is_streamed(txn));
1014 OutputPluginUpdateProgress(ctx);
1016 OutputPluginPrepareWrite(ctx, true);
1017 logicalrep_write_stream_commit(ctx->out, txn, commit_lsn);
1018 OutputPluginWrite(ctx, true);
1020 cleanup_rel_sync_cache(txn->xid, true);
1024 * PREPARE callback (for streaming two-phase commit).
1026 * Notify the downstream to prepare the transaction.
1028 static void
1029 pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
1030 ReorderBufferTXN *txn,
1031 XLogRecPtr prepare_lsn)
1033 Assert(rbtxn_is_streamed(txn));
1035 OutputPluginUpdateProgress(ctx);
1036 OutputPluginPrepareWrite(ctx, true);
1037 logicalrep_write_stream_prepare(ctx->out, txn, prepare_lsn);
1038 OutputPluginWrite(ctx, true);
1042 * Initialize the relation schema sync cache for a decoding session.
1044 * The hash table is destroyed at the end of a decoding session. While
1045 * relcache invalidations still exist and will still be invoked, they
1046 * will just see the null hash table global and take no action.
1048 static void
1049 init_rel_sync_cache(MemoryContext cachectx)
1051 HASHCTL ctl;
1053 if (RelationSyncCache != NULL)
1054 return;
1056 /* Make a new hash table for the cache */
1057 ctl.keysize = sizeof(Oid);
1058 ctl.entrysize = sizeof(RelationSyncEntry);
1059 ctl.hcxt = cachectx;
1061 RelationSyncCache = hash_create("logical replication output relation cache",
1062 128, &ctl,
1063 HASH_ELEM | HASH_CONTEXT | HASH_BLOBS);
1065 Assert(RelationSyncCache != NULL);
1067 CacheRegisterRelcacheCallback(rel_sync_cache_relation_cb, (Datum) 0);
1068 CacheRegisterSyscacheCallback(PUBLICATIONRELMAP,
1069 rel_sync_cache_publication_cb,
1070 (Datum) 0);
1071 CacheRegisterSyscacheCallback(PUBLICATIONNAMESPACEMAP,
1072 rel_sync_cache_publication_cb,
1073 (Datum) 0);
1077 * We expect relatively small number of streamed transactions.
1079 static bool
1080 get_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
1082 ListCell *lc;
1084 foreach(lc, entry->streamed_txns)
1086 if (xid == (uint32) lfirst_int(lc))
1087 return true;
1090 return false;
1094 * Add the xid in the rel sync entry for which we have already sent the schema
1095 * of the relation.
1097 static void
1098 set_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
1100 MemoryContext oldctx;
1102 oldctx = MemoryContextSwitchTo(CacheMemoryContext);
1104 entry->streamed_txns = lappend_int(entry->streamed_txns, xid);
1106 MemoryContextSwitchTo(oldctx);
1110 * Find or create entry in the relation schema cache.
1112 * This looks up publications that the given relation is directly or
1113 * indirectly part of (the latter if it's really the relation's ancestor that
1114 * is part of a publication) and fills up the found entry with the information
1115 * about which operations to publish and whether to use an ancestor's schema
1116 * when publishing.
1118 static RelationSyncEntry *
1119 get_rel_sync_entry(PGOutputData *data, Oid relid)
1121 RelationSyncEntry *entry;
1122 bool found;
1123 MemoryContext oldctx;
1125 Assert(RelationSyncCache != NULL);
1127 /* Find cached relation info, creating if not found */
1128 entry = (RelationSyncEntry *) hash_search(RelationSyncCache,
1129 (void *) &relid,
1130 HASH_ENTER, &found);
1131 Assert(entry != NULL);
1133 /* Not found means schema wasn't sent */
1134 if (!found)
1136 /* immediately make a new entry valid enough to satisfy callbacks */
1137 entry->schema_sent = false;
1138 entry->streamed_txns = NIL;
1139 entry->replicate_valid = false;
1140 entry->pubactions.pubinsert = entry->pubactions.pubupdate =
1141 entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false;
1142 entry->publish_as_relid = InvalidOid;
1143 entry->map = NULL; /* will be set by maybe_send_schema() if
1144 * needed */
1147 /* Validate the entry */
1148 if (!entry->replicate_valid)
1150 Oid schemaId = get_rel_namespace(relid);
1151 List *pubids = GetRelationPublications(relid);
1154 * We don't acquire a lock on the namespace system table as we build
1155 * the cache entry using a historic snapshot and all the later changes
1156 * are absorbed while decoding WAL.
1158 List *schemaPubids = GetSchemaPublications(schemaId);
1159 ListCell *lc;
1160 Oid publish_as_relid = relid;
1161 bool am_partition = get_rel_relispartition(relid);
1162 char relkind = get_rel_relkind(relid);
1164 /* Reload publications if needed before use. */
1165 if (!publications_valid)
1167 oldctx = MemoryContextSwitchTo(CacheMemoryContext);
1168 if (data->publications)
1169 list_free_deep(data->publications);
1171 data->publications = LoadPublications(data->publication_names);
1172 MemoryContextSwitchTo(oldctx);
1173 publications_valid = true;
1177 * Build publication cache. We can't use one provided by relcache as
1178 * relcache considers all publications given relation is in, but here
1179 * we only need to consider ones that the subscriber requested.
1181 foreach(lc, data->publications)
1183 Publication *pub = lfirst(lc);
1184 bool publish = false;
1186 if (pub->alltables)
1188 publish = true;
1189 if (pub->pubviaroot && am_partition)
1190 publish_as_relid = llast_oid(get_partition_ancestors(relid));
1193 if (!publish)
1195 bool ancestor_published = false;
1198 * For a partition, check if any of the ancestors are
1199 * published. If so, note down the topmost ancestor that is
1200 * published via this publication, which will be used as the
1201 * relation via which to publish the partition's changes.
1203 if (am_partition)
1205 List *ancestors = get_partition_ancestors(relid);
1206 ListCell *lc2;
1209 * Find the "topmost" ancestor that is in this
1210 * publication.
1212 foreach(lc2, ancestors)
1214 Oid ancestor = lfirst_oid(lc2);
1216 if (list_member_oid(GetRelationPublications(ancestor),
1217 pub->oid) ||
1218 list_member_oid(GetSchemaPublications(get_rel_namespace(ancestor)),
1219 pub->oid))
1221 ancestor_published = true;
1222 if (pub->pubviaroot)
1223 publish_as_relid = ancestor;
1228 if (list_member_oid(pubids, pub->oid) ||
1229 list_member_oid(schemaPubids, pub->oid) ||
1230 ancestor_published)
1231 publish = true;
1235 * Don't publish changes for partitioned tables, because
1236 * publishing those of its partitions suffices, unless partition
1237 * changes won't be published due to pubviaroot being set.
1239 if (publish &&
1240 (relkind != RELKIND_PARTITIONED_TABLE || pub->pubviaroot))
1242 entry->pubactions.pubinsert |= pub->pubactions.pubinsert;
1243 entry->pubactions.pubupdate |= pub->pubactions.pubupdate;
1244 entry->pubactions.pubdelete |= pub->pubactions.pubdelete;
1245 entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate;
1248 if (entry->pubactions.pubinsert && entry->pubactions.pubupdate &&
1249 entry->pubactions.pubdelete && entry->pubactions.pubtruncate)
1250 break;
1253 list_free(pubids);
1255 entry->publish_as_relid = publish_as_relid;
1256 entry->replicate_valid = true;
1259 return entry;
1263 * Cleanup list of streamed transactions and update the schema_sent flag.
1265 * When a streamed transaction commits or aborts, we need to remove the
1266 * toplevel XID from the schema cache. If the transaction aborted, the
1267 * subscriber will simply throw away the schema records we streamed, so
1268 * we don't need to do anything else.
1270 * If the transaction is committed, the subscriber will update the relation
1271 * cache - so tweak the schema_sent flag accordingly.
1273 static void
1274 cleanup_rel_sync_cache(TransactionId xid, bool is_commit)
1276 HASH_SEQ_STATUS hash_seq;
1277 RelationSyncEntry *entry;
1278 ListCell *lc;
1280 Assert(RelationSyncCache != NULL);
1282 hash_seq_init(&hash_seq, RelationSyncCache);
1283 while ((entry = hash_seq_search(&hash_seq)) != NULL)
1286 * We can set the schema_sent flag for an entry that has committed xid
1287 * in the list as that ensures that the subscriber would have the
1288 * corresponding schema and we don't need to send it unless there is
1289 * any invalidation for that relation.
1291 foreach(lc, entry->streamed_txns)
1293 if (xid == (uint32) lfirst_int(lc))
1295 if (is_commit)
1296 entry->schema_sent = true;
1298 entry->streamed_txns =
1299 foreach_delete_current(entry->streamed_txns, lc);
1300 break;
1307 * Relcache invalidation callback
1309 static void
1310 rel_sync_cache_relation_cb(Datum arg, Oid relid)
1312 RelationSyncEntry *entry;
1315 * We can get here if the plugin was used in SQL interface as the
1316 * RelSchemaSyncCache is destroyed when the decoding finishes, but there
1317 * is no way to unregister the relcache invalidation callback.
1319 if (RelationSyncCache == NULL)
1320 return;
1323 * Nobody keeps pointers to entries in this hash table around outside
1324 * logical decoding callback calls - but invalidation events can come in
1325 * *during* a callback if we access the relcache in the callback. Because
1326 * of that we must mark the cache entry as invalid but not remove it from
1327 * the hash while it could still be referenced, then prune it at a later
1328 * safe point.
1330 * Getting invalidations for relations that aren't in the table is
1331 * entirely normal, since there's no way to unregister for an invalidation
1332 * event. So we don't care if it's found or not.
1334 entry = (RelationSyncEntry *) hash_search(RelationSyncCache, &relid,
1335 HASH_FIND, NULL);
1338 * Reset schema sent status as the relation definition may have changed.
1339 * Also free any objects that depended on the earlier definition.
1341 if (entry != NULL)
1343 entry->schema_sent = false;
1344 list_free(entry->streamed_txns);
1345 entry->streamed_txns = NIL;
1346 if (entry->map)
1349 * Must free the TupleDescs contained in the map explicitly,
1350 * because free_conversion_map() doesn't.
1352 FreeTupleDesc(entry->map->indesc);
1353 FreeTupleDesc(entry->map->outdesc);
1354 free_conversion_map(entry->map);
1356 entry->map = NULL;
1361 * Publication relation/schema map syscache invalidation callback
1363 static void
1364 rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
1366 HASH_SEQ_STATUS status;
1367 RelationSyncEntry *entry;
1370 * We can get here if the plugin was used in SQL interface as the
1371 * RelSchemaSyncCache is destroyed when the decoding finishes, but there
1372 * is no way to unregister the relcache invalidation callback.
1374 if (RelationSyncCache == NULL)
1375 return;
1378 * There is no way to find which entry in our cache the hash belongs to so
1379 * mark the whole cache as invalid.
1381 hash_seq_init(&status, RelationSyncCache);
1382 while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
1384 entry->replicate_valid = false;
1387 * There might be some relations dropped from the publication so we
1388 * don't need to publish the changes for them.
1390 entry->pubactions.pubinsert = false;
1391 entry->pubactions.pubupdate = false;
1392 entry->pubactions.pubdelete = false;
1393 entry->pubactions.pubtruncate = false;
1397 /* Send Replication origin */
1398 static void
1399 send_repl_origin(LogicalDecodingContext *ctx, RepOriginId origin_id,
1400 XLogRecPtr origin_lsn, bool send_origin)
1402 if (send_origin)
1404 char *origin;
1406 /*----------
1407 * XXX: which behaviour do we want here?
1409 * Alternatives:
1410 * - don't send origin message if origin name not found
1411 * (that's what we do now)
1412 * - throw error - that will break replication, not good
1413 * - send some special "unknown" origin
1414 *----------
1416 if (replorigin_by_oid(origin_id, true, &origin))
1418 /* Message boundary */
1419 OutputPluginWrite(ctx, false);
1420 OutputPluginPrepareWrite(ctx, true);
1422 logicalrep_write_origin(ctx->out, origin, origin_lsn);