diff --git a/src/backend/replication/logical/relation.c b/src/backend/replication/logical/relation.c index 901bff9974..283afa5d9d 100644 --- a/src/backend/replication/logical/relation.c +++ b/src/backend/replication/logical/relation.c @@ -418,7 +418,7 @@ logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode) static void logicalrep_partmap_invalidate_cb(Datum arg, Oid reloid) { - LogicalRepRelMapEntry *entry; + LogicalRepPartMapEntry *entry; /* Just to be sure. */ if (LogicalRepPartMap == NULL) @@ -431,11 +431,11 @@ logicalrep_partmap_invalidate_cb(Datum arg, Oid reloid) hash_seq_init(&status, LogicalRepPartMap); /* TODO, use inverse lookup hashtable? */ - while ((entry = (LogicalRepRelMapEntry *) hash_seq_search(&status)) != NULL) + while ((entry = (LogicalRepPartMapEntry *) hash_seq_search(&status)) != NULL) { - if (entry->localreloid == reloid) + if (entry->relmapentry.localreloid == reloid) { - entry->localrelvalid = false; + entry->relmapentry.localrelvalid = false; hash_seq_term(&status); break; } @@ -448,8 +448,8 @@ logicalrep_partmap_invalidate_cb(Datum arg, Oid reloid) hash_seq_init(&status, LogicalRepPartMap); - while ((entry = (LogicalRepRelMapEntry *) hash_seq_search(&status)) != NULL) - entry->localrelvalid = false; + while ((entry = (LogicalRepPartMapEntry *) hash_seq_search(&status)) != NULL) + entry->relmapentry.localrelvalid = false; } } @@ -502,7 +502,6 @@ logicalrep_partition_open(LogicalRepRelMapEntry *root, Oid partOid = RelationGetRelid(partrel); AttrMap *attrmap = root->attrmap; bool found; - int i; MemoryContext oldctx; if (LogicalRepPartMap == NULL) @@ -513,31 +512,40 @@ logicalrep_partition_open(LogicalRepRelMapEntry *root, (void *) &partOid, HASH_ENTER, &found); - if (found) - return &part_entry->relmapentry; + entry = &part_entry->relmapentry; - memset(part_entry, 0, sizeof(LogicalRepPartMapEntry)); + if (found && entry->localrelvalid) + return entry; /* Switch to longer-lived context. */ oldctx = MemoryContextSwitchTo(LogicalRepPartMapContext); - part_entry->partoid = partOid; - - /* Remote relation is copied as-is from the root entry. */ - entry = &part_entry->relmapentry; - entry->remoterel.remoteid = remoterel->remoteid; - entry->remoterel.nspname = pstrdup(remoterel->nspname); - entry->remoterel.relname = pstrdup(remoterel->relname); - entry->remoterel.natts = remoterel->natts; - entry->remoterel.attnames = palloc(remoterel->natts * sizeof(char *)); - entry->remoterel.atttyps = palloc(remoterel->natts * sizeof(Oid)); - for (i = 0; i < remoterel->natts; i++) + if (!found) { - entry->remoterel.attnames[i] = pstrdup(remoterel->attnames[i]); - entry->remoterel.atttyps[i] = remoterel->atttyps[i]; + memset(part_entry, 0, sizeof(LogicalRepPartMapEntry)); + part_entry->partoid = partOid; + } + + if (!entry->remoterel.remoteid) + { + int i; + + /* Remote relation is copied as-is from the root entry. */ + entry = &part_entry->relmapentry; + entry->remoterel.remoteid = remoterel->remoteid; + entry->remoterel.nspname = pstrdup(remoterel->nspname); + entry->remoterel.relname = pstrdup(remoterel->relname); + entry->remoterel.natts = remoterel->natts; + entry->remoterel.attnames = palloc(remoterel->natts * sizeof(char *)); + entry->remoterel.atttyps = palloc(remoterel->natts * sizeof(Oid)); + for (i = 0; i < remoterel->natts; i++) + { + entry->remoterel.attnames[i] = pstrdup(remoterel->attnames[i]); + entry->remoterel.atttyps[i] = remoterel->atttyps[i]; + } + entry->remoterel.replident = remoterel->replident; + entry->remoterel.attkeys = bms_copy(remoterel->attkeys); } - entry->remoterel.replident = remoterel->replident; - entry->remoterel.attkeys = bms_copy(remoterel->attkeys); entry->localrel = partrel; entry->localreloid = partOid; @@ -562,7 +570,11 @@ logicalrep_partition_open(LogicalRepRelMapEntry *root, { AttrNumber root_attno = map->attnums[attno]; - entry->attrmap->attnums[attno] = attrmap->attnums[root_attno - 1]; + /* 0 means it's a dropped attribute. See comments atop AttrMap. */ + if (root_attno == 0) + entry->attrmap->attnums[attno] = -1; + else + entry->attrmap->attnums[attno] = attrmap->attnums[root_attno - 1]; } } else diff --git a/src/test/subscription/t/013_partition.pl b/src/test/subscription/t/013_partition.pl index 7689cbb364..4eef52df77 100644 --- a/src/test/subscription/t/013_partition.pl +++ b/src/test/subscription/t/013_partition.pl @@ -3,7 +3,7 @@ use strict; use warnings; use PostgresNode; use TestLib; -use Test::More tests => 67; +use Test::More tests => 69; # setup @@ -786,7 +786,55 @@ ok( $logfile =~ qr/logical replication did not find row to be deleted in replication target relation "tab2_1"/, 'delete target row is missing in tab2_1'); -# No need for this until more tests are added. -# $node_subscriber1->append_conf('postgresql.conf', -# "log_min_messages = warning"); -# $node_subscriber1->reload; +$node_subscriber1->append_conf('postgresql.conf', + "log_min_messages = warning"); +$node_subscriber1->reload; + +# Test that replication continues to work correctly after altering the +# partition of a partitioned target table. + +$node_publisher->safe_psql( + 'postgres', q{ + CREATE TABLE tab5 (a int NOT NULL, b int); + CREATE UNIQUE INDEX tab5_a_idx ON tab5 (a); + ALTER TABLE tab5 REPLICA IDENTITY USING INDEX tab5_a_idx;}); + +$node_subscriber2->safe_psql( + 'postgres', q{ + CREATE TABLE tab5 (a int NOT NULL, b int, c int) PARTITION BY LIST (a); + CREATE TABLE tab5_1 PARTITION OF tab5 DEFAULT; + CREATE UNIQUE INDEX tab5_a_idx ON tab5 (a); + ALTER TABLE tab5 REPLICA IDENTITY USING INDEX tab5_a_idx; + ALTER TABLE tab5_1 REPLICA IDENTITY USING INDEX tab5_1_a_idx;}); + +$node_subscriber2->safe_psql('postgres', + "ALTER SUBSCRIPTION sub2 REFRESH PUBLICATION"); + +$node_subscriber2->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +# Make partition map cache +$node_publisher->safe_psql('postgres', "INSERT INTO tab5 VALUES (1, 1)"); +$node_publisher->safe_psql('postgres', "UPDATE tab5 SET a = 2 WHERE a = 1"); + +$node_publisher->wait_for_catchup('sub2'); + +$result = $node_subscriber2->safe_psql('postgres', + "SELECT a, b FROM tab5 ORDER BY 1"); +is($result, qq(2|1), 'updates of tab5 replicated correctly'); + +# Change the column order of partition on subscriber +$node_subscriber2->safe_psql( + 'postgres', q{ + ALTER TABLE tab5 DETACH PARTITION tab5_1; + ALTER TABLE tab5_1 DROP COLUMN b; + ALTER TABLE tab5_1 ADD COLUMN b int; + ALTER TABLE tab5 ATTACH PARTITION tab5_1 DEFAULT}); + +$node_publisher->safe_psql('postgres', "UPDATE tab5 SET a = 3 WHERE a = 2"); + +$node_publisher->wait_for_catchup('sub2'); + +$result = $node_subscriber2->safe_psql('postgres', + "SELECT a, b, c FROM tab5 ORDER BY 1"); +is($result, qq(3|1|), 'updates of tab5 replicated correctly after altering table on subscriber');