Disable autovacuum for tables in stats import tests.
[pgsql.git] / src / backend / replication / pgoutput / pgoutput.c
blob00e7024563e6c8b439c25cb2ead62a07b41dcf78
/*-------------------------------------------------------------------------
 *
 * pgoutput.c
 *		Logical Replication output plugin
 *
 * Copyright (c) 2012-2024, PostgreSQL Global Development Group
 *
 * IDENTIFICATION
 *		  src/backend/replication/pgoutput/pgoutput.c
 *
 *-------------------------------------------------------------------------
 */
13 #include "postgres.h"
15 #include "access/tupconvert.h"
16 #include "catalog/partition.h"
17 #include "catalog/pg_publication.h"
18 #include "catalog/pg_publication_rel.h"
19 #include "catalog/pg_subscription.h"
20 #include "commands/defrem.h"
21 #include "commands/subscriptioncmds.h"
22 #include "executor/executor.h"
23 #include "fmgr.h"
24 #include "nodes/makefuncs.h"
25 #include "parser/parse_relation.h"
26 #include "replication/logical.h"
27 #include "replication/logicalproto.h"
28 #include "replication/origin.h"
29 #include "replication/pgoutput.h"
30 #include "utils/builtins.h"
31 #include "utils/inval.h"
32 #include "utils/lsyscache.h"
33 #include "utils/memutils.h"
34 #include "utils/rel.h"
35 #include "utils/syscache.h"
36 #include "utils/varlena.h"
38 PG_MODULE_MAGIC;
40 static void pgoutput_startup(LogicalDecodingContext *ctx,
41 OutputPluginOptions *opt, bool is_init);
42 static void pgoutput_shutdown(LogicalDecodingContext *ctx);
43 static void pgoutput_begin_txn(LogicalDecodingContext *ctx,
44 ReorderBufferTXN *txn);
45 static void pgoutput_commit_txn(LogicalDecodingContext *ctx,
46 ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
47 static void pgoutput_change(LogicalDecodingContext *ctx,
48 ReorderBufferTXN *txn, Relation relation,
49 ReorderBufferChange *change);
50 static void pgoutput_truncate(LogicalDecodingContext *ctx,
51 ReorderBufferTXN *txn, int nrelations, Relation relations[],
52 ReorderBufferChange *change);
53 static void pgoutput_message(LogicalDecodingContext *ctx,
54 ReorderBufferTXN *txn, XLogRecPtr message_lsn,
55 bool transactional, const char *prefix,
56 Size sz, const char *message);
57 static bool pgoutput_origin_filter(LogicalDecodingContext *ctx,
58 RepOriginId origin_id);
59 static void pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx,
60 ReorderBufferTXN *txn);
61 static void pgoutput_prepare_txn(LogicalDecodingContext *ctx,
62 ReorderBufferTXN *txn, XLogRecPtr prepare_lsn);
63 static void pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx,
64 ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
65 static void pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx,
66 ReorderBufferTXN *txn,
67 XLogRecPtr prepare_end_lsn,
68 TimestampTz prepare_time);
69 static void pgoutput_stream_start(struct LogicalDecodingContext *ctx,
70 ReorderBufferTXN *txn);
71 static void pgoutput_stream_stop(struct LogicalDecodingContext *ctx,
72 ReorderBufferTXN *txn);
73 static void pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
74 ReorderBufferTXN *txn,
75 XLogRecPtr abort_lsn);
76 static void pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
77 ReorderBufferTXN *txn,
78 XLogRecPtr commit_lsn);
79 static void pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
80 ReorderBufferTXN *txn, XLogRecPtr prepare_lsn);
82 static bool publications_valid;
84 static List *LoadPublications(List *pubnames);
85 static void publication_invalidation_cb(Datum arg, int cacheid,
86 uint32 hashvalue);
87 static void send_relation_and_attrs(Relation relation, TransactionId xid,
88 LogicalDecodingContext *ctx,
89 Bitmapset *columns);
90 static void send_repl_origin(LogicalDecodingContext *ctx,
91 RepOriginId origin_id, XLogRecPtr origin_lsn,
92 bool send_origin);
/*
 * Publication actions that support row filters.  Only "insert", "update" and
 * "delete" can be filtered; each value indexes RelationSyncEntry.exprstate[].
 */
enum RowFilterPubAction
{
	PUBACTION_INSERT = 0,
	PUBACTION_UPDATE = 1,
	PUBACTION_DELETE = 2
};

/* Number of row-filterable actions, i.e. the length of exprstate[]. */
#define NUM_ROWFILTER_PUBACTIONS (PUBACTION_DELETE + 1)
108 * Entry in the map used to remember which relation schemas we sent.
110 * The schema_sent flag determines if the current schema record for the
111 * relation (and for its ancestor if publish_as_relid is set) was already
112 * sent to the subscriber (in which case we don't need to send it again).
114 * The schema cache on downstream is however updated only at commit time,
115 * and with streamed transactions the commit order may be different from
116 * the order the transactions are sent in. Also, the (sub) transactions
117 * might get aborted so we need to send the schema for each (sub) transaction
118 * so that we don't lose the schema information on abort. For handling this,
119 * we maintain the list of xids (streamed_txns) for those we have already sent
120 * the schema.
122 * For partitions, 'pubactions' considers not only the table's own
123 * publications, but also those of all of its ancestors.
125 typedef struct RelationSyncEntry
127 Oid relid; /* relation oid */
129 bool replicate_valid; /* overall validity flag for entry */
131 bool schema_sent;
132 List *streamed_txns; /* streamed toplevel transactions with this
133 * schema */
135 /* are we publishing this rel? */
136 PublicationActions pubactions;
139 * ExprState array for row filter. Different publication actions don't
140 * allow multiple expressions to always be combined into one, because
141 * updates or deletes restrict the column in expression to be part of the
142 * replica identity index whereas inserts do not have this restriction, so
143 * there is one ExprState per publication action.
145 ExprState *exprstate[NUM_ROWFILTER_PUBACTIONS];
146 EState *estate; /* executor state used for row filter */
147 TupleTableSlot *new_slot; /* slot for storing new tuple */
148 TupleTableSlot *old_slot; /* slot for storing old tuple */
151 * OID of the relation to publish changes as. For a partition, this may
152 * be set to one of its ancestors whose schema will be used when
153 * replicating changes, if publish_via_partition_root is set for the
154 * publication.
156 Oid publish_as_relid;
159 * Map used when replicating using an ancestor's schema to convert tuples
160 * from partition's type to the ancestor's; NULL if publish_as_relid is
161 * same as 'relid' or if unnecessary due to partition and the ancestor
162 * having identical TupleDesc.
164 AttrMap *attrmap;
167 * Columns included in the publication, or NULL if all columns are
168 * included implicitly. Note that the attnums in this bitmap are not
169 * shifted by FirstLowInvalidHeapAttributeNumber.
171 Bitmapset *columns;
174 * Private context to store additional data for this entry - state for the
175 * row filter expressions, column list, etc.
177 MemoryContext entry_cxt;
178 } RelationSyncEntry;
/*
 * Per-transaction plugin state, recording whether BEGIN was sent yet.
 *
 * BEGIN goes out lazily, while the first change of a transaction is being
 * processed.  A transaction none of whose changes gets published thus
 * produces no BEGIN/COMMIT pair at all, which saves network bandwidth.
 *
 * Prepared transactions do not get this optimization.  If the walsender
 * restarted between PREPARE and COMMIT PREPARED, the in-memory state telling
 * us that BEGIN/PREPARE was skipped as empty would be gone.  We would then
 * send a spurious COMMIT PREPARED without a matching prepared transaction
 * downstream, and the downstream would error out processing it.
 *
 * XXX The protocol could be extended with extra information that lets the
 * downstream detect that the corresponding prepare was never sent.  Doing
 * that check for every transaction downstream could be costly, though, so
 * it might have to be optional.
 *
 * Streamed transactions do not get this optimization either, because they
 * can contain prepared transactions.
 */
