From cd4b2334db4980bbf86a8ba1d446db17e62ca342 Mon Sep 17 00:00:00 2001 From: Tom Lane Date: Fri, 6 Jan 2023 11:11:51 -0500 Subject: [PATCH] Invalidate pgoutput's replication-decisions cache upon schema rename. A schema rename should cause reporting the new qualified names of tables to logical replication subscribers, but that wasn't happening. Flush the RelationSyncCache to make it happen. (If you ask me, the new test case shows that the behavior in this area is still pretty dubious, but apparently it's operating as designed.) Vignesh C Discussion: https://postgr.es/m/CALDaNm32vLRv5KdrDFeVC-CU+4Wg1daA55hMqOxDGJBzvd76-w@mail.gmail.com --- src/backend/replication/pgoutput/pgoutput.c | 25 ++++++-- src/test/subscription/t/100_bugs.pl | 65 ++++++++++++++++++++- 2 files changed, 84 insertions(+), 6 deletions(-) diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 7737242516..876adab38e 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -1929,7 +1929,22 @@ init_rel_sync_cache(MemoryContext cachectx) Assert(RelationSyncCache != NULL); + /* We must update the cache entry for a relation after a relcache flush */ CacheRegisterRelcacheCallback(rel_sync_cache_relation_cb, (Datum) 0); + + /* + * Flush all cache entries after a pg_namespace change, in case it was a + * schema rename affecting a relation being replicated. + */ + CacheRegisterSyscacheCallback(NAMESPACEOID, + rel_sync_cache_publication_cb, + (Datum) 0); + + /* + * Flush all cache entries after any publication changes. (We need no + * callback entry for pg_publication, because publication_invalidation_cb + * will take care of it.) + */ CacheRegisterSyscacheCallback(PUBLICATIONRELMAP, rel_sync_cache_publication_cb, (Datum) 0); @@ -2325,8 +2340,8 @@ rel_sync_cache_relation_cb(Datum arg, Oid relid) /* * Publication relation/schema map syscache invalidation callback * - * Called for invalidations on pg_publication, pg_publication_rel, and - * pg_publication_namespace. + * Called for invalidations on pg_publication, pg_publication_rel, + * pg_publication_namespace, and pg_namespace. */ static void rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue) @@ -2337,14 +2352,14 @@ rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue) /* * We can get here if the plugin was used in SQL interface as the * RelSchemaSyncCache is destroyed when the decoding finishes, but there - * is no way to unregister the relcache invalidation callback. + * is no way to unregister the invalidation callbacks. */ if (RelationSyncCache == NULL) return; /* - * There is no way to find which entry in our cache the hash belongs to so - * mark the whole cache as invalid. + * We have no easy way to identify which cache entries this invalidation + * event might have affected, so just mark them all invalid. */ hash_seq_init(&status, RelationSyncCache); while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL) diff --git a/src/test/subscription/t/100_bugs.pl b/src/test/subscription/t/100_bugs.pl index 143caac792..036576acab 100644 --- a/src/test/subscription/t/100_bugs.pl +++ b/src/test/subscription/t/100_bugs.pl @@ -70,9 +70,10 @@ $node_publisher->wait_for_catchup('sub1'); pass('index predicates do not cause crash'); # We'll re-use these nodes below, so drop their replication state. -# We don't bother to drop the tables though. $node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION sub1"); $node_publisher->safe_psql('postgres', "DROP PUBLICATION pub1"); +# Drop the tables too. +$node_publisher->safe_psql('postgres', "DROP TABLE tab1"); $node_publisher->stop('fast'); $node_subscriber->stop('fast'); @@ -307,6 +308,68 @@ is( $node_subscriber->safe_psql( qq(-1|1), "update works with REPLICA IDENTITY"); +# Clean up +$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub"); +$node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub"); +$node_publisher->safe_psql('postgres', "DROP TABLE tab_replidentity_index"); +$node_subscriber->safe_psql('postgres', "DROP TABLE tab_replidentity_index"); + +# Test schema invalidation by renaming the schema + +# Create tables on publisher +$node_publisher->safe_psql('postgres', "CREATE SCHEMA sch1"); +$node_publisher->safe_psql('postgres', "CREATE TABLE sch1.t1 (c1 int)"); + +# Create tables on subscriber +$node_subscriber->safe_psql('postgres', "CREATE SCHEMA sch1"); +$node_subscriber->safe_psql('postgres', "CREATE TABLE sch1.t1 (c1 int)"); +$node_subscriber->safe_psql('postgres', "CREATE SCHEMA sch2"); +$node_subscriber->safe_psql('postgres', "CREATE TABLE sch2.t1 (c1 int)"); + +# Setup logical replication that will cover t1 under both schema names +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION tap_pub_sch FOR ALL TABLES"); +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION tap_sub_sch CONNECTION '$publisher_connstr' PUBLICATION tap_pub_sch" +); + +# Wait for initial table sync to finish +$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub_sch'); + +# Check what happens to data inserted before and after schema rename +$node_publisher->safe_psql( + 'postgres', + "begin; +insert into sch1.t1 values(1); +alter schema sch1 rename to sch2; +create schema sch1; +create table sch1.t1(c1 int); +insert into sch1.t1 values(2); +insert into sch2.t1 values(3); +commit;"); + +$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub_sch'); + +# Subscriber's sch1.t1 should receive the row inserted into the new sch1.t1, +# but not the row inserted into the old sch1.t1 post-rename. +my $result = $node_subscriber->safe_psql('postgres', "SELECT * FROM sch1.t1"); +is( $result, qq(1 +2), 'check data in subscriber sch1.t1 after schema rename'); + +# Subscriber's sch2.t1 won't have gotten anything yet ... +$result = $node_subscriber->safe_psql('postgres', "SELECT * FROM sch2.t1"); +is($result, '', 'no data yet in subscriber sch2.t1 after schema rename'); + +# ... but it should show up after REFRESH. +$node_subscriber->safe_psql('postgres', + 'ALTER SUBSCRIPTION tap_sub_sch REFRESH PUBLICATION'); + +$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub_sch'); + +$result = $node_subscriber->safe_psql('postgres', "SELECT * FROM sch2.t1"); +is( $result, qq(1 +3), 'check data in subscriber sch2.t1 after schema rename'); + $node_publisher->stop('fast'); $node_subscriber->stop('fast');