Update copyright for 2022
[pgsql.git] / src / test / subscription / t / 021_twophase.pl
blob88ef341825be29ddd29272419b7e8e15a03c4364
2 # Copyright (c) 2021-2022, PostgreSQL Global Development Group
4 # logical replication of 2PC test
5 use strict;
6 use warnings;
7 use PostgreSQL::Test::Cluster;
8 use PostgreSQL::Test::Utils;
9 use Test::More tests => 24;
11 ###############################
12 # Setup
13 ###############################
15 # Initialize publisher node
16 my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
17 $node_publisher->init(allows_streaming => 'logical');
18 $node_publisher->append_conf('postgresql.conf',
19 qq(max_prepared_transactions = 10));
20 $node_publisher->start;
22 # Create subscriber node
23 my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
24 $node_subscriber->init(allows_streaming => 'logical');
25 $node_subscriber->append_conf('postgresql.conf',
26 qq(max_prepared_transactions = 10));
27 $node_subscriber->start;
29 # Create some pre-existing content on publisher
30 $node_publisher->safe_psql('postgres',
31 "CREATE TABLE tab_full (a int PRIMARY KEY)");
32 $node_publisher->safe_psql('postgres', "
33 BEGIN;
34 INSERT INTO tab_full SELECT generate_series(1,10);
35 PREPARE TRANSACTION 'some_initial_data';
36 COMMIT PREPARED 'some_initial_data';");
38 # Setup structure on subscriber
39 $node_subscriber->safe_psql('postgres',
40 "CREATE TABLE tab_full (a int PRIMARY KEY)");
42 # Setup logical replication
43 my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
44 $node_publisher->safe_psql('postgres',
45 "CREATE PUBLICATION tap_pub FOR TABLE tab_full");
47 my $appname = 'tap_sub';
48 $node_subscriber->safe_psql('postgres', "
49 CREATE SUBSCRIPTION tap_sub
50 CONNECTION '$publisher_connstr application_name=$appname'
51 PUBLICATION tap_pub
52 WITH (two_phase = on)");
54 # Wait for subscriber to finish initialization
55 $node_publisher->wait_for_catchup($appname);
57 # Also wait for initial table sync to finish
58 my $synced_query =
59 "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
60 $node_subscriber->poll_query_until('postgres', $synced_query)
61 or die "Timed out while waiting for subscriber to synchronize data";
63 # Also wait for two-phase to be enabled
64 my $twophase_query =
65 "SELECT count(1) = 0 FROM pg_subscription WHERE subtwophasestate NOT IN ('e');";
66 $node_subscriber->poll_query_until('postgres', $twophase_query)
67 or die "Timed out while waiting for subscriber to enable twophase";
69 ###############################
70 # check that 2PC gets replicated to subscriber
71 # then COMMIT PREPARED
72 ###############################
74 $node_publisher->safe_psql('postgres', "
75 BEGIN;
76 INSERT INTO tab_full VALUES (11);
77 PREPARE TRANSACTION 'test_prepared_tab_full';");
79 $node_publisher->wait_for_catchup($appname);
81 # check that transaction is in prepared state on subscriber
82 my $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
83 is($result, qq(1), 'transaction is prepared on subscriber');
85 # check that 2PC gets committed on subscriber
86 $node_publisher->safe_psql('postgres', "COMMIT PREPARED 'test_prepared_tab_full';");
88 $node_publisher->wait_for_catchup($appname);
90 # check that transaction is committed on subscriber
91 $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_full where a = 11;");
92 is($result, qq(1), 'Row inserted via 2PC has committed on subscriber');
94 $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
95 is($result, qq(0), 'transaction is committed on subscriber');
97 ###############################
98 # check that 2PC gets replicated to subscriber
99 # then ROLLBACK PREPARED
100 ###############################
102 $node_publisher->safe_psql('postgres',"
103 BEGIN;
104 INSERT INTO tab_full VALUES (12);
105 PREPARE TRANSACTION 'test_prepared_tab_full';");
107 $node_publisher->wait_for_catchup($appname);
109 # check that transaction is in prepared state on subscriber
110 $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
111 is($result, qq(1), 'transaction is prepared on subscriber');
113 # check that 2PC gets aborted on subscriber
114 $node_publisher->safe_psql('postgres', "ROLLBACK PREPARED 'test_prepared_tab_full';");
116 $node_publisher->wait_for_catchup($appname);
118 # check that transaction is aborted on subscriber
119 $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_full where a = 12;");
120 is($result, qq(0), 'Row inserted via 2PC is not present on subscriber');
122 $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
123 is($result, qq(0), 'transaction is aborted on subscriber');
125 ###############################
126 # Check that ROLLBACK PREPARED is decoded properly on crash restart
127 # (publisher and subscriber crash)
128 ###############################
130 $node_publisher->safe_psql('postgres', "
131 BEGIN;
132 INSERT INTO tab_full VALUES (12);
133 INSERT INTO tab_full VALUES (13);
134 PREPARE TRANSACTION 'test_prepared_tab';");
136 $node_subscriber->stop('immediate');
137 $node_publisher->stop('immediate');
139 $node_publisher->start;
140 $node_subscriber->start;
142 # rollback post the restart
143 $node_publisher->safe_psql('postgres', "ROLLBACK PREPARED 'test_prepared_tab';");
144 $node_publisher->wait_for_catchup($appname);
146 # check inserts are rolled back
147 $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_full where a IN (12,13);");
148 is($result, qq(0), 'Rows rolled back are not on the subscriber');
150 ###############################
151 # Check that COMMIT PREPARED is decoded properly on crash restart
152 # (publisher and subscriber crash)
153 ###############################
155 $node_publisher->safe_psql('postgres', "
156 BEGIN;
157 INSERT INTO tab_full VALUES (12);
158 INSERT INTO tab_full VALUES (13);
159 PREPARE TRANSACTION 'test_prepared_tab';");
161 $node_subscriber->stop('immediate');
162 $node_publisher->stop('immediate');
164 $node_publisher->start;
165 $node_subscriber->start;
167 # commit post the restart
168 $node_publisher->safe_psql('postgres', "COMMIT PREPARED 'test_prepared_tab';");
169 $node_publisher->wait_for_catchup($appname);
171 # check inserts are visible
172 $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_full where a IN (12,13);");
173 is($result, qq(2), 'Rows inserted via 2PC are visible on the subscriber');
175 ###############################
176 # Check that COMMIT PREPARED is decoded properly on crash restart
177 # (subscriber only crash)
178 ###############################
180 $node_publisher->safe_psql('postgres', "
181 BEGIN;
182 INSERT INTO tab_full VALUES (14);
183 INSERT INTO tab_full VALUES (15);
184 PREPARE TRANSACTION 'test_prepared_tab';");
186 $node_subscriber->stop('immediate');
187 $node_subscriber->start;
189 # commit post the restart
190 $node_publisher->safe_psql('postgres', "COMMIT PREPARED 'test_prepared_tab';");
191 $node_publisher->wait_for_catchup($appname);
193 # check inserts are visible
194 $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_full where a IN (14,15);");
195 is($result, qq(2), 'Rows inserted via 2PC are visible on the subscriber');
197 ###############################
198 # Check that COMMIT PREPARED is decoded properly on crash restart
199 # (publisher only crash)
200 ###############################
202 $node_publisher->safe_psql('postgres', "
203 BEGIN;
204 INSERT INTO tab_full VALUES (16);
205 INSERT INTO tab_full VALUES (17);
206 PREPARE TRANSACTION 'test_prepared_tab';");
208 $node_publisher->stop('immediate');
209 $node_publisher->start;
211 # commit post the restart
212 $node_publisher->safe_psql('postgres', "COMMIT PREPARED 'test_prepared_tab';");
213 $node_publisher->wait_for_catchup($appname);
215 # check inserts are visible
216 $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_full where a IN (16,17);");
217 is($result, qq(2), 'Rows inserted via 2PC are visible on the subscriber');
219 ###############################
220 # Test nested transaction with 2PC
221 ###############################
223 # check that 2PC gets replicated to subscriber
224 $node_publisher->safe_psql('postgres', "
225 BEGIN;
226 INSERT INTO tab_full VALUES (21);
227 SAVEPOINT sp_inner;
228 INSERT INTO tab_full VALUES (22);
229 ROLLBACK TO SAVEPOINT sp_inner;
230 PREPARE TRANSACTION 'outer';
232 $node_publisher->wait_for_catchup($appname);
234 # check that transaction is in prepared state on subscriber
235 $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
236 is($result, qq(1), 'transaction is prepared on subscriber');
238 # COMMIT
239 $node_publisher->safe_psql('postgres', "COMMIT PREPARED 'outer';");
241 $node_publisher->wait_for_catchup($appname);
243 # check the transaction state on subscriber
244 $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
245 is($result, qq(0), 'transaction is ended on subscriber');
247 # check inserts are visible. 22 should be rolled back. 21 should be committed.
248 $result = $node_subscriber->safe_psql('postgres', "SELECT a FROM tab_full where a IN (21,22);");
249 is($result, qq(21), 'Rows committed are on the subscriber');
251 ###############################
252 # Test using empty GID
253 ###############################
255 # check that 2PC gets replicated to subscriber
256 $node_publisher->safe_psql('postgres', "
257 BEGIN;
258 INSERT INTO tab_full VALUES (51);
259 PREPARE TRANSACTION '';");
260 $node_publisher->wait_for_catchup($appname);
262 # check that transaction is in prepared state on subscriber
263 $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
264 is($result, qq(1), 'transaction is prepared on subscriber');
266 # ROLLBACK
267 $node_publisher->safe_psql('postgres', "ROLLBACK PREPARED '';");
269 # check that 2PC gets aborted on subscriber
270 $node_publisher->wait_for_catchup($appname);
272 $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
273 is($result, qq(0), 'transaction is aborted on subscriber');
275 ###############################
276 # copy_data=false and two_phase
277 ###############################
279 #create some test tables for copy tests
280 $node_publisher->safe_psql('postgres', "CREATE TABLE tab_copy (a int PRIMARY KEY)");
281 $node_publisher->safe_psql('postgres', "INSERT INTO tab_copy SELECT generate_series(1,5);");
282 $node_subscriber->safe_psql('postgres', "CREATE TABLE tab_copy (a int PRIMARY KEY)");
283 $node_subscriber->safe_psql('postgres', "INSERT INTO tab_copy VALUES (88);");
284 $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_copy;");
285 is($result, qq(1), 'initial data in subscriber table');
287 # Setup logical replication
288 $node_publisher->safe_psql('postgres',
289 "CREATE PUBLICATION tap_pub_copy FOR TABLE tab_copy;");
291 my $appname_copy = 'appname_copy';
292 $node_subscriber->safe_psql('postgres', "
293 CREATE SUBSCRIPTION tap_sub_copy
294 CONNECTION '$publisher_connstr application_name=$appname_copy'
295 PUBLICATION tap_pub_copy
296 WITH (two_phase=on, copy_data=false);");
298 # Wait for subscriber to finish initialization
299 $node_publisher->wait_for_catchup($appname_copy);
301 # Also wait for initial table sync to finish
302 $node_subscriber->poll_query_until('postgres', $synced_query)
303 or die "Timed out while waiting for subscriber to synchronize data";
305 # Also wait for two-phase to be enabled
306 $node_subscriber->poll_query_until('postgres', $twophase_query)
307 or die "Timed out while waiting for subscriber to enable twophase";
309 # Check that the initial table data was NOT replicated (because we said copy_data=false)
310 $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_copy;");
311 is($result, qq(1), 'initial data in subscriber table');
313 # Now do a prepare on publisher and check that it IS replicated
314 $node_publisher->safe_psql('postgres', "
315 BEGIN;
316 INSERT INTO tab_copy VALUES (99);
317 PREPARE TRANSACTION 'mygid';");
319 # Wait for both subscribers to catchup
320 $node_publisher->wait_for_catchup($appname_copy);
321 $node_publisher->wait_for_catchup($appname);
323 # Check that the transaction has been prepared on the subscriber, there will be 2
324 # prepared transactions for the 2 subscriptions.
325 $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
326 is($result, qq(2), 'transaction is prepared on subscriber');
328 # Now commit the insert and verify that it IS replicated
329 $node_publisher->safe_psql('postgres', "COMMIT PREPARED 'mygid';");
331 $result = $node_publisher->safe_psql('postgres', "SELECT count(*) FROM tab_copy;");
332 is($result, qq(6), 'publisher inserted data');
334 $node_publisher->wait_for_catchup($appname_copy);
336 $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_copy;");
337 is($result, qq(2), 'replicated data in subscriber table');
339 $node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_copy;");
340 $node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_copy;");
342 ###############################
343 # check all the cleanup
344 ###############################
346 $node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub");
348 $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_subscription");
349 is($result, qq(0), 'check subscription was dropped on subscriber');
351 $result = $node_publisher->safe_psql('postgres', "SELECT count(*) FROM pg_replication_slots");
352 is($result, qq(0), 'check replication slot was dropped on publisher');
354 $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_subscription_rel");
355 is($result, qq(0), 'check subscription relation status was dropped on subscriber');
357 $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_replication_origin");
358 is($result, qq(0), 'check replication origin was dropped on subscriber');
360 $node_subscriber->stop('fast');
361 $node_publisher->stop('fast');