diff --git a/src/test/perl/PostgresNode.pm b/src/test/perl/PostgresNode.pm index 24cd4594c0..28f6acff3a 100644 --- a/src/test/perl/PostgresNode.pm +++ b/src/test/perl/PostgresNode.pm @@ -2195,6 +2195,50 @@ sub wait_for_slot_catchup =pod +=item $node->wait_for_subscription_sync(publisher, subname, dbname) + +Wait for all tables in pg_subscription_rel to complete the initial +synchronization (i.e to be either in 'syncdone' or 'ready' state). + +If the publisher node is given, additionally, check if the subscriber has +caught up to what has been committed on the primary. This is useful to +ensure that the initial data synchronization has been completed after +creating a new subscription. + +If there is no active replication connection from this peer, wait until +poll_query_until timeout. + +This is not a test. It die()s on failure. + +=cut + +sub wait_for_subscription_sync +{ + my ($self, $publisher, $subname, $dbname) = @_; + my $name = $self->name; + + $dbname = defined($dbname) ? $dbname : 'postgres'; + + # Wait for all tables to finish initial sync. + print "Waiting for all subscriptions in \"$name\" to synchronize data\n"; + my $query = + qq[SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');]; + $self->poll_query_until($dbname, $query) + or croak "timed out waiting for subscriber to synchronize data"; + + # Then, wait for the replication to catchup if required. + if (defined($publisher)) + { + croak 'subscription name must be specified' unless defined($subname); + $publisher->wait_for_catchup($subname); + } + + print "done\n"; + return; +} + +=pod + =item $node->wait_for_log(regexp, offset) Waits for the contents of the server log file, starting at the given offset, to diff --git a/src/test/subscription/t/001_rep_changes.pl b/src/test/subscription/t/001_rep_changes.pl index c3be32eaad..c60ef1c4f5 100644 --- a/src/test/subscription/t/001_rep_changes.pl +++ b/src/test/subscription/t/001_rep_changes.pl @@ -85,13 +85,8 @@ $node_subscriber->safe_psql('postgres', "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub, tap_pub_ins_only" ); -$node_publisher->wait_for_catchup('tap_sub'); - -# Also wait for initial table sync to finish -my $synced_query = - "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');"; -$node_subscriber->poll_query_until('postgres', $synced_query) - or die "Timed out while waiting for subscriber to synchronize data"; +# Wait for initial table sync to finish +$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub'); my $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_notrep"); diff --git a/src/test/subscription/t/002_types.pl b/src/test/subscription/t/002_types.pl index 293c0cff1e..315e46a8c7 100644 --- a/src/test/subscription/t/002_types.pl +++ b/src/test/subscription/t/002_types.pl @@ -111,13 +111,8 @@ $node_subscriber->safe_psql('postgres', "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub WITH (slot_name = tap_sub_slot)" ); -$node_publisher->wait_for_catchup('tap_sub'); - -# Wait for initial sync to finish as well -my $synced_query = - "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('s', 'r');"; -$node_subscriber->poll_query_until('postgres', $synced_query) - or die "Timed out while waiting for subscriber to synchronize data"; +# Wait for initial sync to finish +$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub'); # Insert initial test data $node_publisher->safe_psql( diff --git a/src/test/subscription/t/004_sync.pl b/src/test/subscription/t/004_sync.pl index e111ab9181..1f0e7c7a1c 100644 --- a/src/test/subscription/t/004_sync.pl +++ b/src/test/subscription/t/004_sync.pl @@ -36,13 +36,8 @@ $node_subscriber->safe_psql('postgres', "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub" ); -$node_publisher->wait_for_catchup('tap_sub'); - -# Also wait for initial table sync to finish -my $synced_query = - "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');"; -$node_subscriber->poll_query_until('postgres', $synced_query) - or die "Timed out while waiting for subscriber to synchronize data"; +# Wait for initial table sync to finish +$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub'); my $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_rep"); @@ -68,8 +63,7 @@ $node_subscriber->poll_query_until('postgres', $started_query) $node_subscriber->safe_psql('postgres', "DELETE FROM tab_rep;"); # wait for sync to finish this time -$node_subscriber->poll_query_until('postgres', $synced_query) - or die "Timed out while waiting for subscriber to synchronize data"; +$node_subscriber->wait_for_subscription_sync; # check that all data is synced $result = @@ -104,8 +98,7 @@ $node_subscriber->safe_psql('postgres', ); # and wait for data sync to finish again -$node_subscriber->poll_query_until('postgres', $synced_query) - or die "Timed out while waiting for subscriber to synchronize data"; +$node_subscriber->wait_for_subscription_sync; # check that all data is synced $result = @@ -130,8 +123,7 @@ $node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION tap_sub REFRESH PUBLICATION"); # wait for sync to finish -$node_subscriber->poll_query_until('postgres', $synced_query) - or die "Timed out while waiting for subscriber to synchronize data"; +$node_subscriber->wait_for_subscription_sync; $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_rep_next"); diff --git a/src/test/subscription/t/005_encoding.pl b/src/test/subscription/t/005_encoding.pl index aec7a17a78..09e3a7a6c9 100644 --- a/src/test/subscription/t/005_encoding.pl +++ b/src/test/subscription/t/005_encoding.pl @@ -29,13 +29,8 @@ $node_subscriber->safe_psql('postgres', "CREATE SUBSCRIPTION mysub CONNECTION '$publisher_connstr' PUBLICATION mypub;" ); -$node_publisher->wait_for_catchup('mysub'); - -# Wait for initial sync to finish as well -my $synced_query = - "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('s', 'r');"; -$node_subscriber->poll_query_until('postgres', $synced_query) - or die "Timed out while waiting for subscriber to synchronize data"; +# Wait for initial sync to finish +$node_subscriber->wait_for_subscription_sync($node_publisher, 'mysub'); $node_publisher->safe_psql('postgres', q{INSERT INTO test1 VALUES (1, E'Mot\xc3\xb6rhead')}); # hand-rolled UTF-8 diff --git a/src/test/subscription/t/006_rewrite.pl b/src/test/subscription/t/006_rewrite.pl index c6cda10a19..cf0492567e 100644 --- a/src/test/subscription/t/006_rewrite.pl +++ b/src/test/subscription/t/006_rewrite.pl @@ -25,13 +25,8 @@ $node_subscriber->safe_psql('postgres', "CREATE SUBSCRIPTION mysub CONNECTION '$publisher_connstr' PUBLICATION mypub;" ); -$node_publisher->wait_for_catchup('mysub'); - -# Wait for initial sync to finish as well -my $synced_query = - "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('s', 'r');"; -$node_subscriber->poll_query_until('postgres', $synced_query) - or die "Timed out while waiting for subscriber to synchronize data"; +# Wait for initial sync to finish +$node_subscriber->wait_for_subscription_sync($node_publisher, 'mysub'); $node_publisher->safe_psql('postgres', q{INSERT INTO test1 (a, b) VALUES (1, 'one'), (2, 'two');}); diff --git a/src/test/subscription/t/008_diff_schema.pl b/src/test/subscription/t/008_diff_schema.pl index 963334ed89..c022d1956e 100644 --- a/src/test/subscription/t/008_diff_schema.pl +++ b/src/test/subscription/t/008_diff_schema.pl @@ -35,13 +35,8 @@ $node_subscriber->safe_psql('postgres', "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub" ); -$node_publisher->wait_for_catchup('tap_sub'); - -# Also wait for initial table sync to finish -my $synced_query = - "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');"; -$node_subscriber->poll_query_until('postgres', $synced_query) - or die "Timed out while waiting for subscriber to synchronize data"; +# Wait for initial table sync to finish +$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub'); my $result = $node_subscriber->safe_psql('postgres', @@ -102,8 +97,7 @@ $node_subscriber->safe_psql('postgres', "CREATE TABLE test_tab2 (a int)"); $node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION tap_sub REFRESH PUBLICATION"); -$node_subscriber->poll_query_until('postgres', $synced_query) - or die "Timed out while waiting for subscriber to synchronize data"; +$node_subscriber->wait_for_subscription_sync; # Add replica identity column. (The serial is not necessary, but it's # a convenient way to get a default on the new column so that rows diff --git a/src/test/subscription/t/010_truncate.pl b/src/test/subscription/t/010_truncate.pl index 1f3719cd42..5657a2f45c 100644 --- a/src/test/subscription/t/010_truncate.pl +++ b/src/test/subscription/t/010_truncate.pl @@ -64,10 +64,7 @@ $node_subscriber->safe_psql('postgres', ); # Wait for initial sync of all subscriptions -my $synced_query = - "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');"; -$node_subscriber->poll_query_until('postgres', $synced_query) - or die "Timed out while waiting for subscriber to synchronize data"; +$node_subscriber->wait_for_subscription_sync; # insert data to truncate @@ -180,8 +177,7 @@ $node_subscriber->safe_psql('postgres', ); # wait for initial data sync -$node_subscriber->poll_query_until('postgres', $synced_query) - or die "Timed out while waiting for subscriber to synchronize data"; +$node_subscriber->wait_for_subscription_sync; # insert data to truncate diff --git a/src/test/subscription/t/011_generated.pl b/src/test/subscription/t/011_generated.pl index f35d1cba4c..e4b3b41db7 100644 --- a/src/test/subscription/t/011_generated.pl +++ b/src/test/subscription/t/011_generated.pl @@ -37,10 +37,7 @@ $node_subscriber->safe_psql('postgres', ); # Wait for initial sync of all subscriptions -my $synced_query = - "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');"; -$node_subscriber->poll_query_until('postgres', $synced_query) - or die "Timed out while waiting for subscriber to synchronize data"; +$node_subscriber->wait_for_subscription_sync; my $result = $node_subscriber->safe_psql('postgres', "SELECT a, b FROM tab1"); is( $result, qq(1|22 diff --git a/src/test/subscription/t/013_partition.pl b/src/test/subscription/t/013_partition.pl index 4debbfe55f..deb1ee8e51 100644 --- a/src/test/subscription/t/013_partition.pl +++ b/src/test/subscription/t/013_partition.pl @@ -150,12 +150,8 @@ ALTER TABLE ONLY tab1_2 ENABLE REPLICA TRIGGER sub2_tab1_2_log_op_trigger; }); # Wait for initial sync of all subscriptions -my $synced_query = - "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');"; -$node_subscriber1->poll_query_until('postgres', $synced_query) - or die "Timed out while waiting for subscriber to synchronize data"; -$node_subscriber2->poll_query_until('postgres', $synced_query) - or die "Timed out while waiting for subscriber to synchronize data"; +$node_subscriber1->wait_for_subscription_sync; +$node_subscriber2->wait_for_subscription_sync; # Tests for replication using leaf partition identity and schema @@ -480,10 +476,8 @@ $node_subscriber2->safe_psql('postgres', "ALTER SUBSCRIPTION sub2 SET PUBLICATION pub_lower_level, pub_all"); # Wait for initial sync of all subscriptions -$node_subscriber1->poll_query_until('postgres', $synced_query) - or die "Timed out while waiting for subscriber to synchronize data"; -$node_subscriber2->poll_query_until('postgres', $synced_query) - or die "Timed out while waiting for subscriber to synchronize data"; +$node_subscriber1->wait_for_subscription_sync; +$node_subscriber2->wait_for_subscription_sync; # check that data is synced correctly $result = $node_subscriber1->safe_psql('postgres', @@ -554,8 +548,7 @@ $node_subscriber2->safe_psql('postgres', # make sure the subscription on the second subscriber is synced, before # continuing -$node_subscriber2->poll_query_until('postgres', $synced_query) - or die "Timed out while waiting for subscriber to synchronize data"; +$node_subscriber2->wait_for_subscription_sync; # Insert a change into the leaf partition, should be replicated through # the partition root (thanks to the FOR ALL TABLES partition). @@ -810,8 +803,7 @@ $node_subscriber2->safe_psql( $node_subscriber2->safe_psql('postgres', "ALTER SUBSCRIPTION sub2 REFRESH PUBLICATION"); -$node_subscriber2->poll_query_until('postgres', $synced_query) - or die "Timed out while waiting for subscriber to synchronize data"; +$node_subscriber2->wait_for_subscription_sync; # Make partition map cache $node_publisher->safe_psql('postgres', "INSERT INTO tab5 VALUES (1, 1)"); diff --git a/src/test/subscription/t/100_bugs.pl b/src/test/subscription/t/100_bugs.pl index be37c35b19..a91935e9ee 100644 --- a/src/test/subscription/t/100_bugs.pl +++ b/src/test/subscription/t/100_bugs.pl @@ -142,12 +142,7 @@ $node_twoways->safe_psql( # We cannot rely solely on wait_for_catchup() here; it isn't sufficient # when tablesync workers might still be running. So in addition to that, # verify that tables are synced. -# XXX maybe this should be integrated in wait_for_catchup() itself. -$node_twoways->wait_for_catchup('testsub'); -my $synced_query = - "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');"; -$node_twoways->poll_query_until('d2', $synced_query) - or die "Timed out while waiting for subscriber to synchronize data"; +$node_twoways->wait_for_subscription_sync($node_twoways, 'testsub', 'd2'); is($node_twoways->safe_psql('d2', "SELECT count(f) FROM t"), $rows * 2, "2x$rows rows in t"); @@ -207,11 +202,8 @@ $node_subscriber->safe_psql('postgres', "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub" ); -$node_publisher->wait_for_catchup('tap_sub'); - -# Also wait for initial table sync to finish -$node_subscriber->poll_query_until('postgres', $synced_query) - or die "Timed out while waiting for subscriber to synchronize data"; +# Wait for initial table sync to finish +$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub'); is( $node_subscriber->safe_psql( 'postgres', "SELECT * FROM tab_replidentity_index"),