typedef struct PGOutputTxnData
{
	bool		sent_begin_txn; /* has BEGIN been sent already? */
} PGOutputTxnData;
209 /* Map used to remember which relation schemas we sent. */
210 static HTAB *RelationSyncCache = NULL;
212 static void init_rel_sync_cache(MemoryContext cachectx);
213 static void cleanup_rel_sync_cache(TransactionId xid, bool is_commit);
214 static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data,
215 Relation relation);
216 static void rel_sync_cache_relation_cb(Datum arg, Oid relid);
217 static void rel_sync_cache_publication_cb(Datum arg, int cacheid,
218 uint32 hashvalue);
219 static void set_schema_sent_in_streamed_txn(RelationSyncEntry *entry,
220 TransactionId xid);
221 static bool get_schema_sent_in_streamed_txn(RelationSyncEntry *entry,
222 TransactionId xid);
223 static void init_tuple_slot(PGOutputData *data, Relation relation,
224 RelationSyncEntry *entry);
226 /* row filter routines */
227 static EState *create_estate_for_relation(Relation rel);
228 static void pgoutput_row_filter_init(PGOutputData *data,
229 List *publications,
230 RelationSyncEntry *entry);
231 static bool pgoutput_row_filter_exec_expr(ExprState *state,
232 ExprContext *econtext);
233 static bool pgoutput_row_filter(Relation relation, TupleTableSlot *old_slot,
234 TupleTableSlot **new_slot_ptr,
235 RelationSyncEntry *entry,
236 ReorderBufferChangeType *action);
238 /* column list routines */
239 static void pgoutput_column_list_init(PGOutputData *data,
240 List *publications,
241 RelationSyncEntry *entry);
244 * Specify output plugin callbacks
246 void
247 _PG_output_plugin_init(OutputPluginCallbacks *cb)
249 cb->startup_cb = pgoutput_startup;
250 cb->begin_cb = pgoutput_begin_txn;
251 cb->change_cb = pgoutput_change;
252 cb->truncate_cb = pgoutput_truncate;
253 cb->message_cb = pgoutput_message;
254 cb->commit_cb = pgoutput_commit_txn;
256 cb->begin_prepare_cb = pgoutput_begin_prepare_txn;
257 cb->prepare_cb = pgoutput_prepare_txn;
258 cb->commit_prepared_cb = pgoutput_commit_prepared_txn;
259 cb->rollback_prepared_cb = pgoutput_rollback_prepared_txn;
260 cb->filter_by_origin_cb = pgoutput_origin_filter;
261 cb->shutdown_cb = pgoutput_shutdown;
263 /* transaction streaming */
264 cb->stream_start_cb = pgoutput_stream_start;
265 cb->stream_stop_cb = pgoutput_stream_stop;
266 cb->stream_abort_cb = pgoutput_stream_abort;
267 cb->stream_commit_cb = pgoutput_stream_commit;
268 cb->stream_change_cb = pgoutput_change;
269 cb->stream_message_cb = pgoutput_message;
270 cb->stream_truncate_cb = pgoutput_truncate;
271 /* transaction streaming - two-phase commit */
272 cb->stream_prepare_cb = pgoutput_stream_prepare_txn;
275 static void
276 parse_output_parameters(List *options, PGOutputData *data)
278 ListCell *lc;
279 bool protocol_version_given = false;
280 bool publication_names_given = false;
281 bool binary_option_given = false;
282 bool messages_option_given = false;
283 bool streaming_given = false;
284 bool two_phase_option_given = false;
285 bool origin_option_given = false;
287 data->binary = false;
288 data->streaming = LOGICALREP_STREAM_OFF;
289 data->messages = false;
290 data->two_phase = false;
292 foreach(lc, options)
294 DefElem *defel = (DefElem *) lfirst(lc);
296 Assert(defel->arg == NULL || IsA(defel->arg, String));
298 /* Check each param, whether or not we recognize it */
299 if (strcmp(defel->defname, "proto_version") == 0)
301 unsigned long parsed;
302 char *endptr;
304 if (protocol_version_given)
305 ereport(ERROR,
306 (errcode(ERRCODE_SYNTAX_ERROR),
307 errmsg("conflicting or redundant options")));
308 protocol_version_given = true;
310 errno = 0;
311 parsed = strtoul(strVal(defel->arg), &endptr, 10);
312 if (errno != 0 || *endptr != '\0')
313 ereport(ERROR,
314 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
315 errmsg("invalid proto_version")));
317 if (parsed > PG_UINT32_MAX)
318 ereport(ERROR,
319 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
320 errmsg("proto_version \"%s\" out of range",
321 strVal(defel->arg))));
323 data->protocol_version = (uint32) parsed;
325 else if (strcmp(defel->defname, "publication_names") == 0)
327 if (publication_names_given)
328 ereport(ERROR,
329 (errcode(ERRCODE_SYNTAX_ERROR),
330 errmsg("conflicting or redundant options")));
331 publication_names_given = true;
333 if (!SplitIdentifierString(strVal(defel->arg), ',',
334 &data->publication_names))
335 ereport(ERROR,
336 (errcode(ERRCODE_INVALID_NAME),
337 errmsg("invalid publication_names syntax")));
339 else if (strcmp(defel->defname, "binary") == 0)
341 if (binary_option_given)
342 ereport(ERROR,
343 (errcode(ERRCODE_SYNTAX_ERROR),
344 errmsg("conflicting or redundant options")));
345 binary_option_given = true;
347 data->binary = defGetBoolean(defel);
349 else if (strcmp(defel->defname, "messages") == 0)
351 if (messages_option_given)
352 ereport(ERROR,
353 (errcode(ERRCODE_SYNTAX_ERROR),
354 errmsg("conflicting or redundant options")));
355 messages_option_given = true;
357 data->messages = defGetBoolean(defel);
359 else if (strcmp(defel->defname, "streaming") == 0)
361 if (streaming_given)
362 ereport(ERROR,
363 (errcode(ERRCODE_SYNTAX_ERROR),
364 errmsg("conflicting or redundant options")));
365 streaming_given = true;
367 data->streaming = defGetStreamingMode(defel);
369 else if (strcmp(defel->defname, "two_phase") == 0)
371 if (two_phase_option_given)
372 ereport(ERROR,
373 (errcode(ERRCODE_SYNTAX_ERROR),
374 errmsg("conflicting or redundant options")));
375 two_phase_option_given = true;
377 data->two_phase = defGetBoolean(defel);
379 else if (strcmp(defel->defname, "origin") == 0)
381 char *origin;
383 if (origin_option_given)
384 ereport(ERROR,
385 errcode(ERRCODE_SYNTAX_ERROR),
386 errmsg("conflicting or redundant options"));
387 origin_option_given = true;
389 origin = defGetString(defel);
390 if (pg_strcasecmp(origin, LOGICALREP_ORIGIN_NONE) == 0)
391 data->publish_no_origin = true;
392 else if (pg_strcasecmp(origin, LOGICALREP_ORIGIN_ANY) == 0)
393 data->publish_no_origin = false;
394 else
395 ereport(ERROR,
396 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
397 errmsg("unrecognized origin value: \"%s\"", origin));
399 else
400 elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
403 /* Check required options */
404 if (!protocol_version_given)
405 ereport(ERROR,
406 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
407 errmsg("option \"%s\" missing", "proto_version"));
408 if (!publication_names_given)
409 ereport(ERROR,
410 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
411 errmsg("option \"%s\" missing", "publication_names"));
415 * Initialize this plugin
417 static void
418 pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
419 bool is_init)
421 PGOutputData *data = palloc0(sizeof(PGOutputData));
422 static bool publication_callback_registered = false;
424 /* Create our memory context for private allocations. */
425 data->context = AllocSetContextCreate(ctx->context,
426 "logical replication output context",
427 ALLOCSET_DEFAULT_SIZES);
429 data->cachectx = AllocSetContextCreate(ctx->context,
430 "logical replication cache context",
431 ALLOCSET_DEFAULT_SIZES);
433 ctx->output_plugin_private = data;
435 /* This plugin uses binary protocol. */
436 opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
439 * This is replication start and not slot initialization.
441 * Parse and validate options passed by the client.
443 if (!is_init)
445 /* Parse the params and ERROR if we see any we don't recognize */
446 parse_output_parameters(ctx->output_plugin_options, data);
448 /* Check if we support requested protocol */
449 if (data->protocol_version > LOGICALREP_PROTO_MAX_VERSION_NUM)
450 ereport(ERROR,
451 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
452 errmsg("client sent proto_version=%d but server only supports protocol %d or lower",
453 data->protocol_version, LOGICALREP_PROTO_MAX_VERSION_NUM)));
455 if (data->protocol_version < LOGICALREP_PROTO_MIN_VERSION_NUM)
456 ereport(ERROR,
457 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
458 errmsg("client sent proto_version=%d but server only supports protocol %d or higher",
459 data->protocol_version, LOGICALREP_PROTO_MIN_VERSION_NUM)));
462 * Decide whether to enable streaming. It is disabled by default, in
463 * which case we just update the flag in decoding context. Otherwise
464 * we only allow it with sufficient version of the protocol, and when
465 * the output plugin supports it.
467 if (data->streaming == LOGICALREP_STREAM_OFF)
468 ctx->streaming = false;
469 else if (data->streaming == LOGICALREP_STREAM_ON &&
470 data->protocol_version < LOGICALREP_PROTO_STREAM_VERSION_NUM)
471 ereport(ERROR,
472 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
473 errmsg("requested proto_version=%d does not support streaming, need %d or higher",
474 data->protocol_version, LOGICALREP_PROTO_STREAM_VERSION_NUM)));
475 else if (data->streaming == LOGICALREP_STREAM_PARALLEL &&
476 data->protocol_version < LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM)
477 ereport(ERROR,
478 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
479 errmsg("requested proto_version=%d does not support parallel streaming, need %d or higher",
480 data->protocol_version, LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM)));
481 else if (!ctx->streaming)
482 ereport(ERROR,
483 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
484 errmsg("streaming requested, but not supported by output plugin")));
487 * Here, we just check whether the two-phase option is passed by
488 * plugin and decide whether to enable it at later point of time. It
489 * remains enabled if the previous start-up has done so. But we only
490 * allow the option to be passed in with sufficient version of the
491 * protocol, and when the output plugin supports it.
493 if (!data->two_phase)
494 ctx->twophase_opt_given = false;
495 else if (data->protocol_version < LOGICALREP_PROTO_TWOPHASE_VERSION_NUM)
496 ereport(ERROR,
497 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
498 errmsg("requested proto_version=%d does not support two-phase commit, need %d or higher",
499 data->protocol_version, LOGICALREP_PROTO_TWOPHASE_VERSION_NUM)));
500 else if (!ctx->twophase)
501 ereport(ERROR,
502 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
503 errmsg("two-phase commit requested, but not supported by output plugin")));
504 else
505 ctx->twophase_opt_given = true;
507 /* Init publication state. */
508 data->publications = NIL;
509 publications_valid = false;
512 * Register callback for pg_publication if we didn't already do that
513 * during some previous call in this process.
515 if (!publication_callback_registered)
517 CacheRegisterSyscacheCallback(PUBLICATIONOID,
518 publication_invalidation_cb,
519 (Datum) 0);
520 publication_callback_registered = true;
523 /* Initialize relation schema cache. */
524 init_rel_sync_cache(CacheMemoryContext);
526 else
529 * Disable the streaming and prepared transactions during the slot
530 * initialization mode.
532 ctx->streaming = false;
533 ctx->twophase = false;
538 * BEGIN callback.
540 * Don't send the BEGIN message here instead postpone it until the first
541 * change. In logical replication, a common scenario is to replicate a set of
542 * tables (instead of all tables) and transactions whose changes were on
543 * the table(s) that are not published will produce empty transactions. These
544 * empty transactions will send BEGIN and COMMIT messages to subscribers,
545 * using bandwidth on something with little/no use for logical replication.
547 static void
548 pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
550 PGOutputTxnData *txndata = MemoryContextAllocZero(ctx->context,
551 sizeof(PGOutputTxnData));
553 txn->output_plugin_private = txndata;
557 * Send BEGIN.
559 * This is called while processing the first change of the transaction.
561 static void
562 pgoutput_send_begin(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
564 bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
565 PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
567 Assert(txndata);
568 Assert(!txndata->sent_begin_txn);
570 OutputPluginPrepareWrite(ctx, !send_replication_origin);
571 logicalrep_write_begin(ctx->out, txn);
572 txndata->sent_begin_txn = true;
574 send_repl_origin(ctx, txn->origin_id, txn->origin_lsn,
575 send_replication_origin);
577 OutputPluginWrite(ctx, true);
581 * COMMIT callback
583 static void
584 pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
585 XLogRecPtr commit_lsn)
587 PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
588 bool sent_begin_txn;
590 Assert(txndata);
593 * We don't need to send the commit message unless some relevant change
594 * from this transaction has been sent to the downstream.
596 sent_begin_txn = txndata->sent_begin_txn;
597 OutputPluginUpdateProgress(ctx, !sent_begin_txn);
598 pfree(txndata);
599 txn->output_plugin_private = NULL;
601 if (!sent_begin_txn)
603 elog(DEBUG1, "skipped replication of an empty transaction with XID: %u", txn->xid);
604 return;
607 OutputPluginPrepareWrite(ctx, true);
608 logicalrep_write_commit(ctx->out, txn, commit_lsn);
609 OutputPluginWrite(ctx, true);
613 * BEGIN PREPARE callback
615 static void
616 pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
618 bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
620 OutputPluginPrepareWrite(ctx, !send_replication_origin);
621 logicalrep_write_begin_prepare(ctx->out, txn);
623 send_repl_origin(ctx, txn->origin_id, txn->origin_lsn,
624 send_replication_origin);
626 OutputPluginWrite(ctx, true);
630 * PREPARE callback
632 static void
633 pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
634 XLogRecPtr prepare_lsn)
636 OutputPluginUpdateProgress(ctx, false);
638 OutputPluginPrepareWrite(ctx, true);
639 logicalrep_write_prepare(ctx->out, txn, prepare_lsn);
640 OutputPluginWrite(ctx, true);
644 * COMMIT PREPARED callback
646 static void
647 pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
648 XLogRecPtr commit_lsn)
650 OutputPluginUpdateProgress(ctx, false);
652 OutputPluginPrepareWrite(ctx, true);
653 logicalrep_write_commit_prepared(ctx->out, txn, commit_lsn);
654 OutputPluginWrite(ctx, true);
658 * ROLLBACK PREPARED callback
660 static void
661 pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx,
662 ReorderBufferTXN *txn,
663 XLogRecPtr prepare_end_lsn,
664 TimestampTz prepare_time)
666 OutputPluginUpdateProgress(ctx, false);
668 OutputPluginPrepareWrite(ctx, true);
669 logicalrep_write_rollback_prepared(ctx->out, txn, prepare_end_lsn,
670 prepare_time);
671 OutputPluginWrite(ctx, true);
675 * Write the current schema of the relation and its ancestor (if any) if not
676 * done yet.
678 static void
679 maybe_send_schema(LogicalDecodingContext *ctx,
680 ReorderBufferChange *change,
681 Relation relation, RelationSyncEntry *relentry)
683 PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
684 bool schema_sent;
685 TransactionId xid = InvalidTransactionId;
686 TransactionId topxid = InvalidTransactionId;
689 * Remember XID of the (sub)transaction for the change. We don't care if
690 * it's top-level transaction or not (we have already sent that XID in
691 * start of the current streaming block).
693 * If we're not in a streaming block, just use InvalidTransactionId and
694 * the write methods will not include it.
696 if (data->in_streaming)
697 xid = change->txn->xid;
699 if (rbtxn_is_subtxn(change->txn))
700 topxid = rbtxn_get_toptxn(change->txn)->xid;
701 else
702 topxid = xid;
705 * Do we need to send the schema? We do track streamed transactions
706 * separately, because those may be applied later (and the regular
707 * transactions won't see their effects until then) and in an order that
708 * we don't know at this point.
710 * XXX There is a scope of optimization here. Currently, we always send
711 * the schema first time in a streaming transaction but we can probably
712 * avoid that by checking 'relentry->schema_sent' flag. However, before
713 * doing that we need to study its impact on the case where we have a mix
714 * of streaming and non-streaming transactions.
716 if (data->in_streaming)
717 schema_sent = get_schema_sent_in_streamed_txn(relentry, topxid);
718 else
719 schema_sent = relentry->schema_sent;
721 /* Nothing to do if we already sent the schema. */
722 if (schema_sent)
723 return;
726 * Send the schema. If the changes will be published using an ancestor's
727 * schema, not the relation's own, send that ancestor's schema before
728 * sending relation's own (XXX - maybe sending only the former suffices?).
730 if (relentry->publish_as_relid != RelationGetRelid(relation))
732 Relation ancestor = RelationIdGetRelation(relentry->publish_as_relid);
734 send_relation_and_attrs(ancestor, xid, ctx, relentry->columns);
735 RelationClose(ancestor);
738 send_relation_and_attrs(relation, xid, ctx, relentry->columns);
740 if (data->in_streaming)
741 set_schema_sent_in_streamed_txn(relentry, topxid);
742 else
743 relentry->schema_sent = true;
747 * Sends a relation
749 static void
750 send_relation_and_attrs(Relation relation, TransactionId xid,
751 LogicalDecodingContext *ctx,
752 Bitmapset *columns)
754 TupleDesc desc = RelationGetDescr(relation);
755 int i;
758 * Write out type info if needed. We do that only for user-created types.
759 * We use FirstGenbkiObjectId as the cutoff, so that we only consider
760 * objects with hand-assigned OIDs to be "built in", not for instance any
761 * function or type defined in the information_schema. This is important
762 * because only hand-assigned OIDs can be expected to remain stable across
763 * major versions.
765 for (i = 0; i < desc->natts; i++)
767 Form_pg_attribute att = TupleDescAttr(desc, i);
769 if (att->attisdropped || att->attgenerated)
770 continue;
772 if (att->atttypid < FirstGenbkiObjectId)
773 continue;
775 /* Skip this attribute if it's not present in the column list */
776 if (columns != NULL && !bms_is_member(att->attnum, columns))
777 continue;
779 OutputPluginPrepareWrite(ctx, false);
780 logicalrep_write_typ(ctx->out, xid, att->atttypid);
781 OutputPluginWrite(ctx, false);
784 OutputPluginPrepareWrite(ctx, false);
785 logicalrep_write_rel(ctx->out, xid, relation, columns);
786 OutputPluginWrite(ctx, false);
790 * Executor state preparation for evaluation of row filter expressions for the
791 * specified relation.
793 static EState *
794 create_estate_for_relation(Relation rel)
796 EState *estate;
797 RangeTblEntry *rte;
798 List *perminfos = NIL;
800 estate = CreateExecutorState();
802 rte = makeNode(RangeTblEntry);
803 rte->rtekind = RTE_RELATION;
804 rte->relid = RelationGetRelid(rel);
805 rte->relkind = rel->rd_rel->relkind;
806 rte->rellockmode = AccessShareLock;
808 addRTEPermissionInfo(&perminfos, rte);
810 ExecInitRangeTable(estate, list_make1(rte), perminfos);
812 estate->es_output_cid = GetCurrentCommandId(false);
814 return estate;
818 * Evaluates row filter.
820 * If the row filter evaluates to NULL, it is taken as false i.e. the change
821 * isn't replicated.
823 static bool
824 pgoutput_row_filter_exec_expr(ExprState *state, ExprContext *econtext)
826 Datum ret;
827 bool isnull;
829 Assert(state != NULL);
831 ret = ExecEvalExprSwitchContext(state, econtext, &isnull);
833 elog(DEBUG3, "row filter evaluates to %s (isnull: %s)",
834 isnull ? "false" : DatumGetBool(ret) ? "true" : "false",
835 isnull ? "true" : "false");
837 if (isnull)
838 return false;
840 return DatumGetBool(ret);
844 * Make sure the per-entry memory context exists.
846 static void
847 pgoutput_ensure_entry_cxt(PGOutputData *data, RelationSyncEntry *entry)
849 Relation relation;
851 /* The context may already exist, in which case bail out. */
852 if (entry->entry_cxt)
853 return;
855 relation = RelationIdGetRelation(entry->publish_as_relid);
857 entry->entry_cxt = AllocSetContextCreate(data->cachectx,
858 "entry private context",
859 ALLOCSET_SMALL_SIZES);
861 MemoryContextCopyAndSetIdentifier(entry->entry_cxt,
862 RelationGetRelationName(relation));
866 * Initialize the row filter.
868 static void
869 pgoutput_row_filter_init(PGOutputData *data, List *publications,
870 RelationSyncEntry *entry)
872 ListCell *lc;
873 List *rfnodes[] = {NIL, NIL, NIL}; /* One per pubaction */
874 bool no_filter[] = {false, false, false}; /* One per pubaction */
875 MemoryContext oldctx;
876 int idx;
877 bool has_filter = true;
878 Oid schemaid = get_rel_namespace(entry->publish_as_relid);
881 * Find if there are any row filters for this relation. If there are, then
882 * prepare the necessary ExprState and cache it in entry->exprstate. To
883 * build an expression state, we need to ensure the following:
885 * All the given publication-table mappings must be checked.
887 * Multiple publications might have multiple row filters for this
888 * relation. Since row filter usage depends on the DML operation, there
889 * are multiple lists (one for each operation) to which row filters will
890 * be appended.
892 * FOR ALL TABLES and FOR TABLES IN SCHEMA implies "don't use row filter
893 * expression" so it takes precedence.
895 foreach(lc, publications)
897 Publication *pub = lfirst(lc);
898 HeapTuple rftuple = NULL;
899 Datum rfdatum = 0;
900 bool pub_no_filter = true;
903 * If the publication is FOR ALL TABLES, or the publication includes a
904 * FOR TABLES IN SCHEMA where the table belongs to the referred
905 * schema, then it is treated the same as if there are no row filters
906 * (even if other publications have a row filter).
908 if (!pub->alltables &&
909 !SearchSysCacheExists2(PUBLICATIONNAMESPACEMAP,
910 ObjectIdGetDatum(schemaid),
911 ObjectIdGetDatum(pub->oid)))
914 * Check for the presence of a row filter in this publication.
916 rftuple = SearchSysCache2(PUBLICATIONRELMAP,
917 ObjectIdGetDatum(entry->publish_as_relid),
918 ObjectIdGetDatum(pub->oid));
920 if (HeapTupleIsValid(rftuple))
922 /* Null indicates no filter. */
923 rfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
924 Anum_pg_publication_rel_prqual,
925 &pub_no_filter);
929 if (pub_no_filter)
931 if (rftuple)
932 ReleaseSysCache(rftuple);
934 no_filter[PUBACTION_INSERT] |= pub->pubactions.pubinsert;
935 no_filter[PUBACTION_UPDATE] |= pub->pubactions.pubupdate;
936 no_filter[PUBACTION_DELETE] |= pub->pubactions.pubdelete;
939 * Quick exit if all the DML actions are publicized via this
940 * publication.
942 if (no_filter[PUBACTION_INSERT] &&
943 no_filter[PUBACTION_UPDATE] &&
944 no_filter[PUBACTION_DELETE])
946 has_filter = false;
947 break;
950 /* No additional work for this publication. Next one. */
951 continue;
954 /* Form the per pubaction row filter lists. */
955 if (pub->pubactions.pubinsert && !no_filter[PUBACTION_INSERT])
956 rfnodes[PUBACTION_INSERT] = lappend(rfnodes[PUBACTION_INSERT],
957 TextDatumGetCString(rfdatum));
958 if (pub->pubactions.pubupdate && !no_filter[PUBACTION_UPDATE])
959 rfnodes[PUBACTION_UPDATE] = lappend(rfnodes[PUBACTION_UPDATE],
960 TextDatumGetCString(rfdatum));
961 if (pub->pubactions.pubdelete && !no_filter[PUBACTION_DELETE])
962 rfnodes[PUBACTION_DELETE] = lappend(rfnodes[PUBACTION_DELETE],
963 TextDatumGetCString(rfdatum));
965 ReleaseSysCache(rftuple);
966 } /* loop all subscribed publications */
968 /* Clean the row filter */
969 for (idx = 0; idx < NUM_ROWFILTER_PUBACTIONS; idx++)
971 if (no_filter[idx])
973 list_free_deep(rfnodes[idx]);
974 rfnodes[idx] = NIL;
978 if (has_filter)
980 Relation relation = RelationIdGetRelation(entry->publish_as_relid);
982 pgoutput_ensure_entry_cxt(data, entry);
985 * Now all the filters for all pubactions are known. Combine them when
986 * their pubactions are the same.
988 oldctx = MemoryContextSwitchTo(entry->entry_cxt);
989 entry->estate = create_estate_for_relation(relation);
990 for (idx = 0; idx < NUM_ROWFILTER_PUBACTIONS; idx++)
992 List *filters = NIL;
993 Expr *rfnode;
995 if (rfnodes[idx] == NIL)
996 continue;
998 foreach(lc, rfnodes[idx])
999 filters = lappend(filters, stringToNode((char *) lfirst(lc)));
1001 /* combine the row filter and cache the ExprState */
1002 rfnode = make_orclause(filters);
1003 entry->exprstate[idx] = ExecPrepareExpr(rfnode, entry->estate);
1004 } /* for each pubaction */
1005 MemoryContextSwitchTo(oldctx);
1007 RelationClose(relation);
1012 * Initialize the column list.
1014 static void
1015 pgoutput_column_list_init(PGOutputData *data, List *publications,
1016 RelationSyncEntry *entry)
1018 ListCell *lc;
1019 bool first = true;
1020 Relation relation = RelationIdGetRelation(entry->publish_as_relid);
1023 * Find if there are any column lists for this relation. If there are,
1024 * build a bitmap using the column lists.
1026 * Multiple publications might have multiple column lists for this
1027 * relation.
1029 * Note that we don't support the case where the column list is different
1030 * for the same table when combining publications. See comments atop
1031 * fetch_table_list. But one can later change the publication so we still
1032 * need to check all the given publication-table mappings and report an
1033 * error if any publications have a different column list.
1035 * FOR ALL TABLES and FOR TABLES IN SCHEMA imply "don't use column list".
1037 foreach(lc, publications)
1039 Publication *pub = lfirst(lc);
1040 HeapTuple cftuple = NULL;
1041 Datum cfdatum = 0;
1042 Bitmapset *cols = NULL;
1045 * If the publication is FOR ALL TABLES then it is treated the same as
1046 * if there are no column lists (even if other publications have a
1047 * list).
1049 if (!pub->alltables)
1051 bool pub_no_list = true;
1054 * Check for the presence of a column list in this publication.
1056 * Note: If we find no pg_publication_rel row, it's a publication
1057 * defined for a whole schema, so it can't have a column list,
1058 * just like a FOR ALL TABLES publication.
1060 cftuple = SearchSysCache2(PUBLICATIONRELMAP,
1061 ObjectIdGetDatum(entry->publish_as_relid),
1062 ObjectIdGetDatum(pub->oid));
1064 if (HeapTupleIsValid(cftuple))
1066 /* Lookup the column list attribute. */
1067 cfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, cftuple,
1068 Anum_pg_publication_rel_prattrs,
1069 &pub_no_list);
1071 /* Build the column list bitmap in the per-entry context. */
1072 if (!pub_no_list) /* when not null */
1074 int i;
1075 int nliveatts = 0;
1076 TupleDesc desc = RelationGetDescr(relation);
1078 pgoutput_ensure_entry_cxt(data, entry);
1080 cols = pub_collist_to_bitmapset(cols, cfdatum,
1081 entry->entry_cxt);
1083 /* Get the number of live attributes. */
1084 for (i = 0; i < desc->natts; i++)
1086 Form_pg_attribute att = TupleDescAttr(desc, i);
1088 if (att->attisdropped || att->attgenerated)
1089 continue;
1091 nliveatts++;
1095 * If column list includes all the columns of the table,
1096 * set it to NULL.
1098 if (bms_num_members(cols) == nliveatts)
1100 bms_free(cols);
1101 cols = NULL;
1105 ReleaseSysCache(cftuple);
1109 if (first)
1111 entry->columns = cols;
1112 first = false;
1114 else if (!bms_equal(entry->columns, cols))
1115 ereport(ERROR,
1116 errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1117 errmsg("cannot use different column lists for table \"%s.%s\" in different publications",
1118 get_namespace_name(RelationGetNamespace(relation)),
1119 RelationGetRelationName(relation)));
1120 } /* loop all subscribed publications */
1122 RelationClose(relation);
1126 * Initialize the slot for storing new and old tuples, and build the map that
1127 * will be used to convert the relation's tuples into the ancestor's format.
1129 static void
1130 init_tuple_slot(PGOutputData *data, Relation relation,
1131 RelationSyncEntry *entry)
1133 MemoryContext oldctx;
1134 TupleDesc oldtupdesc;
1135 TupleDesc newtupdesc;
1137 oldctx = MemoryContextSwitchTo(data->cachectx);
1140 * Create tuple table slots. Create a copy of the TupleDesc as it needs to
1141 * live as long as the cache remains.
1143 oldtupdesc = CreateTupleDescCopyConstr(RelationGetDescr(relation));
1144 newtupdesc = CreateTupleDescCopyConstr(RelationGetDescr(relation));
1146 entry->old_slot = MakeSingleTupleTableSlot(oldtupdesc, &TTSOpsHeapTuple);
1147 entry->new_slot = MakeSingleTupleTableSlot(newtupdesc, &TTSOpsHeapTuple);
1149 MemoryContextSwitchTo(oldctx);
1152 * Cache the map that will be used to convert the relation's tuples into
1153 * the ancestor's format, if needed.
1155 if (entry->publish_as_relid != RelationGetRelid(relation))
1157 Relation ancestor = RelationIdGetRelation(entry->publish_as_relid);
1158 TupleDesc indesc = RelationGetDescr(relation);
1159 TupleDesc outdesc = RelationGetDescr(ancestor);
1161 /* Map must live as long as the session does. */
1162 oldctx = MemoryContextSwitchTo(CacheMemoryContext);
1164 entry->attrmap = build_attrmap_by_name_if_req(indesc, outdesc, false);
1166 MemoryContextSwitchTo(oldctx);
1167 RelationClose(ancestor);
1172 * Change is checked against the row filter if any.
1174 * Returns true if the change is to be replicated, else false.
1176 * For inserts, evaluate the row filter for new tuple.
1177 * For deletes, evaluate the row filter for old tuple.
1178 * For updates, evaluate the row filter for old and new tuple.
1180 * For updates, if both evaluations are true, we allow sending the UPDATE and
1181 * if both the evaluations are false, it doesn't replicate the UPDATE. Now, if
1182 * only one of the tuples matches the row filter expression, we transform
1183 * UPDATE to DELETE or INSERT to avoid any data inconsistency based on the
1184 * following rules:
1186 * Case 1: old-row (no match) new-row (no match) -> (drop change)
1187 * Case 2: old-row (no match) new row (match) -> INSERT
1188 * Case 3: old-row (match) new-row (no match) -> DELETE
1189 * Case 4: old-row (match) new row (match) -> UPDATE
1191 * The new action is updated in the action parameter.
1193 * The new slot could be updated when transforming the UPDATE into INSERT,
1194 * because the original new tuple might not have column values from the replica
1195 * identity.
1197 * Examples:
1198 * Let's say the old tuple satisfies the row filter but the new tuple doesn't.
1199 * Since the old tuple satisfies, the initial table synchronization copied this
1200 * row (or another method was used to guarantee that there is data
1201 * consistency). However, after the UPDATE the new tuple doesn't satisfy the
1202 * row filter, so from a data consistency perspective, that row should be
1203 * removed on the subscriber. The UPDATE should be transformed into a DELETE
1204 * statement and be sent to the subscriber. Keeping this row on the subscriber
1205 * is undesirable because it doesn't reflect what was defined in the row filter
1206 * expression on the publisher. This row on the subscriber would likely not be
1207 * modified by replication again. If someone inserted a new row with the same
1208 * old identifier, replication could stop due to a constraint violation.
1210 * Let's say the old tuple doesn't match the row filter but the new tuple does.
1211 * Since the old tuple doesn't satisfy, the initial table synchronization
1212 * probably didn't copy this row. However, after the UPDATE the new tuple does
1213 * satisfy the row filter, so from a data consistency perspective, that row
1214 * should be inserted on the subscriber. Otherwise, subsequent UPDATE or DELETE
1215 * statements have no effect (it matches no row -- see
1216 * apply_handle_update_internal()). So, the UPDATE should be transformed into a
1217 * INSERT statement and be sent to the subscriber. However, this might surprise
1218 * someone who expects the data set to satisfy the row filter expression on the
1219 * provider.
1221 static bool
1222 pgoutput_row_filter(Relation relation, TupleTableSlot *old_slot,
1223 TupleTableSlot **new_slot_ptr, RelationSyncEntry *entry,
1224 ReorderBufferChangeType *action)
1226 TupleDesc desc;
1227 int i;
1228 bool old_matched,
1229 new_matched,
1230 result;
1231 TupleTableSlot *tmp_new_slot;
1232 TupleTableSlot *new_slot = *new_slot_ptr;
1233 ExprContext *ecxt;
1234 ExprState *filter_exprstate;
1237 * We need this map to avoid relying on ReorderBufferChangeType enums
1238 * having specific values.
1240 static const int map_changetype_pubaction[] = {
1241 [REORDER_BUFFER_CHANGE_INSERT] = PUBACTION_INSERT,
1242 [REORDER_BUFFER_CHANGE_UPDATE] = PUBACTION_UPDATE,
1243 [REORDER_BUFFER_CHANGE_DELETE] = PUBACTION_DELETE
1246 Assert(*action == REORDER_BUFFER_CHANGE_INSERT ||
1247 *action == REORDER_BUFFER_CHANGE_UPDATE ||
1248 *action == REORDER_BUFFER_CHANGE_DELETE);
1250 Assert(new_slot || old_slot);
1252 /* Get the corresponding row filter */
1253 filter_exprstate = entry->exprstate[map_changetype_pubaction[*action]];
1255 /* Bail out if there is no row filter */
1256 if (!filter_exprstate)
1257 return true;
1259 elog(DEBUG3, "table \"%s.%s\" has row filter",
1260 get_namespace_name(RelationGetNamespace(relation)),
1261 RelationGetRelationName(relation));
1263 ResetPerTupleExprContext(entry->estate);
1265 ecxt = GetPerTupleExprContext(entry->estate);
1268 * For the following occasions where there is only one tuple, we can
1269 * evaluate the row filter for that tuple and return.
1271 * For inserts, we only have the new tuple.
1273 * For updates, we can have only a new tuple when none of the replica
1274 * identity columns changed and none of those columns have external data
1275 * but we still need to evaluate the row filter for the new tuple as the
1276 * existing values of those columns might not match the filter. Also,
1277 * users can use constant expressions in the row filter, so we anyway need
1278 * to evaluate it for the new tuple.
1280 * For deletes, we only have the old tuple.
1282 if (!new_slot || !old_slot)
1284 ecxt->ecxt_scantuple = new_slot ? new_slot : old_slot;
1285 result = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);
1287 return result;
1291 * Both the old and new tuples must be valid only for updates and need to
1292 * be checked against the row filter.
1294 Assert(map_changetype_pubaction[*action] == PUBACTION_UPDATE);
1296 slot_getallattrs(new_slot);
1297 slot_getallattrs(old_slot);
1299 tmp_new_slot = NULL;
1300 desc = RelationGetDescr(relation);
1303 * The new tuple might not have all the replica identity columns, in which
1304 * case it needs to be copied over from the old tuple.
1306 for (i = 0; i < desc->natts; i++)
1308 Form_pg_attribute att = TupleDescAttr(desc, i);
1311 * if the column in the new tuple or old tuple is null, nothing to do
1313 if (new_slot->tts_isnull[i] || old_slot->tts_isnull[i])
1314 continue;
1317 * Unchanged toasted replica identity columns are only logged in the
1318 * old tuple. Copy this over to the new tuple. The changed (or WAL
1319 * Logged) toast values are always assembled in memory and set as
1320 * VARTAG_INDIRECT. See ReorderBufferToastReplace.
1322 if (att->attlen == -1 &&
1323 VARATT_IS_EXTERNAL_ONDISK(new_slot->tts_values[i]) &&
1324 !VARATT_IS_EXTERNAL_ONDISK(old_slot->tts_values[i]))
1326 if (!tmp_new_slot)
1328 tmp_new_slot = MakeSingleTupleTableSlot(desc, &TTSOpsVirtual);
1329 ExecClearTuple(tmp_new_slot);
1331 memcpy(tmp_new_slot->tts_values, new_slot->tts_values,
1332 desc->natts * sizeof(Datum));
1333 memcpy(tmp_new_slot->tts_isnull, new_slot->tts_isnull,
1334 desc->natts * sizeof(bool));
1337 tmp_new_slot->tts_values[i] = old_slot->tts_values[i];
1338 tmp_new_slot->tts_isnull[i] = old_slot->tts_isnull[i];
1342 ecxt->ecxt_scantuple = old_slot;
1343 old_matched = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);
1345 if (tmp_new_slot)
1347 ExecStoreVirtualTuple(tmp_new_slot);
1348 ecxt->ecxt_scantuple = tmp_new_slot;
1350 else
1351 ecxt->ecxt_scantuple = new_slot;
1353 new_matched = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);
1356 * Case 1: if both tuples don't match the row filter, bailout. Send
1357 * nothing.
1359 if (!old_matched && !new_matched)
1360 return false;
1363 * Case 2: if the old tuple doesn't satisfy the row filter but the new
1364 * tuple does, transform the UPDATE into INSERT.
1366 * Use the newly transformed tuple that must contain the column values for
1367 * all the replica identity columns. This is required to ensure that the
1368 * while inserting the tuple in the downstream node, we have all the
1369 * required column values.
1371 if (!old_matched && new_matched)
1373 *action = REORDER_BUFFER_CHANGE_INSERT;
1375 if (tmp_new_slot)
1376 *new_slot_ptr = tmp_new_slot;
1380 * Case 3: if the old tuple satisfies the row filter but the new tuple
1381 * doesn't, transform the UPDATE into DELETE.
1383 * This transformation does not require another tuple. The Old tuple will
1384 * be used for DELETE.
1386 else if (old_matched && !new_matched)
1387 *action = REORDER_BUFFER_CHANGE_DELETE;
1390 * Case 4: if both tuples match the row filter, transformation isn't
1391 * required. (*action is default UPDATE).
1394 return true;
1398 * Sends the decoded DML over wire.
1400 * This is called both in streaming and non-streaming modes.
1402 static void
1403 pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
1404 Relation relation, ReorderBufferChange *change)
1406 PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
1407 PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
1408 MemoryContext old;
1409 RelationSyncEntry *relentry;
1410 TransactionId xid = InvalidTransactionId;
1411 Relation ancestor = NULL;
1412 Relation targetrel = relation;
1413 ReorderBufferChangeType action = change->action;
1414 TupleTableSlot *old_slot = NULL;
1415 TupleTableSlot *new_slot = NULL;
1417 if (!is_publishable_relation(relation))
1418 return;
1421 * Remember the xid for the change in streaming mode. We need to send xid
1422 * with each change in the streaming mode so that subscriber can make
1423 * their association and on aborts, it can discard the corresponding
1424 * changes.
1426 if (data->in_streaming)
1427 xid = change->txn->xid;
1429 relentry = get_rel_sync_entry(data, relation);
1431 /* First check the table filter */
1432 switch (action)
1434 case REORDER_BUFFER_CHANGE_INSERT:
1435 if (!relentry->pubactions.pubinsert)
1436 return;
1437 break;
1438 case REORDER_BUFFER_CHANGE_UPDATE:
1439 if (!relentry->pubactions.pubupdate)
1440 return;
1441 break;
1442 case REORDER_BUFFER_CHANGE_DELETE:
1443 if (!relentry->pubactions.pubdelete)
1444 return;
1447 * This is only possible if deletes are allowed even when replica
1448 * identity is not defined for a table. Since the DELETE action
1449 * can't be published, we simply return.
1451 if (!change->data.tp.oldtuple)
1453 elog(DEBUG1, "didn't send DELETE change because of missing oldtuple");
1454 return;
1456 break;
1457 default:
1458 Assert(false);
1461 /* Avoid leaking memory by using and resetting our own context */
1462 old = MemoryContextSwitchTo(data->context);
1464 /* Switch relation if publishing via root. */
1465 if (relentry->publish_as_relid != RelationGetRelid(relation))
1467 Assert(relation->rd_rel->relispartition);
1468 ancestor = RelationIdGetRelation(relentry->publish_as_relid);
1469 targetrel = ancestor;
1472 if (change->data.tp.oldtuple)
1474 old_slot = relentry->old_slot;
1475 ExecStoreHeapTuple(change->data.tp.oldtuple, old_slot, false);
1477 /* Convert tuple if needed. */
1478 if (relentry->attrmap)
1480 TupleTableSlot *slot = MakeTupleTableSlot(RelationGetDescr(targetrel),
1481 &TTSOpsVirtual);
1483 old_slot = execute_attr_map_slot(relentry->attrmap, old_slot, slot);
1487 if (change->data.tp.newtuple)
1489 new_slot = relentry->new_slot;
1490 ExecStoreHeapTuple(change->data.tp.newtuple, new_slot, false);
1492 /* Convert tuple if needed. */
1493 if (relentry->attrmap)
1495 TupleTableSlot *slot = MakeTupleTableSlot(RelationGetDescr(targetrel),
1496 &TTSOpsVirtual);
1498 new_slot = execute_attr_map_slot(relentry->attrmap, new_slot, slot);
1503 * Check row filter.
1505 * Updates could be transformed to inserts or deletes based on the results
1506 * of the row filter for old and new tuple.
1508 if (!pgoutput_row_filter(targetrel, old_slot, &new_slot, relentry, &action))
1509 goto cleanup;
1512 * Send BEGIN if we haven't yet.
1514 * We send the BEGIN message after ensuring that we will actually send the
1515 * change. This avoids sending a pair of BEGIN/COMMIT messages for empty
1516 * transactions.
1518 if (txndata && !txndata->sent_begin_txn)
1519 pgoutput_send_begin(ctx, txn);
1522 * Schema should be sent using the original relation because it also sends
1523 * the ancestor's relation.
1525 maybe_send_schema(ctx, change, relation, relentry);
1527 OutputPluginPrepareWrite(ctx, true);
1529 /* Send the data */
1530 switch (action)
1532 case REORDER_BUFFER_CHANGE_INSERT:
1533 logicalrep_write_insert(ctx->out, xid, targetrel, new_slot,
1534 data->binary, relentry->columns);
1535 break;
1536 case REORDER_BUFFER_CHANGE_UPDATE:
1537 logicalrep_write_update(ctx->out, xid, targetrel, old_slot,
1538 new_slot, data->binary, relentry->columns);
1539 break;
1540 case REORDER_BUFFER_CHANGE_DELETE:
1541 logicalrep_write_delete(ctx->out, xid, targetrel, old_slot,
1542 data->binary, relentry->columns);
1543 break;
1544 default:
1545 Assert(false);
1548 OutputPluginWrite(ctx, true);
1550 cleanup:
1551 if (RelationIsValid(ancestor))
1553 RelationClose(ancestor);
1554 ancestor = NULL;
1557 /* Drop the new slots that were used to store the converted tuples. */
1558 if (relentry->attrmap)
1560 if (old_slot)
1561 ExecDropSingleTupleTableSlot(old_slot);
1563 if (new_slot)
1564 ExecDropSingleTupleTableSlot(new_slot);
1567 MemoryContextSwitchTo(old);
1568 MemoryContextReset(data->context);
1571 static void
1572 pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
1573 int nrelations, Relation relations[], ReorderBufferChange *change)
1575 PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
1576 PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
1577 MemoryContext old;
1578 RelationSyncEntry *relentry;
1579 int i;
1580 int nrelids;
1581 Oid *relids;
1582 TransactionId xid = InvalidTransactionId;
1584 /* Remember the xid for the change in streaming mode. See pgoutput_change. */
1585 if (data->in_streaming)
1586 xid = change->txn->xid;
1588 old = MemoryContextSwitchTo(data->context);
1590 relids = palloc0(nrelations * sizeof(Oid));
1591 nrelids = 0;
1593 for (i = 0; i < nrelations; i++)
1595 Relation relation = relations[i];
1596 Oid relid = RelationGetRelid(relation);
1598 if (!is_publishable_relation(relation))
1599 continue;
1601 relentry = get_rel_sync_entry(data, relation);
1603 if (!relentry->pubactions.pubtruncate)
1604 continue;
1607 * Don't send partitions if the publication wants to send only the
1608 * root tables through it.
1610 if (relation->rd_rel->relispartition &&
1611 relentry->publish_as_relid != relid)
1612 continue;
1614 relids[nrelids++] = relid;
1616 /* Send BEGIN if we haven't yet */
1617 if (txndata && !txndata->sent_begin_txn)
1618 pgoutput_send_begin(ctx, txn);
1620 maybe_send_schema(ctx, change, relation, relentry);
1623 if (nrelids > 0)
1625 OutputPluginPrepareWrite(ctx, true);
1626 logicalrep_write_truncate(ctx->out,
1627 xid,
1628 nrelids,
1629 relids,
1630 change->data.truncate.cascade,
1631 change->data.truncate.restart_seqs);
1632 OutputPluginWrite(ctx, true);
1635 MemoryContextSwitchTo(old);
1636 MemoryContextReset(data->context);
1639 static void
1640 pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
1641 XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz,
1642 const char *message)
1644 PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
1645 TransactionId xid = InvalidTransactionId;
1647 if (!data->messages)
1648 return;
1651 * Remember the xid for the message in streaming mode. See
1652 * pgoutput_change.
1654 if (data->in_streaming)
1655 xid = txn->xid;
1658 * Output BEGIN if we haven't yet. Avoid for non-transactional messages.
1660 if (transactional)
1662 PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
1664 /* Send BEGIN if we haven't yet */
1665 if (txndata && !txndata->sent_begin_txn)
1666 pgoutput_send_begin(ctx, txn);
1669 OutputPluginPrepareWrite(ctx, true);
1670 logicalrep_write_message(ctx->out,
1671 xid,
1672 message_lsn,
1673 transactional,
1674 prefix,
1676 message);
1677 OutputPluginWrite(ctx, true);
1681 * Return true if the data is associated with an origin and the user has
1682 * requested the changes that don't have an origin, false otherwise.
1684 static bool
1685 pgoutput_origin_filter(LogicalDecodingContext *ctx,
1686 RepOriginId origin_id)
1688 PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
1690 if (data->publish_no_origin && origin_id != InvalidRepOriginId)
1691 return true;
1693 return false;
1697 * Shutdown the output plugin.
1699 * Note, we don't need to clean the data->context and data->cachectx as
1700 * they are child contexts of the ctx->context so they will be cleaned up by
1701 * logical decoding machinery.
1703 static void
1704 pgoutput_shutdown(LogicalDecodingContext *ctx)
1706 if (RelationSyncCache)
1708 hash_destroy(RelationSyncCache);
1709 RelationSyncCache = NULL;
1714 * Load publications from the list of publication names.
1716 static List *
1717 LoadPublications(List *pubnames)
1719 List *result = NIL;
1720 ListCell *lc;
1722 foreach(lc, pubnames)
1724 char *pubname = (char *) lfirst(lc);
1725 Publication *pub = GetPublicationByName(pubname, false);
1727 result = lappend(result, pub);
1730 return result;
1734 * Publication syscache invalidation callback.
1736 * Called for invalidations on pg_publication.
1738 static void
1739 publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue)
1741 publications_valid = false;
1744 * Also invalidate per-relation cache so that next time the filtering info
1745 * is checked it will be updated with the new publication settings.
1747 rel_sync_cache_publication_cb(arg, cacheid, hashvalue);
1751 * START STREAM callback
1753 static void
1754 pgoutput_stream_start(struct LogicalDecodingContext *ctx,
1755 ReorderBufferTXN *txn)
1757 PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
1758 bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
1760 /* we can't nest streaming of transactions */
1761 Assert(!data->in_streaming);
1764 * If we already sent the first stream for this transaction then don't
1765 * send the origin id in the subsequent streams.
1767 if (rbtxn_is_streamed(txn))
1768 send_replication_origin = false;
1770 OutputPluginPrepareWrite(ctx, !send_replication_origin);
1771 logicalrep_write_stream_start(ctx->out, txn->xid, !rbtxn_is_streamed(txn));
1773 send_repl_origin(ctx, txn->origin_id, InvalidXLogRecPtr,
1774 send_replication_origin);
1776 OutputPluginWrite(ctx, true);
1778 /* we're streaming a chunk of transaction now */
1779 data->in_streaming = true;
1783 * STOP STREAM callback
1785 static void
1786 pgoutput_stream_stop(struct LogicalDecodingContext *ctx,
1787 ReorderBufferTXN *txn)
1789 PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
1791 /* we should be streaming a transaction */
1792 Assert(data->in_streaming);
1794 OutputPluginPrepareWrite(ctx, true);
1795 logicalrep_write_stream_stop(ctx->out);
1796 OutputPluginWrite(ctx, true);
1798 /* we've stopped streaming a transaction */
1799 data->in_streaming = false;
1803 * Notify downstream to discard the streamed transaction (along with all
1804 * it's subtransactions, if it's a toplevel transaction).
1806 static void
1807 pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
1808 ReorderBufferTXN *txn,
1809 XLogRecPtr abort_lsn)
1811 ReorderBufferTXN *toptxn;
1812 PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
1813 bool write_abort_info = (data->streaming == LOGICALREP_STREAM_PARALLEL);
1816 * The abort should happen outside streaming block, even for streamed
1817 * transactions. The transaction has to be marked as streamed, though.
1819 Assert(!data->in_streaming);
1821 /* determine the toplevel transaction */
1822 toptxn = rbtxn_get_toptxn(txn);
1824 Assert(rbtxn_is_streamed(toptxn));
1826 OutputPluginPrepareWrite(ctx, true);
1827 logicalrep_write_stream_abort(ctx->out, toptxn->xid, txn->xid, abort_lsn,
1828 txn->xact_time.abort_time, write_abort_info);
1830 OutputPluginWrite(ctx, true);
1832 cleanup_rel_sync_cache(toptxn->xid, false);
1836 * Notify downstream to apply the streamed transaction (along with all
1837 * it's subtransactions).
1839 static void
1840 pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
1841 ReorderBufferTXN *txn,
1842 XLogRecPtr commit_lsn)
1844 PGOutputData *data PG_USED_FOR_ASSERTS_ONLY = (PGOutputData *) ctx->output_plugin_private;
1847 * The commit should happen outside streaming block, even for streamed
1848 * transactions. The transaction has to be marked as streamed, though.
1850 Assert(!data->in_streaming);
1851 Assert(rbtxn_is_streamed(txn));
1853 OutputPluginUpdateProgress(ctx, false);
1855 OutputPluginPrepareWrite(ctx, true);
1856 logicalrep_write_stream_commit(ctx->out, txn, commit_lsn);
1857 OutputPluginWrite(ctx, true);
1859 cleanup_rel_sync_cache(txn->xid, true);
1863 * PREPARE callback (for streaming two-phase commit).
1865 * Notify the downstream to prepare the transaction.
1867 static void
1868 pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
1869 ReorderBufferTXN *txn,
1870 XLogRecPtr prepare_lsn)
1872 Assert(rbtxn_is_streamed(txn));
1874 OutputPluginUpdateProgress(ctx, false);
1875 OutputPluginPrepareWrite(ctx, true);
1876 logicalrep_write_stream_prepare(ctx->out, txn, prepare_lsn);
1877 OutputPluginWrite(ctx, true);
1881 * Initialize the relation schema sync cache for a decoding session.
1883 * The hash table is destroyed at the end of a decoding session. While
1884 * relcache invalidations still exist and will still be invoked, they
1885 * will just see the null hash table global and take no action.
1887 static void
1888 init_rel_sync_cache(MemoryContext cachectx)
1890 HASHCTL ctl;
1891 static bool relation_callbacks_registered = false;
1893 /* Nothing to do if hash table already exists */
1894 if (RelationSyncCache != NULL)
1895 return;
1897 /* Make a new hash table for the cache */
1898 ctl.keysize = sizeof(Oid);
1899 ctl.entrysize = sizeof(RelationSyncEntry);
1900 ctl.hcxt = cachectx;
1902 RelationSyncCache = hash_create("logical replication output relation cache",
1903 128, &ctl,
1904 HASH_ELEM | HASH_CONTEXT | HASH_BLOBS);
1906 Assert(RelationSyncCache != NULL);
1908 /* No more to do if we already registered callbacks */
1909 if (relation_callbacks_registered)
1910 return;
1912 /* We must update the cache entry for a relation after a relcache flush */
1913 CacheRegisterRelcacheCallback(rel_sync_cache_relation_cb, (Datum) 0);
1916 * Flush all cache entries after a pg_namespace change, in case it was a
1917 * schema rename affecting a relation being replicated.
1919 CacheRegisterSyscacheCallback(NAMESPACEOID,
1920 rel_sync_cache_publication_cb,
1921 (Datum) 0);
1924 * Flush all cache entries after any publication changes. (We need no
1925 * callback entry for pg_publication, because publication_invalidation_cb
1926 * will take care of it.)
1928 CacheRegisterSyscacheCallback(PUBLICATIONRELMAP,
1929 rel_sync_cache_publication_cb,
1930 (Datum) 0);
1931 CacheRegisterSyscacheCallback(PUBLICATIONNAMESPACEMAP,
1932 rel_sync_cache_publication_cb,
1933 (Datum) 0);
1935 relation_callbacks_registered = true;
1939 * We expect relatively small number of streamed transactions.
1941 static bool
1942 get_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
1944 return list_member_xid(entry->streamed_txns, xid);
1948 * Add the xid in the rel sync entry for which we have already sent the schema
1949 * of the relation.
1951 static void
1952 set_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
1954 MemoryContext oldctx;
1956 oldctx = MemoryContextSwitchTo(CacheMemoryContext);
1958 entry->streamed_txns = lappend_xid(entry->streamed_txns, xid);
1960 MemoryContextSwitchTo(oldctx);
1964 * Find or create entry in the relation schema cache.
1966 * This looks up publications that the given relation is directly or
1967 * indirectly part of (the latter if it's really the relation's ancestor that
1968 * is part of a publication) and fills up the found entry with the information
1969 * about which operations to publish and whether to use an ancestor's schema
1970 * when publishing.
1972 static RelationSyncEntry *
1973 get_rel_sync_entry(PGOutputData *data, Relation relation)
1975 RelationSyncEntry *entry;
1976 bool found;
1977 MemoryContext oldctx;
1978 Oid relid = RelationGetRelid(relation);
1980 Assert(RelationSyncCache != NULL);
1982 /* Find cached relation info, creating if not found */
1983 entry = (RelationSyncEntry *) hash_search(RelationSyncCache,
1984 &relid,
1985 HASH_ENTER, &found);
1986 Assert(entry != NULL);
1988 /* initialize entry, if it's new */
1989 if (!found)
1991 entry->replicate_valid = false;
1992 entry->schema_sent = false;
1993 entry->streamed_txns = NIL;
1994 entry->pubactions.pubinsert = entry->pubactions.pubupdate =
1995 entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false;
1996 entry->new_slot = NULL;
1997 entry->old_slot = NULL;
1998 memset(entry->exprstate, 0, sizeof(entry->exprstate));
1999 entry->entry_cxt = NULL;
2000 entry->publish_as_relid = InvalidOid;
2001 entry->columns = NULL;
2002 entry->attrmap = NULL;
2005 /* Validate the entry */
2006 if (!entry->replicate_valid)
2008 Oid schemaId = get_rel_namespace(relid);
2009 List *pubids = GetRelationPublications(relid);
2012 * We don't acquire a lock on the namespace system table as we build
2013 * the cache entry using a historic snapshot and all the later changes
2014 * are absorbed while decoding WAL.
2016 List *schemaPubids = GetSchemaPublications(schemaId);
2017 ListCell *lc;
2018 Oid publish_as_relid = relid;
2019 int publish_ancestor_level = 0;
2020 bool am_partition = get_rel_relispartition(relid);
2021 char relkind = get_rel_relkind(relid);
2022 List *rel_publications = NIL;
2024 /* Reload publications if needed before use. */
2025 if (!publications_valid)
2027 oldctx = MemoryContextSwitchTo(CacheMemoryContext);
2028 if (data->publications)
2030 list_free_deep(data->publications);
2031 data->publications = NIL;
2033 data->publications = LoadPublications(data->publication_names);
2034 MemoryContextSwitchTo(oldctx);
2035 publications_valid = true;
2039 * Reset schema_sent status as the relation definition may have
2040 * changed. Also reset pubactions to empty in case rel was dropped
2041 * from a publication. Also free any objects that depended on the
2042 * earlier definition.
2044 entry->schema_sent = false;
2045 list_free(entry->streamed_txns);
2046 entry->streamed_txns = NIL;
2047 bms_free(entry->columns);
2048 entry->columns = NULL;
2049 entry->pubactions.pubinsert = false;
2050 entry->pubactions.pubupdate = false;
2051 entry->pubactions.pubdelete = false;
2052 entry->pubactions.pubtruncate = false;
2055 * Tuple slots cleanups. (Will be rebuilt later if needed).
2057 if (entry->old_slot)
2058 ExecDropSingleTupleTableSlot(entry->old_slot);
2059 if (entry->new_slot)
2060 ExecDropSingleTupleTableSlot(entry->new_slot);
2062 entry->old_slot = NULL;
2063 entry->new_slot = NULL;
2065 if (entry->attrmap)
2066 free_attrmap(entry->attrmap);
2067 entry->attrmap = NULL;
2070 * Row filter cache cleanups.
2072 if (entry->entry_cxt)
2073 MemoryContextDelete(entry->entry_cxt);
2075 entry->entry_cxt = NULL;
2076 entry->estate = NULL;
2077 memset(entry->exprstate, 0, sizeof(entry->exprstate));
2080 * Build publication cache. We can't use one provided by relcache as
2081 * relcache considers all publications that the given relation is in,
2082 * but here we only need to consider ones that the subscriber
2083 * requested.
2085 foreach(lc, data->publications)
2087 Publication *pub = lfirst(lc);
2088 bool publish = false;
2091 * Under what relid should we publish changes in this publication?
2092 * We'll use the top-most relid across all publications. Also
2093 * track the ancestor level for this publication.
2095 Oid pub_relid = relid;
2096 int ancestor_level = 0;
2099 * If this is a FOR ALL TABLES publication, pick the partition
2100 * root and set the ancestor level accordingly.
2102 if (pub->alltables)
2104 publish = true;
2105 if (pub->pubviaroot && am_partition)
2107 List *ancestors = get_partition_ancestors(relid);
2109 pub_relid = llast_oid(ancestors);
2110 ancestor_level = list_length(ancestors);
2114 if (!publish)
2116 bool ancestor_published = false;
2119 * For a partition, check if any of the ancestors are
2120 * published. If so, note down the topmost ancestor that is
2121 * published via this publication, which will be used as the
2122 * relation via which to publish the partition's changes.
2124 if (am_partition)
2126 Oid ancestor;
2127 int level;
2128 List *ancestors = get_partition_ancestors(relid);
2130 ancestor = GetTopMostAncestorInPublication(pub->oid,
2131 ancestors,
2132 &level);
2134 if (ancestor != InvalidOid)
2136 ancestor_published = true;
2137 if (pub->pubviaroot)
2139 pub_relid = ancestor;
2140 ancestor_level = level;
2145 if (list_member_oid(pubids, pub->oid) ||
2146 list_member_oid(schemaPubids, pub->oid) ||
2147 ancestor_published)
2148 publish = true;
2152 * If the relation is to be published, determine actions to
2153 * publish, and list of columns, if appropriate.
2155 * Don't publish changes for partitioned tables, because
2156 * publishing those of its partitions suffices, unless partition
2157 * changes won't be published due to pubviaroot being set.
2159 if (publish &&
2160 (relkind != RELKIND_PARTITIONED_TABLE || pub->pubviaroot))
2162 entry->pubactions.pubinsert |= pub->pubactions.pubinsert;
2163 entry->pubactions.pubupdate |= pub->pubactions.pubupdate;
2164 entry->pubactions.pubdelete |= pub->pubactions.pubdelete;
2165 entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate;
2168 * We want to publish the changes as the top-most ancestor
2169 * across all publications. So we need to check if the already
2170 * calculated level is higher than the new one. If yes, we can
2171 * ignore the new value (as it's a child). Otherwise the new
2172 * value is an ancestor, so we keep it.
2174 if (publish_ancestor_level > ancestor_level)
2175 continue;
2178 * If we found an ancestor higher up in the tree, discard the
2179 * list of publications through which we replicate it, and use
2180 * the new ancestor.
2182 if (publish_ancestor_level < ancestor_level)
2184 publish_as_relid = pub_relid;
2185 publish_ancestor_level = ancestor_level;
2187 /* reset the publication list for this relation */
2188 rel_publications = NIL;
2190 else
2192 /* Same ancestor level, has to be the same OID. */
2193 Assert(publish_as_relid == pub_relid);
2196 /* Track publications for this ancestor. */
2197 rel_publications = lappend(rel_publications, pub);
2201 entry->publish_as_relid = publish_as_relid;
2204 * Initialize the tuple slot, map, and row filter. These are only used
2205 * when publishing inserts, updates, or deletes.
2207 if (entry->pubactions.pubinsert || entry->pubactions.pubupdate ||
2208 entry->pubactions.pubdelete)
2210 /* Initialize the tuple slot and map */
2211 init_tuple_slot(data, relation, entry);
2213 /* Initialize the row filter */
2214 pgoutput_row_filter_init(data, rel_publications, entry);
2216 /* Initialize the column list */
2217 pgoutput_column_list_init(data, rel_publications, entry);
2220 list_free(pubids);
2221 list_free(schemaPubids);
2222 list_free(rel_publications);
2224 entry->replicate_valid = true;
2227 return entry;
2231 * Cleanup list of streamed transactions and update the schema_sent flag.
2233 * When a streamed transaction commits or aborts, we need to remove the
2234 * toplevel XID from the schema cache. If the transaction aborted, the
2235 * subscriber will simply throw away the schema records we streamed, so
2236 * we don't need to do anything else.
2238 * If the transaction is committed, the subscriber will update the relation
2239 * cache - so tweak the schema_sent flag accordingly.
2241 static void
2242 cleanup_rel_sync_cache(TransactionId xid, bool is_commit)
2244 HASH_SEQ_STATUS hash_seq;
2245 RelationSyncEntry *entry;
2247 Assert(RelationSyncCache != NULL);
2249 hash_seq_init(&hash_seq, RelationSyncCache);
2250 while ((entry = hash_seq_search(&hash_seq)) != NULL)
2253 * We can set the schema_sent flag for an entry that has committed xid
2254 * in the list as that ensures that the subscriber would have the
2255 * corresponding schema and we don't need to send it unless there is
2256 * any invalidation for that relation.
2258 foreach_xid(streamed_txn, entry->streamed_txns)
2260 if (xid == streamed_txn)
2262 if (is_commit)
2263 entry->schema_sent = true;
2265 entry->streamed_txns =
2266 foreach_delete_current(entry->streamed_txns, streamed_txn);
2267 break;
2274 * Relcache invalidation callback
2276 static void
2277 rel_sync_cache_relation_cb(Datum arg, Oid relid)
2279 RelationSyncEntry *entry;
2282 * We can get here if the plugin was used in SQL interface as the
2283 * RelationSyncCache is destroyed when the decoding finishes, but there is
2284 * no way to unregister the relcache invalidation callback.
2286 if (RelationSyncCache == NULL)
2287 return;
2290 * Nobody keeps pointers to entries in this hash table around outside
2291 * logical decoding callback calls - but invalidation events can come in
2292 * *during* a callback if we do any syscache access in the callback.
2293 * Because of that we must mark the cache entry as invalid but not damage
2294 * any of its substructure here. The next get_rel_sync_entry() call will
2295 * rebuild it all.
2297 if (OidIsValid(relid))
2300 * Getting invalidations for relations that aren't in the table is
2301 * entirely normal. So we don't care if it's found or not.
2303 entry = (RelationSyncEntry *) hash_search(RelationSyncCache, &relid,
2304 HASH_FIND, NULL);
2305 if (entry != NULL)
2306 entry->replicate_valid = false;
2308 else
2310 /* Whole cache must be flushed. */
2311 HASH_SEQ_STATUS status;
2313 hash_seq_init(&status, RelationSyncCache);
2314 while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
2316 entry->replicate_valid = false;
2322 * Publication relation/schema map syscache invalidation callback
2324 * Called for invalidations on pg_publication, pg_publication_rel,
2325 * pg_publication_namespace, and pg_namespace.
2327 static void
2328 rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
2330 HASH_SEQ_STATUS status;
2331 RelationSyncEntry *entry;
2334 * We can get here if the plugin was used in SQL interface as the
2335 * RelationSyncCache is destroyed when the decoding finishes, but there is
2336 * no way to unregister the invalidation callbacks.
2338 if (RelationSyncCache == NULL)
2339 return;
2342 * We have no easy way to identify which cache entries this invalidation
2343 * event might have affected, so just mark them all invalid.
2345 hash_seq_init(&status, RelationSyncCache);
2346 while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
2348 entry->replicate_valid = false;
2352 /* Send Replication origin */
2353 static void
2354 send_repl_origin(LogicalDecodingContext *ctx, RepOriginId origin_id,
2355 XLogRecPtr origin_lsn, bool send_origin)
2357 if (send_origin)
2359 char *origin;
2361 /*----------
2362 * XXX: which behaviour do we want here?
2364 * Alternatives:
2365 * - don't send origin message if origin name not found
2366 * (that's what we do now)
2367 * - throw error - that will break replication, not good
2368 * - send some special "unknown" origin
2369 *----------
2371 if (replorigin_by_oid(origin_id, true, &origin))
2373 /* Message boundary */
2374 OutputPluginWrite(ctx, false);
2375 OutputPluginPrepareWrite(ctx, true);
2377 logicalrep_write_origin(ctx->out, origin, origin_lsn);