When creating materialized views, use REFRESH to load data.
[pgsql.git] / src / test / subscription / t / 022_twophase_cascade.pl
blob1acc79f17e5b07b75ae4960e3c3266094872ab04
# Copyright (c) 2021-2024, PostgreSQL Global Development Group

# Test cascading logical replication of 2PC.
#
# Includes tests for options 2PC (not-streaming) and also for 2PC (streaming).
#
# Two-phase and parallel apply are tested in 023_twophase_stream, so this
# file does not add a parallel apply version of its tests.
10 use strict;
11 use warnings FATAL => 'all';
12 use PostgreSQL::Test::Cluster;
13 use PostgreSQL::Test::Utils;
14 use Test::More;
###############################
# Setup a cascade of pub/sub nodes.
# node_A -> node_B -> node_C
###############################

# Settings appended to every node's postgresql.conf.  Prepared transactions
# must be allowed for 2PC, and the 64kB logical_decoding_work_mem is the
# limit the streaming tests below deliberately exceed.
my $node_conf = qq(
max_prepared_transactions = 10
logical_decoding_work_mem = 64kB
);

# Initialize nodes

# node_A: head of the cascade, publishes to node_B
my $node_A = PostgreSQL::Test::Cluster->new('node_A');
$node_A->init(allows_streaming => 'logical');
$node_A->append_conf('postgresql.conf', $node_conf);
$node_A->start;

# node_B: subscribes to node_A and publishes to node_C
my $node_B = PostgreSQL::Test::Cluster->new('node_B');
$node_B->init(allows_streaming => 'logical');
$node_B->append_conf('postgresql.conf', $node_conf);
$node_B->start;

# node_C: tail of the cascade, subscribes to node_B.  Unlike the other two
# nodes it is initialized without allows_streaming.
my $node_C = PostgreSQL::Test::Cluster->new('node_C');
$node_C->init;
$node_C->append_conf('postgresql.conf', $node_conf);
$node_C->start;

