Reject non-ON-SELECT rules that are named "_RETURN".
[pgsql.git] / contrib / test_decoding / test_decoding.c
blobe0fd6f176593948d646391cccf570443ef9b6cc0
1 /*-------------------------------------------------------------------------
3 * test_decoding.c
4 * example logical decoding output plugin
6 * Copyright (c) 2012-2022, PostgreSQL Global Development Group
8 * IDENTIFICATION
9 * contrib/test_decoding/test_decoding.c
11 *-------------------------------------------------------------------------
13 #include "postgres.h"
15 #include "catalog/pg_type.h"
17 #include "replication/logical.h"
18 #include "replication/origin.h"
20 #include "utils/builtins.h"
21 #include "utils/lsyscache.h"
22 #include "utils/memutils.h"
23 #include "utils/rel.h"
25 PG_MODULE_MAGIC;
27 typedef struct
29 MemoryContext context;
30 bool include_xids;
31 bool include_timestamp;
32 bool skip_empty_xacts;
33 bool only_local;
34 } TestDecodingData;
37 * Maintain the per-transaction level variables to track whether the
38 * transaction and or streams have written any changes. In streaming mode the
39 * transaction can be decoded in streams so along with maintaining whether the
40 * transaction has written any changes, we also need to track whether the
41 * current stream has written any changes. This is required so that if user
42 * has requested to skip the empty transactions we can skip the empty streams
43 * even though the transaction has written some changes.
45 typedef struct
47 bool xact_wrote_changes;
48 bool stream_wrote_changes;
49 } TestDecodingTxnData;
51 static void pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
52 bool is_init);
53 static void pg_decode_shutdown(LogicalDecodingContext *ctx);
54 static void pg_decode_begin_txn(LogicalDecodingContext *ctx,
55 ReorderBufferTXN *txn);
56 static void pg_output_begin(LogicalDecodingContext *ctx,
57 TestDecodingData *data,
58 ReorderBufferTXN *txn,
59 bool last_write);
60 static void pg_decode_commit_txn(LogicalDecodingContext *ctx,
61 ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
62 static void pg_decode_change(LogicalDecodingContext *ctx,
63 ReorderBufferTXN *txn, Relation relation,
64 ReorderBufferChange *change);
65 static void pg_decode_truncate(LogicalDecodingContext *ctx,
66 ReorderBufferTXN *txn,
67 int nrelations, Relation relations[],
68 ReorderBufferChange *change);
69 static bool pg_decode_filter(LogicalDecodingContext *ctx,
70 RepOriginId origin_id);
71 static void pg_decode_message(LogicalDecodingContext *ctx,
72 ReorderBufferTXN *txn, XLogRecPtr lsn,
73 bool transactional, const char *prefix,
74 Size sz, const char *message);
75 static bool pg_decode_filter_prepare(LogicalDecodingContext *ctx,
76 TransactionId xid,
77 const char *gid);
78 static void pg_decode_begin_prepare_txn(LogicalDecodingContext *ctx,
79 ReorderBufferTXN *txn);
80 static void pg_decode_prepare_txn(LogicalDecodingContext *ctx,
81 ReorderBufferTXN *txn,
82 XLogRecPtr prepare_lsn);
83 static void pg_decode_commit_prepared_txn(LogicalDecodingContext *ctx,
84 ReorderBufferTXN *txn,
85 XLogRecPtr commit_lsn);
86 static void pg_decode_rollback_prepared_txn(LogicalDecodingContext *ctx,
87 ReorderBufferTXN *txn,
88 XLogRecPtr prepare_end_lsn,
89 TimestampTz prepare_time);
90 static void pg_decode_stream_start(LogicalDecodingContext *ctx,
91 ReorderBufferTXN *txn);
92 static void pg_output_stream_start(LogicalDecodingContext *ctx,
93 TestDecodingData *data,
94 ReorderBufferTXN *txn,
95 bool last_write);
96 static void pg_decode_stream_stop(LogicalDecodingContext *ctx,
97 ReorderBufferTXN *txn);
98 static void pg_decode_stream_abort(LogicalDecodingContext *ctx,
99 ReorderBufferTXN *txn,
100 XLogRecPtr abort_lsn);
101 static void pg_decode_stream_prepare(LogicalDecodingContext *ctx,
102 ReorderBufferTXN *txn,
103 XLogRecPtr prepare_lsn);
104 static void pg_decode_stream_commit(LogicalDecodingContext *ctx,
105 ReorderBufferTXN *txn,
106 XLogRecPtr commit_lsn);
107 static void pg_decode_stream_change(LogicalDecodingContext *ctx,
108 ReorderBufferTXN *txn,
109 Relation relation,
110 ReorderBufferChange *change);
111 static void pg_decode_stream_message(LogicalDecodingContext *ctx,
112 ReorderBufferTXN *txn, XLogRecPtr lsn,
113 bool transactional, const char *prefix,
114 Size sz, const char *message);
115 static void pg_decode_stream_truncate(LogicalDecodingContext *ctx,
116 ReorderBufferTXN *txn,
117 int nrelations, Relation relations[],
118 ReorderBufferChange *change);
120 void
121 _PG_init(void)
123 /* other plugins can perform things here */
126 /* specify output plugin callbacks */
127 void
128 _PG_output_plugin_init(OutputPluginCallbacks *cb)
130 AssertVariableIsOfType(&_PG_output_plugin_init, LogicalOutputPluginInit);
132 cb->startup_cb = pg_decode_startup;
133 cb->begin_cb = pg_decode_begin_txn;
134 cb->change_cb = pg_decode_change;
135 cb->truncate_cb = pg_decode_truncate;
136 cb->commit_cb = pg_decode_commit_txn;
137 cb->filter_by_origin_cb = pg_decode_filter;
138 cb->shutdown_cb = pg_decode_shutdown;
139 cb->message_cb = pg_decode_message;
140 cb->filter_prepare_cb = pg_decode_filter_prepare;
141 cb->begin_prepare_cb = pg_decode_begin_prepare_txn;
142 cb->prepare_cb = pg_decode_prepare_txn;
143 cb->commit_prepared_cb = pg_decode_commit_prepared_txn;
144 cb->rollback_prepared_cb = pg_decode_rollback_prepared_txn;
145 cb->stream_start_cb = pg_decode_stream_start;
146 cb->stream_stop_cb = pg_decode_stream_stop;
147 cb->stream_abort_cb = pg_decode_stream_abort;
148 cb->stream_prepare_cb = pg_decode_stream_prepare;
149 cb->stream_commit_cb = pg_decode_stream_commit;
150 cb->stream_change_cb = pg_decode_stream_change;
151 cb->stream_message_cb = pg_decode_stream_message;
152 cb->stream_truncate_cb = pg_decode_stream_truncate;
156 /* initialize this plugin */
157 static void
158 pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
159 bool is_init)
161 ListCell *option;
162 TestDecodingData *data;
163 bool enable_streaming = false;
165 data = palloc0(sizeof(TestDecodingData));
166 data->context = AllocSetContextCreate(ctx->context,
167 "text conversion context",
168 ALLOCSET_DEFAULT_SIZES);
169 data->include_xids = true;
170 data->include_timestamp = false;
171 data->skip_empty_xacts = false;
172 data->only_local = false;
174 ctx->output_plugin_private = data;
176 opt->output_type = OUTPUT_PLUGIN_TEXTUAL_OUTPUT;
177 opt->receive_rewrites = false;
179 foreach(option, ctx->output_plugin_options)
181 DefElem *elem = lfirst(option);
183 Assert(elem->arg == NULL || IsA(elem->arg, String));
185 if (strcmp(elem->defname, "include-xids") == 0)
187 /* if option does not provide a value, it means its value is true */
188 if (elem->arg == NULL)
189 data->include_xids = true;
190 else if (!parse_bool(strVal(elem->arg), &data->include_xids))
191 ereport(ERROR,
192 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
193 errmsg("could not parse value \"%s\" for parameter \"%s\"",
194 strVal(elem->arg), elem->defname)));
196 else if (strcmp(elem->defname, "include-timestamp") == 0)
198 if (elem->arg == NULL)
199 data->include_timestamp = true;
200 else if (!parse_bool(strVal(elem->arg), &data->include_timestamp))
201 ereport(ERROR,
202 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
203 errmsg("could not parse value \"%s\" for parameter \"%s\"",
204 strVal(elem->arg), elem->defname)));
206 else if (strcmp(elem->defname, "force-binary") == 0)
208 bool force_binary;
210 if (elem->arg == NULL)
211 continue;
212 else if (!parse_bool(strVal(elem->arg), &force_binary))
213 ereport(ERROR,
214 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
215 errmsg("could not parse value \"%s\" for parameter \"%s\"",
216 strVal(elem->arg), elem->defname)));
218 if (force_binary)
219 opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
221 else if (strcmp(elem->defname, "skip-empty-xacts") == 0)
224 if (elem->arg == NULL)
225 data->skip_empty_xacts = true;
226 else if (!parse_bool(strVal(elem->arg), &data->skip_empty_xacts))
227 ereport(ERROR,
228 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
229 errmsg("could not parse value \"%s\" for parameter \"%s\"",
230 strVal(elem->arg), elem->defname)));
232 else if (strcmp(elem->defname, "only-local") == 0)
235 if (elem->arg == NULL)
236 data->only_local = true;
237 else if (!parse_bool(strVal(elem->arg), &data->only_local))
238 ereport(ERROR,
239 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
240 errmsg("could not parse value \"%s\" for parameter \"%s\"",
241 strVal(elem->arg), elem->defname)));
243 else if (strcmp(elem->defname, "include-rewrites") == 0)
246 if (elem->arg == NULL)
247 continue;
248 else if (!parse_bool(strVal(elem->arg), &opt->receive_rewrites))
249 ereport(ERROR,
250 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
251 errmsg("could not parse value \"%s\" for parameter \"%s\"",
252 strVal(elem->arg), elem->defname)));
254 else if (strcmp(elem->defname, "stream-changes") == 0)
256 if (elem->arg == NULL)
257 continue;
258 else if (!parse_bool(strVal(elem->arg), &enable_streaming))
259 ereport(ERROR,
260 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
261 errmsg("could not parse value \"%s\" for parameter \"%s\"",
262 strVal(elem->arg), elem->defname)));
264 else
266 ereport(ERROR,
267 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
268 errmsg("option \"%s\" = \"%s\" is unknown",
269 elem->defname,
270 elem->arg ? strVal(elem->arg) : "(null)")));
274 ctx->streaming &= enable_streaming;
277 /* cleanup this plugin's resources */
278 static void
279 pg_decode_shutdown(LogicalDecodingContext *ctx)
281 TestDecodingData *data = ctx->output_plugin_private;
283 /* cleanup our own resources via memory context reset */
284 MemoryContextDelete(data->context);
287 /* BEGIN callback */
288 static void
289 pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
291 TestDecodingData *data = ctx->output_plugin_private;
292 TestDecodingTxnData *txndata =
293 MemoryContextAllocZero(ctx->context, sizeof(TestDecodingTxnData));
295 txndata->xact_wrote_changes = false;
296 txn->output_plugin_private = txndata;
299 * If asked to skip empty transactions, we'll emit BEGIN at the point
300 * where the first operation is received for this transaction.
302 if (data->skip_empty_xacts)
303 return;
305 pg_output_begin(ctx, data, txn, true);
308 static void
309 pg_output_begin(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)
311 OutputPluginPrepareWrite(ctx, last_write);
312 if (data->include_xids)
313 appendStringInfo(ctx->out, "BEGIN %u", txn->xid);
314 else
315 appendStringInfoString(ctx->out, "BEGIN");
316 OutputPluginWrite(ctx, last_write);
319 /* COMMIT callback */
320 static void
321 pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
322 XLogRecPtr commit_lsn)
324 TestDecodingData *data = ctx->output_plugin_private;
325 TestDecodingTxnData *txndata = txn->output_plugin_private;
326 bool xact_wrote_changes = txndata->xact_wrote_changes;
328 pfree(txndata);
329 txn->output_plugin_private = NULL;
331 if (data->skip_empty_xacts && !xact_wrote_changes)
332 return;
334 OutputPluginPrepareWrite(ctx, true);
335 if (data->include_xids)
336 appendStringInfo(ctx->out, "COMMIT %u", txn->xid);
337 else
338 appendStringInfoString(ctx->out, "COMMIT");
340 if (data->include_timestamp)
341 appendStringInfo(ctx->out, " (at %s)",
342 timestamptz_to_str(txn->xact_time.commit_time));
344 OutputPluginWrite(ctx, true);
347 /* BEGIN PREPARE callback */
348 static void
349 pg_decode_begin_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
351 TestDecodingData *data = ctx->output_plugin_private;
352 TestDecodingTxnData *txndata =
353 MemoryContextAllocZero(ctx->context, sizeof(TestDecodingTxnData));
355 txndata->xact_wrote_changes = false;
356 txn->output_plugin_private = txndata;
359 * If asked to skip empty transactions, we'll emit BEGIN at the point
360 * where the first operation is received for this transaction.
362 if (data->skip_empty_xacts)
363 return;
365 pg_output_begin(ctx, data, txn, true);
368 /* PREPARE callback */
369 static void
370 pg_decode_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
371 XLogRecPtr prepare_lsn)
373 TestDecodingData *data = ctx->output_plugin_private;
374 TestDecodingTxnData *txndata = txn->output_plugin_private;
377 * If asked to skip empty transactions, we'll emit PREPARE at the point
378 * where the first operation is received for this transaction.
380 if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
381 return;
383 OutputPluginPrepareWrite(ctx, true);
385 appendStringInfo(ctx->out, "PREPARE TRANSACTION %s",
386 quote_literal_cstr(txn->gid));
388 if (data->include_xids)
389 appendStringInfo(ctx->out, ", txid %u", txn->xid);
391 if (data->include_timestamp)
392 appendStringInfo(ctx->out, " (at %s)",
393 timestamptz_to_str(txn->xact_time.prepare_time));
395 OutputPluginWrite(ctx, true);
398 /* COMMIT PREPARED callback */
399 static void
400 pg_decode_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
401 XLogRecPtr commit_lsn)
403 TestDecodingData *data = ctx->output_plugin_private;
405 OutputPluginPrepareWrite(ctx, true);
407 appendStringInfo(ctx->out, "COMMIT PREPARED %s",
408 quote_literal_cstr(txn->gid));
410 if (data->include_xids)
411 appendStringInfo(ctx->out, ", txid %u", txn->xid);
413 if (data->include_timestamp)
414 appendStringInfo(ctx->out, " (at %s)",
415 timestamptz_to_str(txn->xact_time.commit_time));
417 OutputPluginWrite(ctx, true);
420 /* ROLLBACK PREPARED callback */
421 static void
422 pg_decode_rollback_prepared_txn(LogicalDecodingContext *ctx,
423 ReorderBufferTXN *txn,
424 XLogRecPtr prepare_end_lsn,
425 TimestampTz prepare_time)
427 TestDecodingData *data = ctx->output_plugin_private;
429 OutputPluginPrepareWrite(ctx, true);
431 appendStringInfo(ctx->out, "ROLLBACK PREPARED %s",
432 quote_literal_cstr(txn->gid));
434 if (data->include_xids)
435 appendStringInfo(ctx->out, ", txid %u", txn->xid);
437 if (data->include_timestamp)
438 appendStringInfo(ctx->out, " (at %s)",
439 timestamptz_to_str(txn->xact_time.commit_time));
441 OutputPluginWrite(ctx, true);
445 * Filter out two-phase transactions.
447 * Each plugin can implement its own filtering logic. Here we demonstrate a
448 * simple logic by checking the GID. If the GID contains the "_nodecode"
449 * substring, then we filter it out.
451 static bool
452 pg_decode_filter_prepare(LogicalDecodingContext *ctx, TransactionId xid,
453 const char *gid)
455 if (strstr(gid, "_nodecode") != NULL)
456 return true;
458 return false;
461 static bool
462 pg_decode_filter(LogicalDecodingContext *ctx,
463 RepOriginId origin_id)
465 TestDecodingData *data = ctx->output_plugin_private;
467 if (data->only_local && origin_id != InvalidRepOriginId)
468 return true;
469 return false;
473 * Print literal `outputstr' already represented as string of type `typid'
474 * into stringbuf `s'.
476 * Some builtin types aren't quoted, the rest is quoted. Escaping is done as
477 * if standard_conforming_strings were enabled.
479 static void
480 print_literal(StringInfo s, Oid typid, char *outputstr)
482 const char *valptr;
484 switch (typid)
486 case INT2OID:
487 case INT4OID:
488 case INT8OID:
489 case OIDOID:
490 case FLOAT4OID:
491 case FLOAT8OID:
492 case NUMERICOID:
493 /* NB: We don't care about Inf, NaN et al. */
494 appendStringInfoString(s, outputstr);
495 break;
497 case BITOID:
498 case VARBITOID:
499 appendStringInfo(s, "B'%s'", outputstr);
500 break;
502 case BOOLOID:
503 if (strcmp(outputstr, "t") == 0)
504 appendStringInfoString(s, "true");
505 else
506 appendStringInfoString(s, "false");
507 break;
509 default:
510 appendStringInfoChar(s, '\'');
511 for (valptr = outputstr; *valptr; valptr++)
513 char ch = *valptr;
515 if (SQL_STR_DOUBLE(ch, false))
516 appendStringInfoChar(s, ch);
517 appendStringInfoChar(s, ch);
519 appendStringInfoChar(s, '\'');
520 break;
524 /* print the tuple 'tuple' into the StringInfo s */
525 static void
526 tuple_to_stringinfo(StringInfo s, TupleDesc tupdesc, HeapTuple tuple, bool skip_nulls)
528 int natt;
530 /* print all columns individually */
531 for (natt = 0; natt < tupdesc->natts; natt++)
533 Form_pg_attribute attr; /* the attribute itself */
534 Oid typid; /* type of current attribute */
535 Oid typoutput; /* output function */
536 bool typisvarlena;
537 Datum origval; /* possibly toasted Datum */
538 bool isnull; /* column is null? */
540 attr = TupleDescAttr(tupdesc, natt);
543 * don't print dropped columns, we can't be sure everything is
544 * available for them
546 if (attr->attisdropped)
547 continue;
550 * Don't print system columns, oid will already have been printed if
551 * present.
553 if (attr->attnum < 0)
554 continue;
556 typid = attr->atttypid;
558 /* get Datum from tuple */
559 origval = heap_getattr(tuple, natt + 1, tupdesc, &isnull);
561 if (isnull && skip_nulls)
562 continue;
564 /* print attribute name */
565 appendStringInfoChar(s, ' ');
566 appendStringInfoString(s, quote_identifier(NameStr(attr->attname)));
568 /* print attribute type */
569 appendStringInfoChar(s, '[');
570 appendStringInfoString(s, format_type_be(typid));
571 appendStringInfoChar(s, ']');
573 /* query output function */
574 getTypeOutputInfo(typid,
575 &typoutput, &typisvarlena);
577 /* print separator */
578 appendStringInfoChar(s, ':');
580 /* print data */
581 if (isnull)
582 appendStringInfoString(s, "null");
583 else if (typisvarlena && VARATT_IS_EXTERNAL_ONDISK(origval))
584 appendStringInfoString(s, "unchanged-toast-datum");
585 else if (!typisvarlena)
586 print_literal(s, typid,
587 OidOutputFunctionCall(typoutput, origval));
588 else
590 Datum val; /* definitely detoasted Datum */
592 val = PointerGetDatum(PG_DETOAST_DATUM(origval));
593 print_literal(s, typid, OidOutputFunctionCall(typoutput, val));
599 * callback for individual changed tuples
601 static void
602 pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
603 Relation relation, ReorderBufferChange *change)
605 TestDecodingData *data;
606 TestDecodingTxnData *txndata;
607 Form_pg_class class_form;
608 TupleDesc tupdesc;
609 MemoryContext old;
611 data = ctx->output_plugin_private;
612 txndata = txn->output_plugin_private;
614 /* output BEGIN if we haven't yet */
615 if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
617 pg_output_begin(ctx, data, txn, false);
619 txndata->xact_wrote_changes = true;
621 class_form = RelationGetForm(relation);
622 tupdesc = RelationGetDescr(relation);
624 /* Avoid leaking memory by using and resetting our own context */
625 old = MemoryContextSwitchTo(data->context);
627 OutputPluginPrepareWrite(ctx, true);
629 appendStringInfoString(ctx->out, "table ");
630 appendStringInfoString(ctx->out,
631 quote_qualified_identifier(get_namespace_name(get_rel_namespace(RelationGetRelid(relation))),
632 class_form->relrewrite ?
633 get_rel_name(class_form->relrewrite) :
634 NameStr(class_form->relname)));
635 appendStringInfoChar(ctx->out, ':');
637 switch (change->action)
639 case REORDER_BUFFER_CHANGE_INSERT:
640 appendStringInfoString(ctx->out, " INSERT:");
641 if (change->data.tp.newtuple == NULL)
642 appendStringInfoString(ctx->out, " (no-tuple-data)");
643 else
644 tuple_to_stringinfo(ctx->out, tupdesc,
645 &change->data.tp.newtuple->tuple,
646 false);
647 break;
648 case REORDER_BUFFER_CHANGE_UPDATE:
649 appendStringInfoString(ctx->out, " UPDATE:");
650 if (change->data.tp.oldtuple != NULL)
652 appendStringInfoString(ctx->out, " old-key:");
653 tuple_to_stringinfo(ctx->out, tupdesc,
654 &change->data.tp.oldtuple->tuple,
655 true);
656 appendStringInfoString(ctx->out, " new-tuple:");
659 if (change->data.tp.newtuple == NULL)
660 appendStringInfoString(ctx->out, " (no-tuple-data)");
661 else
662 tuple_to_stringinfo(ctx->out, tupdesc,
663 &change->data.tp.newtuple->tuple,
664 false);
665 break;
666 case REORDER_BUFFER_CHANGE_DELETE:
667 appendStringInfoString(ctx->out, " DELETE:");
669 /* if there was no PK, we only know that a delete happened */
670 if (change->data.tp.oldtuple == NULL)
671 appendStringInfoString(ctx->out, " (no-tuple-data)");
672 /* In DELETE, only the replica identity is present; display that */
673 else
674 tuple_to_stringinfo(ctx->out, tupdesc,
675 &change->data.tp.oldtuple->tuple,
676 true);
677 break;
678 default:
679 Assert(false);
682 MemoryContextSwitchTo(old);
683 MemoryContextReset(data->context);
685 OutputPluginWrite(ctx, true);
688 static void
689 pg_decode_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
690 int nrelations, Relation relations[], ReorderBufferChange *change)
692 TestDecodingData *data;
693 TestDecodingTxnData *txndata;
694 MemoryContext old;
695 int i;
697 data = ctx->output_plugin_private;
698 txndata = txn->output_plugin_private;
700 /* output BEGIN if we haven't yet */
701 if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
703 pg_output_begin(ctx, data, txn, false);
705 txndata->xact_wrote_changes = true;
707 /* Avoid leaking memory by using and resetting our own context */
708 old = MemoryContextSwitchTo(data->context);
710 OutputPluginPrepareWrite(ctx, true);
712 appendStringInfoString(ctx->out, "table ");
714 for (i = 0; i < nrelations; i++)
716 if (i > 0)
717 appendStringInfoString(ctx->out, ", ");
719 appendStringInfoString(ctx->out,
720 quote_qualified_identifier(get_namespace_name(relations[i]->rd_rel->relnamespace),
721 NameStr(relations[i]->rd_rel->relname)));
724 appendStringInfoString(ctx->out, ": TRUNCATE:");
726 if (change->data.truncate.restart_seqs
727 || change->data.truncate.cascade)
729 if (change->data.truncate.restart_seqs)
730 appendStringInfoString(ctx->out, " restart_seqs");
731 if (change->data.truncate.cascade)
732 appendStringInfoString(ctx->out, " cascade");
734 else
735 appendStringInfoString(ctx->out, " (no-flags)");
737 MemoryContextSwitchTo(old);
738 MemoryContextReset(data->context);
740 OutputPluginWrite(ctx, true);
743 static void
744 pg_decode_message(LogicalDecodingContext *ctx,
745 ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional,
746 const char *prefix, Size sz, const char *message)
748 OutputPluginPrepareWrite(ctx, true);
749 appendStringInfo(ctx->out, "message: transactional: %d prefix: %s, sz: %zu content:",
750 transactional, prefix, sz);
751 appendBinaryStringInfo(ctx->out, message, sz);
752 OutputPluginWrite(ctx, true);
755 static void
756 pg_decode_stream_start(LogicalDecodingContext *ctx,
757 ReorderBufferTXN *txn)
759 TestDecodingData *data = ctx->output_plugin_private;
760 TestDecodingTxnData *txndata = txn->output_plugin_private;
763 * Allocate the txn plugin data for the first stream in the transaction.
765 if (txndata == NULL)
767 txndata =
768 MemoryContextAllocZero(ctx->context, sizeof(TestDecodingTxnData));
769 txndata->xact_wrote_changes = false;
770 txn->output_plugin_private = txndata;
773 txndata->stream_wrote_changes = false;
774 if (data->skip_empty_xacts)
775 return;
776 pg_output_stream_start(ctx, data, txn, true);
779 static void
780 pg_output_stream_start(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)
782 OutputPluginPrepareWrite(ctx, last_write);
783 if (data->include_xids)
784 appendStringInfo(ctx->out, "opening a streamed block for transaction TXN %u", txn->xid);
785 else
786 appendStringInfoString(ctx->out, "opening a streamed block for transaction");
787 OutputPluginWrite(ctx, last_write);
790 static void
791 pg_decode_stream_stop(LogicalDecodingContext *ctx,
792 ReorderBufferTXN *txn)
794 TestDecodingData *data = ctx->output_plugin_private;
795 TestDecodingTxnData *txndata = txn->output_plugin_private;
797 if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
798 return;
800 OutputPluginPrepareWrite(ctx, true);
801 if (data->include_xids)
802 appendStringInfo(ctx->out, "closing a streamed block for transaction TXN %u", txn->xid);
803 else
804 appendStringInfoString(ctx->out, "closing a streamed block for transaction");
805 OutputPluginWrite(ctx, true);
808 static void
809 pg_decode_stream_abort(LogicalDecodingContext *ctx,
810 ReorderBufferTXN *txn,
811 XLogRecPtr abort_lsn)
813 TestDecodingData *data = ctx->output_plugin_private;
816 * stream abort can be sent for an individual subtransaction but we
817 * maintain the output_plugin_private only under the toptxn so if this is
818 * not the toptxn then fetch the toptxn.
820 ReorderBufferTXN *toptxn = txn->toptxn ? txn->toptxn : txn;
821 TestDecodingTxnData *txndata = toptxn->output_plugin_private;
822 bool xact_wrote_changes = txndata->xact_wrote_changes;
824 if (txn->toptxn == NULL)
826 Assert(txn->output_plugin_private != NULL);
827 pfree(txndata);
828 txn->output_plugin_private = NULL;
831 if (data->skip_empty_xacts && !xact_wrote_changes)
832 return;
834 OutputPluginPrepareWrite(ctx, true);
835 if (data->include_xids)
836 appendStringInfo(ctx->out, "aborting streamed (sub)transaction TXN %u", txn->xid);
837 else
838 appendStringInfoString(ctx->out, "aborting streamed (sub)transaction");
839 OutputPluginWrite(ctx, true);
842 static void
843 pg_decode_stream_prepare(LogicalDecodingContext *ctx,
844 ReorderBufferTXN *txn,
845 XLogRecPtr prepare_lsn)
847 TestDecodingData *data = ctx->output_plugin_private;
848 TestDecodingTxnData *txndata = txn->output_plugin_private;
850 if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
851 return;
853 OutputPluginPrepareWrite(ctx, true);
855 if (data->include_xids)
856 appendStringInfo(ctx->out, "preparing streamed transaction TXN %s, txid %u",
857 quote_literal_cstr(txn->gid), txn->xid);
858 else
859 appendStringInfo(ctx->out, "preparing streamed transaction %s",
860 quote_literal_cstr(txn->gid));
862 if (data->include_timestamp)
863 appendStringInfo(ctx->out, " (at %s)",
864 timestamptz_to_str(txn->xact_time.prepare_time));
866 OutputPluginWrite(ctx, true);
869 static void
870 pg_decode_stream_commit(LogicalDecodingContext *ctx,
871 ReorderBufferTXN *txn,
872 XLogRecPtr commit_lsn)
874 TestDecodingData *data = ctx->output_plugin_private;
875 TestDecodingTxnData *txndata = txn->output_plugin_private;
876 bool xact_wrote_changes = txndata->xact_wrote_changes;
878 pfree(txndata);
879 txn->output_plugin_private = NULL;
881 if (data->skip_empty_xacts && !xact_wrote_changes)
882 return;
884 OutputPluginPrepareWrite(ctx, true);
886 if (data->include_xids)
887 appendStringInfo(ctx->out, "committing streamed transaction TXN %u", txn->xid);
888 else
889 appendStringInfoString(ctx->out, "committing streamed transaction");
891 if (data->include_timestamp)
892 appendStringInfo(ctx->out, " (at %s)",
893 timestamptz_to_str(txn->xact_time.commit_time));
895 OutputPluginWrite(ctx, true);
899 * In streaming mode, we don't display the changes as the transaction can abort
900 * at a later point in time. We don't want users to see the changes until the
901 * transaction is committed.
903 static void
904 pg_decode_stream_change(LogicalDecodingContext *ctx,
905 ReorderBufferTXN *txn,
906 Relation relation,
907 ReorderBufferChange *change)
909 TestDecodingData *data = ctx->output_plugin_private;
910 TestDecodingTxnData *txndata = txn->output_plugin_private;
912 /* output stream start if we haven't yet */
913 if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
915 pg_output_stream_start(ctx, data, txn, false);
917 txndata->xact_wrote_changes = txndata->stream_wrote_changes = true;
919 OutputPluginPrepareWrite(ctx, true);
920 if (data->include_xids)
921 appendStringInfo(ctx->out, "streaming change for TXN %u", txn->xid);
922 else
923 appendStringInfoString(ctx->out, "streaming change for transaction");
924 OutputPluginWrite(ctx, true);
928 * In streaming mode, we don't display the contents for transactional messages
929 * as the transaction can abort at a later point in time. We don't want users to
930 * see the message contents until the transaction is committed.
932 static void
933 pg_decode_stream_message(LogicalDecodingContext *ctx,
934 ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional,
935 const char *prefix, Size sz, const char *message)
937 OutputPluginPrepareWrite(ctx, true);
939 if (transactional)
941 appendStringInfo(ctx->out, "streaming message: transactional: %d prefix: %s, sz: %zu",
942 transactional, prefix, sz);
944 else
946 appendStringInfo(ctx->out, "streaming message: transactional: %d prefix: %s, sz: %zu content:",
947 transactional, prefix, sz);
948 appendBinaryStringInfo(ctx->out, message, sz);
951 OutputPluginWrite(ctx, true);
955 * In streaming mode, we don't display the detailed information of Truncate.
956 * See pg_decode_stream_change.
958 static void
959 pg_decode_stream_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
960 int nrelations, Relation relations[],
961 ReorderBufferChange *change)
963 TestDecodingData *data = ctx->output_plugin_private;
964 TestDecodingTxnData *txndata = txn->output_plugin_private;
966 if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
968 pg_output_stream_start(ctx, data, txn, false);
970 txndata->xact_wrote_changes = txndata->stream_wrote_changes = true;
972 OutputPluginPrepareWrite(ctx, true);
973 if (data->include_xids)
974 appendStringInfo(ctx->out, "streaming truncate for TXN %u", txn->xid);
975 else
976 appendStringInfoString(ctx->out, "streaming truncate for transaction");
977 OutputPluginWrite(ctx, true);