Update copyright for 2022
[pgsql.git] / src / test / subscription / t / 022_twophase_cascade.pl
blob80680f06a0ee8a2194cb72e974b0392e0f9d9219
2 # Copyright (c) 2021-2022, PostgreSQL Global Development Group
4 # Test cascading logical replication of 2PC.
6 # Includes tests for options 2PC (not-streaming) and also for 2PC (streaming).
8 use strict;
9 use warnings;
10 use PostgreSQL::Test::Cluster;
11 use PostgreSQL::Test::Utils;
12 use Test::More tests => 41;
14 ###############################
15 # Setup a cascade of pub/sub nodes.
16 # node_A -> node_B -> node_C
17 ###############################
19 # Initialize nodes
20 # node_A
21 my $node_A = PostgreSQL::Test::Cluster->new('node_A');
22 $node_A->init(allows_streaming => 'logical');
23 $node_A->append_conf('postgresql.conf', qq(
24 max_prepared_transactions = 10
25 logical_decoding_work_mem = 64kB
26 ));
27 $node_A->start;
28 # node_B
29 my $node_B = PostgreSQL::Test::Cluster->new('node_B');
30 $node_B->init(allows_streaming => 'logical');
31 $node_B->append_conf('postgresql.conf', qq(
32 max_prepared_transactions = 10
33 logical_decoding_work_mem = 64kB
34 ));
35 $node_B->start;
36 # node_C
37 my $node_C = PostgreSQL::Test::Cluster->new('node_C');
38 $node_C->init(allows_streaming => 'logical');
39 $node_C->append_conf('postgresql.conf', qq(
40 max_prepared_transactions = 10
41 logical_decoding_work_mem = 64kB
42 ));
43 $node_C->start;
45 # Create some pre-existing content on node_A
46 $node_A->safe_psql('postgres',
47 "CREATE TABLE tab_full (a int PRIMARY KEY)");
48 $node_A->safe_psql('postgres', "
49 INSERT INTO tab_full SELECT generate_series(1,10);");
51 # Create the same tables on node_B and node_C
52 $node_B->safe_psql('postgres',
53 "CREATE TABLE tab_full (a int PRIMARY KEY)");
54 $node_C->safe_psql('postgres',
55 "CREATE TABLE tab_full (a int PRIMARY KEY)");
57 # Create some pre-existing content on node_A (for streaming tests)
58 $node_A->safe_psql('postgres',
59 "CREATE TABLE test_tab (a int primary key, b varchar)");
60 $node_A->safe_psql('postgres',
61 "INSERT INTO test_tab VALUES (1, 'foo'), (2, 'bar')");
63 # Create the same tables on node_B and node_C
64 # columns a and b are compatible with same table name on node_A
65 $node_B->safe_psql('postgres',
66 "CREATE TABLE test_tab (a int primary key, b text, c timestamptz DEFAULT now(), d bigint DEFAULT 999)");
67 $node_C->safe_psql('postgres',
68 "CREATE TABLE test_tab (a int primary key, b text, c timestamptz DEFAULT now(), d bigint DEFAULT 999)");
70 # Setup logical replication
72 # -----------------------
73 # 2PC NON-STREAMING TESTS
74 # -----------------------
76 # node_A (pub) -> node_B (sub)
77 my $node_A_connstr = $node_A->connstr . ' dbname=postgres';
78 $node_A->safe_psql('postgres',
79 "CREATE PUBLICATION tap_pub_A FOR TABLE tab_full, test_tab");
80 my $appname_B = 'tap_sub_B';
81 $node_B->safe_psql('postgres', "
82 CREATE SUBSCRIPTION tap_sub_B
83 CONNECTION '$node_A_connstr application_name=$appname_B'
84 PUBLICATION tap_pub_A
85 WITH (two_phase = on)");
87 # node_B (pub) -> node_C (sub)
88 my $node_B_connstr = $node_B->connstr . ' dbname=postgres';
89 $node_B->safe_psql('postgres',
90 "CREATE PUBLICATION tap_pub_B FOR TABLE tab_full, test_tab");
91 my $appname_C = 'tap_sub_C';
92 $node_C->safe_psql('postgres', "
93 CREATE SUBSCRIPTION tap_sub_C
94 CONNECTION '$node_B_connstr application_name=$appname_C'
95 PUBLICATION tap_pub_B
96 WITH (two_phase = on)");
98 # Wait for subscribers to finish initialization
99 $node_A->wait_for_catchup($appname_B);
100 $node_B->wait_for_catchup($appname_C);
102 # Also wait for two-phase to be enabled
103 my $twophase_query = "SELECT count(1) = 0 FROM pg_subscription WHERE subtwophasestate NOT IN ('e');";
104 $node_B->poll_query_until('postgres', $twophase_query)
105 or die "Timed out while waiting for subscriber to enable twophase";
106 $node_C->poll_query_until('postgres', $twophase_query)
107 or die "Timed out while waiting for subscriber to enable twophase";
109 is(1,1, "Cascade setup is complete");
111 my $result;
113 ###############################
114 # check that 2PC gets replicated to subscriber(s)
115 # then COMMIT PREPARED
116 ###############################
118 # 2PC PREPARE
119 $node_A->safe_psql('postgres', "
120 BEGIN;
121 INSERT INTO tab_full VALUES (11);
122 PREPARE TRANSACTION 'test_prepared_tab_full';");
124 $node_A->wait_for_catchup($appname_B);
125 $node_B->wait_for_catchup($appname_C);
127 # check the transaction state is prepared on subscriber(s)
128 $result = $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
129 is($result, qq(1), 'transaction is prepared on subscriber B');
130 $result = $node_C->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
131 is($result, qq(1), 'transaction is prepared on subscriber C');
133 # 2PC COMMIT
134 $node_A->safe_psql('postgres', "COMMIT PREPARED 'test_prepared_tab_full';");
136 $node_A->wait_for_catchup($appname_B);
137 $node_B->wait_for_catchup($appname_C);
139 # check that transaction was committed on subscriber(s)
140 $result = $node_B->safe_psql('postgres', "SELECT count(*) FROM tab_full where a = 11;");
141 is($result, qq(1), 'Row inserted via 2PC has committed on subscriber B');
142 $result = $node_C->safe_psql('postgres', "SELECT count(*) FROM tab_full where a = 11;");
143 is($result, qq(1), 'Row inserted via 2PC has committed on subscriber C');
145 # check the transaction state is ended on subscriber(s)
146 $result = $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
147 is($result, qq(0), 'transaction is committed on subscriber B');
148 $result = $node_C->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
149 is($result, qq(0), 'transaction is committed on subscriber C');
151 ###############################
152 # check that 2PC gets replicated to subscriber(s)
153 # then ROLLBACK PREPARED
154 ###############################
156 # 2PC PREPARE
157 $node_A->safe_psql('postgres', "
158 BEGIN;
159 INSERT INTO tab_full VALUES (12);
160 PREPARE TRANSACTION 'test_prepared_tab_full';");
162 $node_A->wait_for_catchup($appname_B);
163 $node_B->wait_for_catchup($appname_C);
165 # check the transaction state is prepared on subscriber(s)
166 $result = $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
167 is($result, qq(1), 'transaction is prepared on subscriber B');
168 $result = $node_C->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
169 is($result, qq(1), 'transaction is prepared on subscriber C');
171 # 2PC ROLLBACK
172 $node_A->safe_psql('postgres', "ROLLBACK PREPARED 'test_prepared_tab_full';");
174 $node_A->wait_for_catchup($appname_B);
175 $node_B->wait_for_catchup($appname_C);
177 # check that transaction is aborted on subscriber(s)
178 $result = $node_B->safe_psql('postgres', "SELECT count(*) FROM tab_full where a = 12;");
179 is($result, qq(0), 'Row inserted via 2PC is not present on subscriber B');
180 $result = $node_C->safe_psql('postgres', "SELECT count(*) FROM tab_full where a = 12;");
181 is($result, qq(0), 'Row inserted via 2PC is not present on subscriber C');
183 # check the transaction state is ended on subscriber(s)
184 $result = $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
185 is($result, qq(0), 'transaction is ended on subscriber B');
186 $result = $node_C->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
187 is($result, qq(0), 'transaction is ended on subscriber C');
189 ###############################
190 # Test nested transactions with 2PC
191 ###############################
193 # 2PC PREPARE with a nested ROLLBACK TO SAVEPOINT
194 $node_A->safe_psql('postgres', "
195 BEGIN;
196 INSERT INTO tab_full VALUES (21);
197 SAVEPOINT sp_inner;
198 INSERT INTO tab_full VALUES (22);
199 ROLLBACK TO SAVEPOINT sp_inner;
200 PREPARE TRANSACTION 'outer';
203 $node_A->wait_for_catchup($appname_B);
204 $node_B->wait_for_catchup($appname_C);
206 # check the transaction state prepared on subscriber(s)
207 $result = $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
208 is($result, qq(1), 'transaction is prepared on subscriber B');
209 $result = $node_C->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
210 is($result, qq(1), 'transaction is prepared on subscriber C');
212 # 2PC COMMIT
213 $node_A->safe_psql('postgres', "COMMIT PREPARED 'outer';");
215 $node_A->wait_for_catchup($appname_B);
216 $node_B->wait_for_catchup($appname_C);
218 # check the transaction state is ended on subscriber
219 $result = $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
220 is($result, qq(0), 'transaction is ended on subscriber B');
221 $result = $node_C->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
222 is($result, qq(0), 'transaction is ended on subscriber C');
224 # check inserts are visible at subscriber(s).
225 # 22 should be rolled back.
226 # 21 should be committed.
227 $result = $node_B->safe_psql('postgres', "SELECT a FROM tab_full where a IN (21,22);");
228 is($result, qq(21), 'Rows committed are present on subscriber B');
229 $result = $node_C->safe_psql('postgres', "SELECT a FROM tab_full where a IN (21,22);");
230 is($result, qq(21), 'Rows committed are present on subscriber C');
232 # ---------------------
233 # 2PC + STREAMING TESTS
234 # ---------------------
236 my $oldpid_B = $node_A->safe_psql('postgres', "
237 SELECT pid FROM pg_stat_replication
238 WHERE application_name = '$appname_B' AND state = 'streaming';");
239 my $oldpid_C = $node_B->safe_psql('postgres', "
240 SELECT pid FROM pg_stat_replication
241 WHERE application_name = '$appname_C' AND state = 'streaming';");
243 # Setup logical replication (streaming = on)
245 $node_B->safe_psql('postgres', "
246 ALTER SUBSCRIPTION tap_sub_B
247 SET (streaming = on);");
248 $node_C->safe_psql('postgres', "
249 ALTER SUBSCRIPTION tap_sub_C
250 SET (streaming = on)");
252 # Wait for subscribers to finish initialization
254 $node_A->poll_query_until('postgres', "
255 SELECT pid != $oldpid_B FROM pg_stat_replication
256 WHERE application_name = '$appname_B' AND state = 'streaming';"
257 ) or die "Timed out while waiting for apply to restart";
258 $node_B->poll_query_until('postgres', "
259 SELECT pid != $oldpid_C FROM pg_stat_replication
260 WHERE application_name = '$appname_C' AND state = 'streaming';"
261 ) or die "Timed out while waiting for apply to restart";
263 ###############################
264 # Test 2PC PREPARE / COMMIT PREPARED.
265 # 1. Data is streamed as a 2PC transaction.
266 # 2. Then do commit prepared.
268 # Expect all data is replicated on subscriber(s) after the commit.
269 ###############################
271 # Insert, update and delete enough rows to exceed the 64kB limit.
272 # Then 2PC PREPARE
273 $node_A->safe_psql('postgres', q{
274 BEGIN;
275 INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i);
276 UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
277 DELETE FROM test_tab WHERE mod(a,3) = 0;
278 PREPARE TRANSACTION 'test_prepared_tab';});
280 $node_A->wait_for_catchup($appname_B);
281 $node_B->wait_for_catchup($appname_C);
283 # check the transaction state is prepared on subscriber(s)
284 $result = $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
285 is($result, qq(1), 'transaction is prepared on subscriber B');
286 $result = $node_C->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
287 is($result, qq(1), 'transaction is prepared on subscriber C');
289 # 2PC COMMIT
290 $node_A->safe_psql('postgres', "COMMIT PREPARED 'test_prepared_tab';");
292 $node_A->wait_for_catchup($appname_B);
293 $node_B->wait_for_catchup($appname_C);
295 # check that transaction was committed on subscriber(s)
296 $result = $node_B->safe_psql('postgres', "SELECT count(*), count(c), count(d = 999) FROM test_tab");
297 is($result, qq(3334|3334|3334), 'Rows inserted by 2PC have committed on subscriber B, and extra columns have local defaults');
298 $result = $node_C->safe_psql('postgres', "SELECT count(*), count(c), count(d = 999) FROM test_tab");
299 is($result, qq(3334|3334|3334), 'Rows inserted by 2PC have committed on subscriber C, and extra columns have local defaults');
301 # check the transaction state is ended on subscriber(s)
302 $result = $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
303 is($result, qq(0), 'transaction is committed on subscriber B');
304 $result = $node_C->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
305 is($result, qq(0), 'transaction is committed on subscriber C');
307 ###############################
308 # Test 2PC PREPARE with a nested ROLLBACK TO SAVEPOINT.
309 # 0. Cleanup from previous test leaving only 2 rows.
310 # 1. Insert one more row.
311 # 2. Record a SAVEPOINT.
312 # 3. Data is streamed using 2PC.
313 # 4. Do rollback to SAVEPOINT prior to the streamed inserts.
314 # 5. Then COMMIT PREPARED.
316 # Expect data after the SAVEPOINT is aborted leaving only 3 rows (= 2 original + 1 from step 1).
317 ###############################
319 # First, delete the data except for 2 rows (delete will be replicated)
320 $node_A->safe_psql('postgres', "DELETE FROM test_tab WHERE a > 2;");
322 # 2PC PREPARE with a nested ROLLBACK TO SAVEPOINT
323 $node_A->safe_psql('postgres', "
324 BEGIN;
325 INSERT INTO test_tab VALUES (9999, 'foobar');
326 SAVEPOINT sp_inner;
327 INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i);
328 UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
329 DELETE FROM test_tab WHERE mod(a,3) = 0;
330 ROLLBACK TO SAVEPOINT sp_inner;
331 PREPARE TRANSACTION 'outer';
334 $node_A->wait_for_catchup($appname_B);
335 $node_B->wait_for_catchup($appname_C);
337 # check the transaction state prepared on subscriber(s)
338 $result = $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
339 is($result, qq(1), 'transaction is prepared on subscriber B');
340 $result = $node_C->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
341 is($result, qq(1), 'transaction is prepared on subscriber C');
343 # 2PC COMMIT
344 $node_A->safe_psql('postgres', "COMMIT PREPARED 'outer';");
346 $node_A->wait_for_catchup($appname_B);
347 $node_B->wait_for_catchup($appname_C);
349 # check the transaction state is ended on subscriber
350 $result = $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
351 is($result, qq(0), 'transaction is ended on subscriber B');
352 $result = $node_C->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
353 is($result, qq(0), 'transaction is ended on subscriber C');
355 # check inserts are visible at subscriber(s).
356 # All the streamed data (prior to the SAVEPOINT) should be rolled back.
357 # (9999, 'foobar') should be committed.
358 $result = $node_B->safe_psql('postgres', "SELECT count(*) FROM test_tab where b = 'foobar';");
359 is($result, qq(1), 'Rows committed are present on subscriber B');
360 $result = $node_B->safe_psql('postgres', "SELECT count(*) FROM test_tab;");
361 is($result, qq(3), 'Rows committed are present on subscriber B');
362 $result = $node_C->safe_psql('postgres', "SELECT count(*) FROM test_tab where b = 'foobar';");
363 is($result, qq(1), 'Rows committed are present on subscriber C');
364 $result = $node_C->safe_psql('postgres', "SELECT count(*) FROM test_tab;");
365 is($result, qq(3), 'Rows committed are present on subscriber C');
367 ###############################
368 # check all the cleanup
369 ###############################
371 # cleanup the node_B => node_C pub/sub
372 $node_C->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_C");
373 $result = $node_C->safe_psql('postgres', "SELECT count(*) FROM pg_subscription");
374 is($result, qq(0), 'check subscription was dropped on subscriber node C');
375 $result = $node_C->safe_psql('postgres', "SELECT count(*) FROM pg_subscription_rel");
376 is($result, qq(0), 'check subscription relation status was dropped on subscriber node C');
377 $result = $node_C->safe_psql('postgres', "SELECT count(*) FROM pg_replication_origin");
378 is($result, qq(0), 'check replication origin was dropped on subscriber node C');
379 $result = $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_replication_slots");
380 is($result, qq(0), 'check replication slot was dropped on publisher node B');
382 # cleanup the node_A => node_B pub/sub
383 $node_B->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_B");
384 $result = $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_subscription");
385 is($result, qq(0), 'check subscription was dropped on subscriber node B');
386 $result = $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_subscription_rel");
387 is($result, qq(0), 'check subscription relation status was dropped on subscriber node B');
388 $result = $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_replication_origin");
389 is($result, qq(0), 'check replication origin was dropped on subscriber node B');
390 $result = $node_A->safe_psql('postgres', "SELECT count(*) FROM pg_replication_slots");
391 is($result, qq(0), 'check replication slot was dropped on publisher node A');
393 # shutdown
394 $node_C->stop('fast');
395 $node_B->stop('fast');
396 $node_A->stop('fast');