1 /*-------------------------------------------------------------------------
4 * example logical decoding output plugin
6 * Copyright (c) 2012-2022, PostgreSQL Global Development Group
9 * contrib/test_decoding/test_decoding.c
11 *-------------------------------------------------------------------------
15 #include "catalog/pg_type.h"
17 #include "replication/logical.h"
18 #include "replication/origin.h"
20 #include "utils/builtins.h"
21 #include "utils/lsyscache.h"
22 #include "utils/memutils.h"
23 #include "utils/rel.h"
29 MemoryContext context
;
31 bool include_timestamp
;
32 bool skip_empty_xacts
;
37 * Maintain the per-transaction level variables to track whether the
38 * transaction and or streams have written any changes. In streaming mode the
39 * transaction can be decoded in streams so along with maintaining whether the
40 * transaction has written any changes, we also need to track whether the
41 * current stream has written any changes. This is required so that if user
42 * has requested to skip the empty transactions we can skip the empty streams
43 * even though the transaction has written some changes.
47 bool xact_wrote_changes
;
48 bool stream_wrote_changes
;
49 } TestDecodingTxnData
;
51 static void pg_decode_startup(LogicalDecodingContext
*ctx
, OutputPluginOptions
*opt
,
53 static void pg_decode_shutdown(LogicalDecodingContext
*ctx
);
54 static void pg_decode_begin_txn(LogicalDecodingContext
*ctx
,
55 ReorderBufferTXN
*txn
);
56 static void pg_output_begin(LogicalDecodingContext
*ctx
,
57 TestDecodingData
*data
,
58 ReorderBufferTXN
*txn
,
60 static void pg_decode_commit_txn(LogicalDecodingContext
*ctx
,
61 ReorderBufferTXN
*txn
, XLogRecPtr commit_lsn
);
62 static void pg_decode_change(LogicalDecodingContext
*ctx
,
63 ReorderBufferTXN
*txn
, Relation relation
,
64 ReorderBufferChange
*change
);
65 static void pg_decode_truncate(LogicalDecodingContext
*ctx
,
66 ReorderBufferTXN
*txn
,
67 int nrelations
, Relation relations
[],
68 ReorderBufferChange
*change
);
69 static bool pg_decode_filter(LogicalDecodingContext
*ctx
,
70 RepOriginId origin_id
);
71 static void pg_decode_message(LogicalDecodingContext
*ctx
,
72 ReorderBufferTXN
*txn
, XLogRecPtr lsn
,
73 bool transactional
, const char *prefix
,
74 Size sz
, const char *message
);
75 static bool pg_decode_filter_prepare(LogicalDecodingContext
*ctx
,
78 static void pg_decode_begin_prepare_txn(LogicalDecodingContext
*ctx
,
79 ReorderBufferTXN
*txn
);
80 static void pg_decode_prepare_txn(LogicalDecodingContext
*ctx
,
81 ReorderBufferTXN
*txn
,
82 XLogRecPtr prepare_lsn
);
83 static void pg_decode_commit_prepared_txn(LogicalDecodingContext
*ctx
,
84 ReorderBufferTXN
*txn
,
85 XLogRecPtr commit_lsn
);
86 static void pg_decode_rollback_prepared_txn(LogicalDecodingContext
*ctx
,
87 ReorderBufferTXN
*txn
,
88 XLogRecPtr prepare_end_lsn
,
89 TimestampTz prepare_time
);
90 static void pg_decode_stream_start(LogicalDecodingContext
*ctx
,
91 ReorderBufferTXN
*txn
);
92 static void pg_output_stream_start(LogicalDecodingContext
*ctx
,
93 TestDecodingData
*data
,
94 ReorderBufferTXN
*txn
,
96 static void pg_decode_stream_stop(LogicalDecodingContext
*ctx
,
97 ReorderBufferTXN
*txn
);
98 static void pg_decode_stream_abort(LogicalDecodingContext
*ctx
,
99 ReorderBufferTXN
*txn
,
100 XLogRecPtr abort_lsn
);
101 static void pg_decode_stream_prepare(LogicalDecodingContext
*ctx
,
102 ReorderBufferTXN
*txn
,
103 XLogRecPtr prepare_lsn
);
104 static void pg_decode_stream_commit(LogicalDecodingContext
*ctx
,
105 ReorderBufferTXN
*txn
,
106 XLogRecPtr commit_lsn
);
107 static void pg_decode_stream_change(LogicalDecodingContext
*ctx
,
108 ReorderBufferTXN
*txn
,
110 ReorderBufferChange
*change
);
111 static void pg_decode_stream_message(LogicalDecodingContext
*ctx
,
112 ReorderBufferTXN
*txn
, XLogRecPtr lsn
,
113 bool transactional
, const char *prefix
,
114 Size sz
, const char *message
);
115 static void pg_decode_stream_truncate(LogicalDecodingContext
*ctx
,
116 ReorderBufferTXN
*txn
,
117 int nrelations
, Relation relations
[],
118 ReorderBufferChange
*change
);
123 /* other plugins can perform things here */
126 /* specify output plugin callbacks */
128 _PG_output_plugin_init(OutputPluginCallbacks
*cb
)
130 AssertVariableIsOfType(&_PG_output_plugin_init
, LogicalOutputPluginInit
);
132 cb
->startup_cb
= pg_decode_startup
;
133 cb
->begin_cb
= pg_decode_begin_txn
;
134 cb
->change_cb
= pg_decode_change
;
135 cb
->truncate_cb
= pg_decode_truncate
;
136 cb
->commit_cb
= pg_decode_commit_txn
;
137 cb
->filter_by_origin_cb
= pg_decode_filter
;
138 cb
->shutdown_cb
= pg_decode_shutdown
;
139 cb
->message_cb
= pg_decode_message
;
140 cb
->filter_prepare_cb
= pg_decode_filter_prepare
;
141 cb
->begin_prepare_cb
= pg_decode_begin_prepare_txn
;
142 cb
->prepare_cb
= pg_decode_prepare_txn
;
143 cb
->commit_prepared_cb
= pg_decode_commit_prepared_txn
;
144 cb
->rollback_prepared_cb
= pg_decode_rollback_prepared_txn
;
145 cb
->stream_start_cb
= pg_decode_stream_start
;
146 cb
->stream_stop_cb
= pg_decode_stream_stop
;
147 cb
->stream_abort_cb
= pg_decode_stream_abort
;
148 cb
->stream_prepare_cb
= pg_decode_stream_prepare
;
149 cb
->stream_commit_cb
= pg_decode_stream_commit
;
150 cb
->stream_change_cb
= pg_decode_stream_change
;
151 cb
->stream_message_cb
= pg_decode_stream_message
;
152 cb
->stream_truncate_cb
= pg_decode_stream_truncate
;
156 /* initialize this plugin */
158 pg_decode_startup(LogicalDecodingContext
*ctx
, OutputPluginOptions
*opt
,
162 TestDecodingData
*data
;
163 bool enable_streaming
= false;
165 data
= palloc0(sizeof(TestDecodingData
));
166 data
->context
= AllocSetContextCreate(ctx
->context
,
167 "text conversion context",
168 ALLOCSET_DEFAULT_SIZES
);
169 data
->include_xids
= true;
170 data
->include_timestamp
= false;
171 data
->skip_empty_xacts
= false;
172 data
->only_local
= false;
174 ctx
->output_plugin_private
= data
;
176 opt
->output_type
= OUTPUT_PLUGIN_TEXTUAL_OUTPUT
;
177 opt
->receive_rewrites
= false;
179 foreach(option
, ctx
->output_plugin_options
)
181 DefElem
*elem
= lfirst(option
);
183 Assert(elem
->arg
== NULL
|| IsA(elem
->arg
, String
));
185 if (strcmp(elem
->defname
, "include-xids") == 0)
187 /* if option does not provide a value, it means its value is true */
188 if (elem
->arg
== NULL
)
189 data
->include_xids
= true;
190 else if (!parse_bool(strVal(elem
->arg
), &data
->include_xids
))
192 (errcode(ERRCODE_INVALID_PARAMETER_VALUE
),
193 errmsg("could not parse value \"%s\" for parameter \"%s\"",
194 strVal(elem
->arg
), elem
->defname
)));
196 else if (strcmp(elem
->defname
, "include-timestamp") == 0)
198 if (elem
->arg
== NULL
)
199 data
->include_timestamp
= true;
200 else if (!parse_bool(strVal(elem
->arg
), &data
->include_timestamp
))
202 (errcode(ERRCODE_INVALID_PARAMETER_VALUE
),
203 errmsg("could not parse value \"%s\" for parameter \"%s\"",
204 strVal(elem
->arg
), elem
->defname
)));
206 else if (strcmp(elem
->defname
, "force-binary") == 0)
210 if (elem
->arg
== NULL
)
212 else if (!parse_bool(strVal(elem
->arg
), &force_binary
))
214 (errcode(ERRCODE_INVALID_PARAMETER_VALUE
),
215 errmsg("could not parse value \"%s\" for parameter \"%s\"",
216 strVal(elem
->arg
), elem
->defname
)));
219 opt
->output_type
= OUTPUT_PLUGIN_BINARY_OUTPUT
;
221 else if (strcmp(elem
->defname
, "skip-empty-xacts") == 0)
224 if (elem
->arg
== NULL
)
225 data
->skip_empty_xacts
= true;
226 else if (!parse_bool(strVal(elem
->arg
), &data
->skip_empty_xacts
))
228 (errcode(ERRCODE_INVALID_PARAMETER_VALUE
),
229 errmsg("could not parse value \"%s\" for parameter \"%s\"",
230 strVal(elem
->arg
), elem
->defname
)));
232 else if (strcmp(elem
->defname
, "only-local") == 0)
235 if (elem
->arg
== NULL
)
236 data
->only_local
= true;
237 else if (!parse_bool(strVal(elem
->arg
), &data
->only_local
))
239 (errcode(ERRCODE_INVALID_PARAMETER_VALUE
),
240 errmsg("could not parse value \"%s\" for parameter \"%s\"",
241 strVal(elem
->arg
), elem
->defname
)));
243 else if (strcmp(elem
->defname
, "include-rewrites") == 0)
246 if (elem
->arg
== NULL
)
248 else if (!parse_bool(strVal(elem
->arg
), &opt
->receive_rewrites
))
250 (errcode(ERRCODE_INVALID_PARAMETER_VALUE
),
251 errmsg("could not parse value \"%s\" for parameter \"%s\"",
252 strVal(elem
->arg
), elem
->defname
)));
254 else if (strcmp(elem
->defname
, "stream-changes") == 0)
256 if (elem
->arg
== NULL
)
258 else if (!parse_bool(strVal(elem
->arg
), &enable_streaming
))
260 (errcode(ERRCODE_INVALID_PARAMETER_VALUE
),
261 errmsg("could not parse value \"%s\" for parameter \"%s\"",
262 strVal(elem
->arg
), elem
->defname
)));
267 (errcode(ERRCODE_INVALID_PARAMETER_VALUE
),
268 errmsg("option \"%s\" = \"%s\" is unknown",
270 elem
->arg
? strVal(elem
->arg
) : "(null)")));
274 ctx
->streaming
&= enable_streaming
;
277 /* cleanup this plugin's resources */
279 pg_decode_shutdown(LogicalDecodingContext
*ctx
)
281 TestDecodingData
*data
= ctx
->output_plugin_private
;
283 /* cleanup our own resources via memory context reset */
284 MemoryContextDelete(data
->context
);
289 pg_decode_begin_txn(LogicalDecodingContext
*ctx
, ReorderBufferTXN
*txn
)
291 TestDecodingData
*data
= ctx
->output_plugin_private
;
292 TestDecodingTxnData
*txndata
=
293 MemoryContextAllocZero(ctx
->context
, sizeof(TestDecodingTxnData
));
295 txndata
->xact_wrote_changes
= false;
296 txn
->output_plugin_private
= txndata
;
299 * If asked to skip empty transactions, we'll emit BEGIN at the point
300 * where the first operation is received for this transaction.
302 if (data
->skip_empty_xacts
)
305 pg_output_begin(ctx
, data
, txn
, true);
309 pg_output_begin(LogicalDecodingContext
*ctx
, TestDecodingData
*data
, ReorderBufferTXN
*txn
, bool last_write
)
311 OutputPluginPrepareWrite(ctx
, last_write
);
312 if (data
->include_xids
)
313 appendStringInfo(ctx
->out
, "BEGIN %u", txn
->xid
);
315 appendStringInfoString(ctx
->out
, "BEGIN");
316 OutputPluginWrite(ctx
, last_write
);
319 /* COMMIT callback */
321 pg_decode_commit_txn(LogicalDecodingContext
*ctx
, ReorderBufferTXN
*txn
,
322 XLogRecPtr commit_lsn
)
324 TestDecodingData
*data
= ctx
->output_plugin_private
;
325 TestDecodingTxnData
*txndata
= txn
->output_plugin_private
;
326 bool xact_wrote_changes
= txndata
->xact_wrote_changes
;
329 txn
->output_plugin_private
= NULL
;
331 if (data
->skip_empty_xacts
&& !xact_wrote_changes
)
334 OutputPluginPrepareWrite(ctx
, true);
335 if (data
->include_xids
)
336 appendStringInfo(ctx
->out
, "COMMIT %u", txn
->xid
);
338 appendStringInfoString(ctx
->out
, "COMMIT");
340 if (data
->include_timestamp
)
341 appendStringInfo(ctx
->out
, " (at %s)",
342 timestamptz_to_str(txn
->xact_time
.commit_time
));
344 OutputPluginWrite(ctx
, true);
347 /* BEGIN PREPARE callback */
349 pg_decode_begin_prepare_txn(LogicalDecodingContext
*ctx
, ReorderBufferTXN
*txn
)
351 TestDecodingData
*data
= ctx
->output_plugin_private
;
352 TestDecodingTxnData
*txndata
=
353 MemoryContextAllocZero(ctx
->context
, sizeof(TestDecodingTxnData
));
355 txndata
->xact_wrote_changes
= false;
356 txn
->output_plugin_private
= txndata
;
359 * If asked to skip empty transactions, we'll emit BEGIN at the point
360 * where the first operation is received for this transaction.
362 if (data
->skip_empty_xacts
)
365 pg_output_begin(ctx
, data
, txn
, true);
368 /* PREPARE callback */
370 pg_decode_prepare_txn(LogicalDecodingContext
*ctx
, ReorderBufferTXN
*txn
,
371 XLogRecPtr prepare_lsn
)
373 TestDecodingData
*data
= ctx
->output_plugin_private
;
374 TestDecodingTxnData
*txndata
= txn
->output_plugin_private
;
377 * If asked to skip empty transactions, we'll emit PREPARE at the point
378 * where the first operation is received for this transaction.
380 if (data
->skip_empty_xacts
&& !txndata
->xact_wrote_changes
)
383 OutputPluginPrepareWrite(ctx
, true);
385 appendStringInfo(ctx
->out
, "PREPARE TRANSACTION %s",
386 quote_literal_cstr(txn
->gid
));
388 if (data
->include_xids
)
389 appendStringInfo(ctx
->out
, ", txid %u", txn
->xid
);
391 if (data
->include_timestamp
)
392 appendStringInfo(ctx
->out
, " (at %s)",
393 timestamptz_to_str(txn
->xact_time
.prepare_time
));
395 OutputPluginWrite(ctx
, true);
398 /* COMMIT PREPARED callback */
400 pg_decode_commit_prepared_txn(LogicalDecodingContext
*ctx
, ReorderBufferTXN
*txn
,
401 XLogRecPtr commit_lsn
)
403 TestDecodingData
*data
= ctx
->output_plugin_private
;
405 OutputPluginPrepareWrite(ctx
, true);
407 appendStringInfo(ctx
->out
, "COMMIT PREPARED %s",
408 quote_literal_cstr(txn
->gid
));
410 if (data
->include_xids
)
411 appendStringInfo(ctx
->out
, ", txid %u", txn
->xid
);
413 if (data
->include_timestamp
)
414 appendStringInfo(ctx
->out
, " (at %s)",
415 timestamptz_to_str(txn
->xact_time
.commit_time
));
417 OutputPluginWrite(ctx
, true);
420 /* ROLLBACK PREPARED callback */
422 pg_decode_rollback_prepared_txn(LogicalDecodingContext
*ctx
,
423 ReorderBufferTXN
*txn
,
424 XLogRecPtr prepare_end_lsn
,
425 TimestampTz prepare_time
)
427 TestDecodingData
*data
= ctx
->output_plugin_private
;
429 OutputPluginPrepareWrite(ctx
, true);
431 appendStringInfo(ctx
->out
, "ROLLBACK PREPARED %s",
432 quote_literal_cstr(txn
->gid
));
434 if (data
->include_xids
)
435 appendStringInfo(ctx
->out
, ", txid %u", txn
->xid
);
437 if (data
->include_timestamp
)
438 appendStringInfo(ctx
->out
, " (at %s)",
439 timestamptz_to_str(txn
->xact_time
.commit_time
));
441 OutputPluginWrite(ctx
, true);
445 * Filter out two-phase transactions.
447 * Each plugin can implement its own filtering logic. Here we demonstrate a
448 * simple logic by checking the GID. If the GID contains the "_nodecode"
449 * substring, then we filter it out.
452 pg_decode_filter_prepare(LogicalDecodingContext
*ctx
, TransactionId xid
,
455 if (strstr(gid
, "_nodecode") != NULL
)
462 pg_decode_filter(LogicalDecodingContext
*ctx
,
463 RepOriginId origin_id
)
465 TestDecodingData
*data
= ctx
->output_plugin_private
;
467 if (data
->only_local
&& origin_id
!= InvalidRepOriginId
)
473 * Print literal `outputstr' already represented as string of type `typid'
474 * into stringbuf `s'.
476 * Some builtin types aren't quoted, the rest is quoted. Escaping is done as
477 * if standard_conforming_strings were enabled.
480 print_literal(StringInfo s
, Oid typid
, char *outputstr
)
493 /* NB: We don't care about Inf, NaN et al. */
494 appendStringInfoString(s
, outputstr
);
499 appendStringInfo(s
, "B'%s'", outputstr
);
503 if (strcmp(outputstr
, "t") == 0)
504 appendStringInfoString(s
, "true");
506 appendStringInfoString(s
, "false");
510 appendStringInfoChar(s
, '\'');
511 for (valptr
= outputstr
; *valptr
; valptr
++)
515 if (SQL_STR_DOUBLE(ch
, false))
516 appendStringInfoChar(s
, ch
);
517 appendStringInfoChar(s
, ch
);
519 appendStringInfoChar(s
, '\'');
524 /* print the tuple 'tuple' into the StringInfo s */
526 tuple_to_stringinfo(StringInfo s
, TupleDesc tupdesc
, HeapTuple tuple
, bool skip_nulls
)
530 /* print all columns individually */
531 for (natt
= 0; natt
< tupdesc
->natts
; natt
++)
533 Form_pg_attribute attr
; /* the attribute itself */
534 Oid typid
; /* type of current attribute */
535 Oid typoutput
; /* output function */
537 Datum origval
; /* possibly toasted Datum */
538 bool isnull
; /* column is null? */
540 attr
= TupleDescAttr(tupdesc
, natt
);
543 * don't print dropped columns, we can't be sure everything is
546 if (attr
->attisdropped
)
550 * Don't print system columns, oid will already have been printed if
553 if (attr
->attnum
< 0)
556 typid
= attr
->atttypid
;
558 /* get Datum from tuple */
559 origval
= heap_getattr(tuple
, natt
+ 1, tupdesc
, &isnull
);
561 if (isnull
&& skip_nulls
)
564 /* print attribute name */
565 appendStringInfoChar(s
, ' ');
566 appendStringInfoString(s
, quote_identifier(NameStr(attr
->attname
)));
568 /* print attribute type */
569 appendStringInfoChar(s
, '[');
570 appendStringInfoString(s
, format_type_be(typid
));
571 appendStringInfoChar(s
, ']');
573 /* query output function */
574 getTypeOutputInfo(typid
,
575 &typoutput
, &typisvarlena
);
577 /* print separator */
578 appendStringInfoChar(s
, ':');
582 appendStringInfoString(s
, "null");
583 else if (typisvarlena
&& VARATT_IS_EXTERNAL_ONDISK(origval
))
584 appendStringInfoString(s
, "unchanged-toast-datum");
585 else if (!typisvarlena
)
586 print_literal(s
, typid
,
587 OidOutputFunctionCall(typoutput
, origval
));
590 Datum val
; /* definitely detoasted Datum */
592 val
= PointerGetDatum(PG_DETOAST_DATUM(origval
));
593 print_literal(s
, typid
, OidOutputFunctionCall(typoutput
, val
));
599 * callback for individual changed tuples
602 pg_decode_change(LogicalDecodingContext
*ctx
, ReorderBufferTXN
*txn
,
603 Relation relation
, ReorderBufferChange
*change
)
605 TestDecodingData
*data
;
606 TestDecodingTxnData
*txndata
;
607 Form_pg_class class_form
;
611 data
= ctx
->output_plugin_private
;
612 txndata
= txn
->output_plugin_private
;
614 /* output BEGIN if we haven't yet */
615 if (data
->skip_empty_xacts
&& !txndata
->xact_wrote_changes
)
617 pg_output_begin(ctx
, data
, txn
, false);
619 txndata
->xact_wrote_changes
= true;
621 class_form
= RelationGetForm(relation
);
622 tupdesc
= RelationGetDescr(relation
);
624 /* Avoid leaking memory by using and resetting our own context */
625 old
= MemoryContextSwitchTo(data
->context
);
627 OutputPluginPrepareWrite(ctx
, true);
629 appendStringInfoString(ctx
->out
, "table ");
630 appendStringInfoString(ctx
->out
,
631 quote_qualified_identifier(get_namespace_name(get_rel_namespace(RelationGetRelid(relation
))),
632 class_form
->relrewrite
?
633 get_rel_name(class_form
->relrewrite
) :
634 NameStr(class_form
->relname
)));
635 appendStringInfoChar(ctx
->out
, ':');
637 switch (change
->action
)
639 case REORDER_BUFFER_CHANGE_INSERT
:
640 appendStringInfoString(ctx
->out
, " INSERT:");
641 if (change
->data
.tp
.newtuple
== NULL
)
642 appendStringInfoString(ctx
->out
, " (no-tuple-data)");
644 tuple_to_stringinfo(ctx
->out
, tupdesc
,
645 &change
->data
.tp
.newtuple
->tuple
,
648 case REORDER_BUFFER_CHANGE_UPDATE
:
649 appendStringInfoString(ctx
->out
, " UPDATE:");
650 if (change
->data
.tp
.oldtuple
!= NULL
)
652 appendStringInfoString(ctx
->out
, " old-key:");
653 tuple_to_stringinfo(ctx
->out
, tupdesc
,
654 &change
->data
.tp
.oldtuple
->tuple
,
656 appendStringInfoString(ctx
->out
, " new-tuple:");
659 if (change
->data
.tp
.newtuple
== NULL
)
660 appendStringInfoString(ctx
->out
, " (no-tuple-data)");
662 tuple_to_stringinfo(ctx
->out
, tupdesc
,
663 &change
->data
.tp
.newtuple
->tuple
,
666 case REORDER_BUFFER_CHANGE_DELETE
:
667 appendStringInfoString(ctx
->out
, " DELETE:");
669 /* if there was no PK, we only know that a delete happened */
670 if (change
->data
.tp
.oldtuple
== NULL
)
671 appendStringInfoString(ctx
->out
, " (no-tuple-data)");
672 /* In DELETE, only the replica identity is present; display that */
674 tuple_to_stringinfo(ctx
->out
, tupdesc
,
675 &change
->data
.tp
.oldtuple
->tuple
,
682 MemoryContextSwitchTo(old
);
683 MemoryContextReset(data
->context
);
685 OutputPluginWrite(ctx
, true);
689 pg_decode_truncate(LogicalDecodingContext
*ctx
, ReorderBufferTXN
*txn
,
690 int nrelations
, Relation relations
[], ReorderBufferChange
*change
)
692 TestDecodingData
*data
;
693 TestDecodingTxnData
*txndata
;
697 data
= ctx
->output_plugin_private
;
698 txndata
= txn
->output_plugin_private
;
700 /* output BEGIN if we haven't yet */
701 if (data
->skip_empty_xacts
&& !txndata
->xact_wrote_changes
)
703 pg_output_begin(ctx
, data
, txn
, false);
705 txndata
->xact_wrote_changes
= true;
707 /* Avoid leaking memory by using and resetting our own context */
708 old
= MemoryContextSwitchTo(data
->context
);
710 OutputPluginPrepareWrite(ctx
, true);
712 appendStringInfoString(ctx
->out
, "table ");
714 for (i
= 0; i
< nrelations
; i
++)
717 appendStringInfoString(ctx
->out
, ", ");
719 appendStringInfoString(ctx
->out
,
720 quote_qualified_identifier(get_namespace_name(relations
[i
]->rd_rel
->relnamespace
),
721 NameStr(relations
[i
]->rd_rel
->relname
)));
724 appendStringInfoString(ctx
->out
, ": TRUNCATE:");
726 if (change
->data
.truncate
.restart_seqs
727 || change
->data
.truncate
.cascade
)
729 if (change
->data
.truncate
.restart_seqs
)
730 appendStringInfoString(ctx
->out
, " restart_seqs");
731 if (change
->data
.truncate
.cascade
)
732 appendStringInfoString(ctx
->out
, " cascade");
735 appendStringInfoString(ctx
->out
, " (no-flags)");
737 MemoryContextSwitchTo(old
);
738 MemoryContextReset(data
->context
);
740 OutputPluginWrite(ctx
, true);
744 pg_decode_message(LogicalDecodingContext
*ctx
,
745 ReorderBufferTXN
*txn
, XLogRecPtr lsn
, bool transactional
,
746 const char *prefix
, Size sz
, const char *message
)
748 OutputPluginPrepareWrite(ctx
, true);
749 appendStringInfo(ctx
->out
, "message: transactional: %d prefix: %s, sz: %zu content:",
750 transactional
, prefix
, sz
);
751 appendBinaryStringInfo(ctx
->out
, message
, sz
);
752 OutputPluginWrite(ctx
, true);
756 pg_decode_stream_start(LogicalDecodingContext
*ctx
,
757 ReorderBufferTXN
*txn
)
759 TestDecodingData
*data
= ctx
->output_plugin_private
;
760 TestDecodingTxnData
*txndata
= txn
->output_plugin_private
;
763 * Allocate the txn plugin data for the first stream in the transaction.
768 MemoryContextAllocZero(ctx
->context
, sizeof(TestDecodingTxnData
));
769 txndata
->xact_wrote_changes
= false;
770 txn
->output_plugin_private
= txndata
;
773 txndata
->stream_wrote_changes
= false;
774 if (data
->skip_empty_xacts
)
776 pg_output_stream_start(ctx
, data
, txn
, true);
780 pg_output_stream_start(LogicalDecodingContext
*ctx
, TestDecodingData
*data
, ReorderBufferTXN
*txn
, bool last_write
)
782 OutputPluginPrepareWrite(ctx
, last_write
);
783 if (data
->include_xids
)
784 appendStringInfo(ctx
->out
, "opening a streamed block for transaction TXN %u", txn
->xid
);
786 appendStringInfoString(ctx
->out
, "opening a streamed block for transaction");
787 OutputPluginWrite(ctx
, last_write
);
791 pg_decode_stream_stop(LogicalDecodingContext
*ctx
,
792 ReorderBufferTXN
*txn
)
794 TestDecodingData
*data
= ctx
->output_plugin_private
;
795 TestDecodingTxnData
*txndata
= txn
->output_plugin_private
;
797 if (data
->skip_empty_xacts
&& !txndata
->stream_wrote_changes
)
800 OutputPluginPrepareWrite(ctx
, true);
801 if (data
->include_xids
)
802 appendStringInfo(ctx
->out
, "closing a streamed block for transaction TXN %u", txn
->xid
);
804 appendStringInfoString(ctx
->out
, "closing a streamed block for transaction");
805 OutputPluginWrite(ctx
, true);
809 pg_decode_stream_abort(LogicalDecodingContext
*ctx
,
810 ReorderBufferTXN
*txn
,
811 XLogRecPtr abort_lsn
)
813 TestDecodingData
*data
= ctx
->output_plugin_private
;
816 * stream abort can be sent for an individual subtransaction but we
817 * maintain the output_plugin_private only under the toptxn so if this is
818 * not the toptxn then fetch the toptxn.
820 ReorderBufferTXN
*toptxn
= txn
->toptxn
? txn
->toptxn
: txn
;
821 TestDecodingTxnData
*txndata
= toptxn
->output_plugin_private
;
822 bool xact_wrote_changes
= txndata
->xact_wrote_changes
;
824 if (txn
->toptxn
== NULL
)
826 Assert(txn
->output_plugin_private
!= NULL
);
828 txn
->output_plugin_private
= NULL
;
831 if (data
->skip_empty_xacts
&& !xact_wrote_changes
)
834 OutputPluginPrepareWrite(ctx
, true);
835 if (data
->include_xids
)
836 appendStringInfo(ctx
->out
, "aborting streamed (sub)transaction TXN %u", txn
->xid
);
838 appendStringInfoString(ctx
->out
, "aborting streamed (sub)transaction");
839 OutputPluginWrite(ctx
, true);
843 pg_decode_stream_prepare(LogicalDecodingContext
*ctx
,
844 ReorderBufferTXN
*txn
,
845 XLogRecPtr prepare_lsn
)
847 TestDecodingData
*data
= ctx
->output_plugin_private
;
848 TestDecodingTxnData
*txndata
= txn
->output_plugin_private
;
850 if (data
->skip_empty_xacts
&& !txndata
->xact_wrote_changes
)
853 OutputPluginPrepareWrite(ctx
, true);
855 if (data
->include_xids
)
856 appendStringInfo(ctx
->out
, "preparing streamed transaction TXN %s, txid %u",
857 quote_literal_cstr(txn
->gid
), txn
->xid
);
859 appendStringInfo(ctx
->out
, "preparing streamed transaction %s",
860 quote_literal_cstr(txn
->gid
));
862 if (data
->include_timestamp
)
863 appendStringInfo(ctx
->out
, " (at %s)",
864 timestamptz_to_str(txn
->xact_time
.prepare_time
));
866 OutputPluginWrite(ctx
, true);
870 pg_decode_stream_commit(LogicalDecodingContext
*ctx
,
871 ReorderBufferTXN
*txn
,
872 XLogRecPtr commit_lsn
)
874 TestDecodingData
*data
= ctx
->output_plugin_private
;
875 TestDecodingTxnData
*txndata
= txn
->output_plugin_private
;
876 bool xact_wrote_changes
= txndata
->xact_wrote_changes
;
879 txn
->output_plugin_private
= NULL
;
881 if (data
->skip_empty_xacts
&& !xact_wrote_changes
)
884 OutputPluginPrepareWrite(ctx
, true);
886 if (data
->include_xids
)
887 appendStringInfo(ctx
->out
, "committing streamed transaction TXN %u", txn
->xid
);
889 appendStringInfoString(ctx
->out
, "committing streamed transaction");
891 if (data
->include_timestamp
)
892 appendStringInfo(ctx
->out
, " (at %s)",
893 timestamptz_to_str(txn
->xact_time
.commit_time
));
895 OutputPluginWrite(ctx
, true);
899 * In streaming mode, we don't display the changes as the transaction can abort
900 * at a later point in time. We don't want users to see the changes until the
901 * transaction is committed.
904 pg_decode_stream_change(LogicalDecodingContext
*ctx
,
905 ReorderBufferTXN
*txn
,
907 ReorderBufferChange
*change
)
909 TestDecodingData
*data
= ctx
->output_plugin_private
;
910 TestDecodingTxnData
*txndata
= txn
->output_plugin_private
;
912 /* output stream start if we haven't yet */
913 if (data
->skip_empty_xacts
&& !txndata
->stream_wrote_changes
)
915 pg_output_stream_start(ctx
, data
, txn
, false);
917 txndata
->xact_wrote_changes
= txndata
->stream_wrote_changes
= true;
919 OutputPluginPrepareWrite(ctx
, true);
920 if (data
->include_xids
)
921 appendStringInfo(ctx
->out
, "streaming change for TXN %u", txn
->xid
);
923 appendStringInfoString(ctx
->out
, "streaming change for transaction");
924 OutputPluginWrite(ctx
, true);
928 * In streaming mode, we don't display the contents for transactional messages
929 * as the transaction can abort at a later point in time. We don't want users to
930 * see the message contents until the transaction is committed.
933 pg_decode_stream_message(LogicalDecodingContext
*ctx
,
934 ReorderBufferTXN
*txn
, XLogRecPtr lsn
, bool transactional
,
935 const char *prefix
, Size sz
, const char *message
)
937 OutputPluginPrepareWrite(ctx
, true);
941 appendStringInfo(ctx
->out
, "streaming message: transactional: %d prefix: %s, sz: %zu",
942 transactional
, prefix
, sz
);
946 appendStringInfo(ctx
->out
, "streaming message: transactional: %d prefix: %s, sz: %zu content:",
947 transactional
, prefix
, sz
);
948 appendBinaryStringInfo(ctx
->out
, message
, sz
);
951 OutputPluginWrite(ctx
, true);
955 * In streaming mode, we don't display the detailed information of Truncate.
956 * See pg_decode_stream_change.
959 pg_decode_stream_truncate(LogicalDecodingContext
*ctx
, ReorderBufferTXN
*txn
,
960 int nrelations
, Relation relations
[],
961 ReorderBufferChange
*change
)
963 TestDecodingData
*data
= ctx
->output_plugin_private
;
964 TestDecodingTxnData
*txndata
= txn
->output_plugin_private
;
966 if (data
->skip_empty_xacts
&& !txndata
->stream_wrote_changes
)
968 pg_output_stream_start(ctx
, data
, txn
, false);
970 txndata
->xact_wrote_changes
= txndata
->stream_wrote_changes
= true;
972 OutputPluginPrepareWrite(ctx
, true);
973 if (data
->include_xids
)
974 appendStringInfo(ctx
->out
, "streaming truncate for TXN %u", txn
->xid
);
976 appendStringInfoString(ctx
->out
, "streaming truncate for transaction");
977 OutputPluginWrite(ctx
, true);