# Create some pre-existing content on node_A
$node_A->safe_psql('postgres', "CREATE TABLE tab_full (a int PRIMARY KEY)");
$node_A->safe_psql(
	'postgres', "
	INSERT INTO tab_full SELECT generate_series(1,10);");

# Create the same tables on node_B and node_C
foreach my $subscriber ($node_B, $node_C)
{
	$subscriber->safe_psql('postgres',
		"CREATE TABLE tab_full (a int PRIMARY KEY)");
}

# Create some pre-existing content on node_A (for streaming tests)
$node_A->safe_psql('postgres',
	"CREATE TABLE test_tab (a int primary key, b bytea)");
$node_A->safe_psql('postgres',
	"INSERT INTO test_tab VALUES (1, 'foo'), (2, 'bar')");

# Create the same tables on node_B and node_C.
# Columns a and b are compatible with the same table name on node_A; the
# extra columns c and d exist only on the subscribers and are filled from
# their local defaults.
foreach my $subscriber ($node_B, $node_C)
{
	# Each call must be a complete, terminated statement.
	$subscriber->safe_psql('postgres',
		"CREATE TABLE test_tab (a int primary key, b bytea, c timestamptz DEFAULT now(), d bigint DEFAULT 999)"
	);
}

# Setup logical replication

# -----------------------
# 2PC NON-STREAMING TESTS
# -----------------------

# node_A (pub) -> node_B (sub)
my $node_A_connstr = $node_A->connstr . ' dbname=postgres';
my $appname_B = 'tap_sub_B';
$node_A->safe_psql('postgres',
	"CREATE PUBLICATION tap_pub_A FOR TABLE tab_full, test_tab");
$node_B->safe_psql(
	'postgres', "
	CREATE SUBSCRIPTION tap_sub_B
	CONNECTION '$node_A_connstr application_name=$appname_B'
	PUBLICATION tap_pub_A
	WITH (two_phase = on)");

# node_B (pub) -> node_C (sub)
my $node_B_connstr = $node_B->connstr . ' dbname=postgres';
my $appname_C = 'tap_sub_C';
$node_B->safe_psql('postgres',
	"CREATE PUBLICATION tap_pub_B FOR TABLE tab_full, test_tab");
$node_C->safe_psql(
	'postgres', "
	CREATE SUBSCRIPTION tap_sub_C
	CONNECTION '$node_B_connstr application_name=$appname_C'
	PUBLICATION tap_pub_B
	WITH (two_phase = on)");

# Wait for subscribers to finish initialization
$node_A->wait_for_catchup($appname_B);
$node_B->wait_for_catchup($appname_C);

# Also wait for two-phase to be enabled: the query turns true once no
# subscription on the node has a subtwophasestate other than 'e'.
my $twophase_query =
  "SELECT count(1) = 0 FROM pg_subscription WHERE subtwophasestate NOT IN ('e');";
foreach my $subscriber ($node_B, $node_C)
{
	$subscriber->poll_query_until('postgres', $twophase_query)
	  or die "Timed out while waiting for subscriber to enable twophase";
}

# Emit a marker test so the TAP output shows where setup ended.
pass("Cascade setup is complete");

# Scratch variable reused by every check below.
my $result;

###############################
# check that 2PC gets replicated to subscriber(s)
# then COMMIT PREPARED
###############################

# 2PC PREPARE
$node_A->safe_psql(
	'postgres', "
	BEGIN;
	INSERT INTO tab_full VALUES (11);
	PREPARE TRANSACTION 'test_prepared_tab_full';");

$node_A->wait_for_catchup($appname_B);
$node_B->wait_for_catchup($appname_C);

# check the transaction state is prepared on subscriber(s)
foreach my $sub ([ $node_B, 'B' ], [ $node_C, 'C' ])
{
	my ($node, $label) = @{$sub};
	$result =
	  $node->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
	is($result, qq(1), "transaction is prepared on subscriber $label");
}

# 2PC COMMIT
$node_A->safe_psql('postgres', "COMMIT PREPARED 'test_prepared_tab_full';");

$node_A->wait_for_catchup($appname_B);
$node_B->wait_for_catchup($appname_C);

# check that transaction was committed on subscriber(s)
foreach my $sub ([ $node_B, 'B' ], [ $node_C, 'C' ])
{
	my ($node, $label) = @{$sub};
	$result = $node->safe_psql('postgres',
		"SELECT count(*) FROM tab_full where a = 11;");
	is($result, qq(1),
		"Row inserted via 2PC has committed on subscriber $label");
}

# check the transaction state is ended on subscriber(s)
foreach my $sub ([ $node_B, 'B' ], [ $node_C, 'C' ])
{
	my ($node, $label) = @{$sub};
	$result =
	  $node->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
	is($result, qq(0), "transaction is committed on subscriber $label");
}

###############################
# check that 2PC gets replicated to subscriber(s)
# then ROLLBACK PREPARED
###############################

# 2PC PREPARE
$node_A->safe_psql(
	'postgres', "
	BEGIN;
	INSERT INTO tab_full VALUES (12);
	PREPARE TRANSACTION 'test_prepared_tab_full';");

$node_A->wait_for_catchup($appname_B);
$node_B->wait_for_catchup($appname_C);

# check the transaction state is prepared on subscriber(s)
foreach my $sub ([ $node_B, 'B' ], [ $node_C, 'C' ])
{
	my ($node, $label) = @{$sub};
	$result =
	  $node->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
	is($result, qq(1), "transaction is prepared on subscriber $label");
}

# 2PC ROLLBACK
$node_A->safe_psql('postgres', "ROLLBACK PREPARED 'test_prepared_tab_full';");

$node_A->wait_for_catchup($appname_B);
$node_B->wait_for_catchup($appname_C);

# check that transaction is aborted on subscriber(s): row 12 must be absent
foreach my $sub ([ $node_B, 'B' ], [ $node_C, 'C' ])
{
	my ($node, $label) = @{$sub};
	$result = $node->safe_psql('postgres',
		"SELECT count(*) FROM tab_full where a = 12;");
	is($result, qq(0),
		"Row inserted via 2PC is not present on subscriber $label");
}

# check the transaction state is ended on subscriber(s)
foreach my $sub ([ $node_B, 'B' ], [ $node_C, 'C' ])
{
	my ($node, $label) = @{$sub};
	$result =
	  $node->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
	is($result, qq(0), "transaction is ended on subscriber $label");
}

###############################
# Test nested transactions with 2PC
###############################

# 2PC PREPARE with a nested ROLLBACK TO SAVEPOINT.
# Row 21 is inserted before the savepoint and survives; row 22 is inserted
# after it and is discarded by the ROLLBACK TO SAVEPOINT.
$node_A->safe_psql(
	'postgres', "
	BEGIN;
	INSERT INTO tab_full VALUES (21);
	SAVEPOINT sp_inner;
	INSERT INTO tab_full VALUES (22);
	ROLLBACK TO SAVEPOINT sp_inner;
	PREPARE TRANSACTION 'outer';
	");

$node_A->wait_for_catchup($appname_B);
$node_B->wait_for_catchup($appname_C);

# check the transaction state prepared on subscriber(s)
foreach my $sub ([ $node_B, 'B' ], [ $node_C, 'C' ])
{
	my ($node, $label) = @{$sub};
	$result =
	  $node->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
	is($result, qq(1), "transaction is prepared on subscriber $label");
}

# 2PC COMMIT
$node_A->safe_psql('postgres', "COMMIT PREPARED 'outer';");

$node_A->wait_for_catchup($appname_B);
$node_B->wait_for_catchup($appname_C);

# check the transaction state is ended on subscriber(s)
foreach my $sub ([ $node_B, 'B' ], [ $node_C, 'C' ])
{
	my ($node, $label) = @{$sub};
	$result =
	  $node->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
	is($result, qq(0), "transaction is ended on subscriber $label");
}

# check inserts are visible at subscriber(s).
# 22 should be rolled back.
# 21 should be committed.
foreach my $sub ([ $node_B, 'B' ], [ $node_C, 'C' ])
{
	my ($node, $label) = @{$sub};
	$result = $node->safe_psql('postgres',
		"SELECT a FROM tab_full where a IN (21,22);");
	is($result, qq(21), "Rows committed are present on subscriber $label");
}

# ---------------------
# 2PC + STREAMING TESTS
# ---------------------

# Remember the pid currently shown in pg_stat_replication for each
# subscription, so the restart triggered by the ALTER SUBSCRIPTION below can
# be detected as a pid change.
my $oldpid_B = $node_A->safe_psql(
	'postgres', "
	SELECT pid FROM pg_stat_replication
	WHERE application_name = '$appname_B' AND state = 'streaming';");
my $oldpid_C = $node_B->safe_psql(
	'postgres', "
	SELECT pid FROM pg_stat_replication
	WHERE application_name = '$appname_C' AND state = 'streaming';");

# Setup logical replication (streaming = on)

$node_B->safe_psql(
	'postgres', "
	ALTER SUBSCRIPTION tap_sub_B
	SET (streaming = on);");
$node_C->safe_psql(
	'postgres', "
	ALTER SUBSCRIPTION tap_sub_C
	SET (streaming = on)");

# Wait for subscribers to finish initialization: poll each publisher until
# the streaming connection reports a pid different from the remembered one.

my $restarted_B = "
	SELECT pid != $oldpid_B FROM pg_stat_replication
	WHERE application_name = '$appname_B' AND state = 'streaming';";
$node_A->poll_query_until('postgres', $restarted_B)
  or die "Timed out while waiting for apply to restart";

my $restarted_C = "
	SELECT pid != $oldpid_C FROM pg_stat_replication
	WHERE application_name = '$appname_C' AND state = 'streaming';";
$node_B->poll_query_until('postgres', $restarted_C)
  or die "Timed out while waiting for apply to restart";

###############################
# Test 2PC PREPARE / COMMIT PREPARED.
# 1. Data is streamed as a 2PC transaction.
# 2. Then do commit prepared.
#
# Expect all data is replicated on subscriber(s) after the commit.
###############################

# Insert, update and delete enough rows to exceed the 64kB limit.
# Then 2PC PREPARE
$node_A->safe_psql(
	'postgres', q{
	BEGIN;
	INSERT INTO test_tab SELECT i, sha256(i::text::bytea) FROM generate_series(3, 5000) s(i);
	UPDATE test_tab SET b = sha256(b) WHERE mod(a,2) = 0;
	DELETE FROM test_tab WHERE mod(a,3) = 0;
	PREPARE TRANSACTION 'test_prepared_tab';});

$node_A->wait_for_catchup($appname_B);
$node_B->wait_for_catchup($appname_C);

# check the transaction state is prepared on subscriber(s)
foreach my $sub ([ $node_B, 'B' ], [ $node_C, 'C' ])
{
	my ($node, $label) = @{$sub};
	$result =
	  $node->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
	is($result, qq(1), "transaction is prepared on subscriber $label");
}

# 2PC COMMIT
$node_A->safe_psql('postgres', "COMMIT PREPARED 'test_prepared_tab';");

$node_A->wait_for_catchup($appname_B);
$node_B->wait_for_catchup($appname_C);

# check that transaction was committed on subscriber(s).
# 5000 rows exist after the insert (2 pre-existing + 4998 new); deleting the
# 1666 multiples of 3 leaves 3334.  The third column uses FILTER so that it
# counts only rows whose d really equals 999: a plain count(d = 999) would
# count every row with a non-NULL d, whatever its value.
foreach my $sub ([ $node_B, 'B' ], [ $node_C, 'C' ])
{
	my ($node, $label) = @{$sub};
	$result = $node->safe_psql('postgres',
		"SELECT count(*), count(c), count(*) FILTER (WHERE d = 999) FROM test_tab"
	);
	is($result, qq(3334|3334|3334),
		"Rows inserted by 2PC have committed on subscriber $label, and extra columns have local defaults"
	);
}

# check the transaction state is ended on subscriber(s)
foreach my $sub ([ $node_B, 'B' ], [ $node_C, 'C' ])
{
	my ($node, $label) = @{$sub};
	$result =
	  $node->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
	is($result, qq(0), "transaction is committed on subscriber $label");
}

###############################
# Test 2PC PREPARE with a nested ROLLBACK TO SAVEPOINT.
# 0. Cleanup from previous test leaving only 2 rows.
# 1. Insert one more row.
# 2. Record a SAVEPOINT.
# 3. Data is streamed using 2PC.
# 4. Do rollback to SAVEPOINT prior to the streamed inserts.
# 5. Then COMMIT PREPARED.
#
# Expect data after the SAVEPOINT is aborted leaving only 3 rows (= 2 original + 1 from step 1).
###############################

# First, delete the data except for 2 rows (delete will be replicated)
$node_A->safe_psql('postgres', "DELETE FROM test_tab WHERE a > 2;");

# 2PC PREPARE with a nested ROLLBACK TO SAVEPOINT
$node_A->safe_psql(
	'postgres', "
	BEGIN;
	INSERT INTO test_tab VALUES (9999, 'foobar');
	SAVEPOINT sp_inner;
	INSERT INTO test_tab SELECT i, sha256(i::text::bytea) FROM generate_series(3, 5000) s(i);
	UPDATE test_tab SET b = sha256(b) WHERE mod(a,2) = 0;
	DELETE FROM test_tab WHERE mod(a,3) = 0;
	ROLLBACK TO SAVEPOINT sp_inner;
	PREPARE TRANSACTION 'outer';
	");

$node_A->wait_for_catchup($appname_B);
$node_B->wait_for_catchup($appname_C);

# check the transaction state prepared on subscriber(s)
foreach my $sub ([ $node_B, 'B' ], [ $node_C, 'C' ])
{
	my ($node, $label) = @{$sub};
	$result =
	  $node->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
	is($result, qq(1), "transaction is prepared on subscriber $label");
}

# 2PC COMMIT
$node_A->safe_psql('postgres', "COMMIT PREPARED 'outer';");

$node_A->wait_for_catchup($appname_B);
$node_B->wait_for_catchup($appname_C);

# check the transaction state is ended on subscriber(s)
foreach my $sub ([ $node_B, 'B' ], [ $node_C, 'C' ])
{
	my ($node, $label) = @{$sub};
	$result =
	  $node->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
	is($result, qq(0), "transaction is ended on subscriber $label");
}

# check inserts are visible at subscriber(s).
# All the streamed data (made after the SAVEPOINT) should be rolled back.
# (9999, 'foobar'), inserted before the SAVEPOINT, should be committed.
# The two checks per subscriber carry distinct descriptions so a failure
# identifies which one broke.
foreach my $sub ([ $node_B, 'B' ], [ $node_C, 'C' ])
{
	my ($node, $label) = @{$sub};
	$result = $node->safe_psql('postgres',
		"SELECT count(*) FROM test_tab where b = 'foobar';");
	is($result, qq(1),
		"Row inserted before the SAVEPOINT is present on subscriber $label");
	$result = $node->safe_psql('postgres', "SELECT count(*) FROM test_tab;");
	is($result, qq(3),
		"Only the 3 committed rows are present on subscriber $label");
}

###############################
# check all the cleanup
###############################

# cleanup the node_B => node_C pub/sub
$node_C->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_C");

# Each entry is [node to query, catalog expected to be empty, test name].
foreach my $check (
	[
		$node_C, 'pg_subscription',
		'check subscription was dropped on subscriber node C'
	],
	[
		$node_C,
		'pg_subscription_rel',
		'check subscription relation status was dropped on subscriber node C'
	],
	[
		$node_C,
		'pg_replication_origin',
		'check replication origin was dropped on subscriber node C'
	],
	[
		$node_B,
		'pg_replication_slots',
		'check replication slot was dropped on publisher node B'
	])
{
	my ($node, $catalog, $test_name) = @{$check};
	$result = $node->safe_psql('postgres', "SELECT count(*) FROM $catalog");
	is($result, qq(0), $test_name);
}

# cleanup the node_A => node_B pub/sub
$node_B->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_B");

foreach my $check (
	[
		$node_B, 'pg_subscription',
		'check subscription was dropped on subscriber node B'
	],
	[
		$node_B,
		'pg_subscription_rel',
		'check subscription relation status was dropped on subscriber node B'
	],
	[
		$node_B,
		'pg_replication_origin',
		'check replication origin was dropped on subscriber node B'
	],
	[
		$node_A,
		'pg_replication_slots',
		'check replication slot was dropped on publisher node A'
	])
{
	my ($node, $catalog, $test_name) = @{$check};
	$result = $node->safe_psql('postgres', "SELECT count(*) FROM $catalog");
	is($result, qq(0), $test_name);
}

# shutdown, from the tail of the cascade back to its head
foreach my $node ($node_C, $node_B, $node_A)
{
	$node->stop('fast');
}

done_testing();