Fix row filters with multiple publications

When publishing changes through a partition root, we should use the row
filter for the top-most ancestor. The relation may be added to multiple
publications, using different ancestors, and 52e4f0cd47 handled this
incorrectly. With c91f71b9dc we find the correct top-most ancestor, but
the code tried to fetch the row filter from all publications, including
those using a different ancestor etc. No row filter can be found for
such publications, which was treated as replicating all rows.

Similarly to c91f71b9dc, this seems to be a rare issue in practice. It
requires multiple publications including the same partitioned relation,
through different ancestors.

Fixed by only passing publications containing the top-most ancestor to
pgoutput_row_filter_init(), so that treating a missing row filter as
replicating all rows is correct.

Report and fix by me, test case by Hou zj. Reviews and improvements by
Amit Kapila.

Author: Tomas Vondra, Hou zj, Amit Kapila
Reviewed-by: Amit Kapila, Hou zj
Discussion: https://postgr.es/m/d26d24dd-2fab-3c48-0162-2b7f84a9c893%40enterprisedb.com
This commit is contained in:
Tomas Vondra 2022-03-17 17:03:45 +01:00
parent a9b7e92084
commit 5a07966225
2 changed files with 67 additions and 6 deletions

View File

@ -1890,8 +1890,6 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
entry->pubactions.pubdelete |= pub->pubactions.pubdelete;
entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate;
rel_publications = lappend(rel_publications, pub);
/*
* We want to publish the changes as the top-most ancestor
* across all publications. So we need to check if the
@ -1902,9 +1900,27 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
if (publish_ancestor_level > ancestor_level)
continue;
/* The new value is an ancestor, so let's keep it. */
publish_as_relid = pub_relid;
publish_ancestor_level = ancestor_level;
/*
* If we found an ancestor higher up in the tree, discard
* the list of publications through which we replicate it,
* and use the new ancestor.
*/
if (publish_ancestor_level < ancestor_level)
{
publish_as_relid = pub_relid;
publish_ancestor_level = ancestor_level;
/* reset the publication list for this relation */
rel_publications = NIL;
}
else
{
/* Same ancestor level, has to be the same OID. */
Assert(publish_as_relid == pub_relid);
}
/* Track publications for this ancestor. */
rel_publications = lappend(rel_publications, pub);
}
}

View File

@ -237,6 +237,11 @@ $node_publisher->safe_psql('postgres',
$node_publisher->safe_psql('postgres',
"CREATE TABLE tab_rowfilter_child (b text) INHERITS (tab_rowfilter_inherited)"
);
$node_publisher->safe_psql('postgres',
"CREATE TABLE tab_rowfilter_viaroot_part (a int) PARTITION BY RANGE (a)");
$node_publisher->safe_psql('postgres',
"CREATE TABLE tab_rowfilter_viaroot_part_1 PARTITION OF tab_rowfilter_viaroot_part FOR VALUES FROM (1) TO (20)"
);
# setup structure on subscriber
$node_subscriber->safe_psql('postgres',
@ -283,6 +288,11 @@ $node_subscriber->safe_psql('postgres',
$node_subscriber->safe_psql('postgres',
"CREATE TABLE tab_rowfilter_child (b text) INHERITS (tab_rowfilter_inherited)"
);
$node_subscriber->safe_psql('postgres',
"CREATE TABLE tab_rowfilter_viaroot_part (a int)");
$node_subscriber->safe_psql('postgres',
"CREATE TABLE tab_rowfilter_viaroot_part_1 (a int)"
);
# setup logical replication
$node_publisher->safe_psql('postgres',
@ -330,6 +340,15 @@ $node_publisher->safe_psql('postgres',
"CREATE PUBLICATION tap_pub_inherits FOR TABLE tab_rowfilter_inherited WHERE (a > 15)"
);
# two publications, each publishing the partition through a different ancestor, with
# different row filters
$node_publisher->safe_psql('postgres',
"CREATE PUBLICATION tap_pub_viaroot_1 FOR TABLE tab_rowfilter_viaroot_part WHERE (a > 15) WITH (publish_via_partition_root)"
);
$node_publisher->safe_psql('postgres',
"CREATE PUBLICATION tap_pub_viaroot_2 FOR TABLE tab_rowfilter_viaroot_part_1 WHERE (a < 15) WITH (publish_via_partition_root)"
);
#
# The following INSERTs are executed before the CREATE SUBSCRIPTION, so these
# SQL commands are for testing the initial data copy using logical replication.
@ -376,7 +395,7 @@ $node_publisher->safe_psql('postgres',
);
$node_subscriber->safe_psql('postgres',
"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_1, tap_pub_2, tap_pub_3, tap_pub_4a, tap_pub_4b, tap_pub_5a, tap_pub_5b, tap_pub_toast, tap_pub_inherits"
"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_1, tap_pub_2, tap_pub_3, tap_pub_4a, tap_pub_4b, tap_pub_5a, tap_pub_5b, tap_pub_toast, tap_pub_inherits, tap_pub_viaroot_2, tap_pub_viaroot_1"
);
$node_publisher->wait_for_catchup($appname);
@ -534,6 +553,8 @@ $node_publisher->safe_psql('postgres',
"INSERT INTO tab_rowfilter_inherited (a) VALUES (14), (16)");
$node_publisher->safe_psql('postgres',
"INSERT INTO tab_rowfilter_child (a, b) VALUES (13, '13'), (17, '17')");
$node_publisher->safe_psql('postgres',
"INSERT INTO tab_rowfilter_viaroot_part (a) VALUES (14), (15), (16)");
$node_publisher->wait_for_catchup($appname);
@ -688,6 +709,30 @@ $result =
"SELECT a = repeat('1234567890', 200), b FROM tab_rowfilter_toast");
is($result, qq(t|1), 'check replicated rows to tab_rowfilter_toast');
# Check expected replicated rows for tab_rowfilter_viaroot_part and
# tab_rowfilter_viaroot_part_1. We should replicate only rows matching
# the row filter for the top-level ancestor:
#
# tab_rowfilter_viaroot_part filter is: (a > 15)
# - INSERT (14) NO, 14 < 15
# - INSERT (15) NO, 15 = 15
# - INSERT (16) YES, 16 > 15
$result =
$node_subscriber->safe_psql('postgres',
"SELECT a FROM tab_rowfilter_viaroot_part");
is( $result, qq(16),
'check replicated rows to tab_rowfilter_viaroot_part'
);
# Check there is no data in tab_rowfilter_viaroot_part_1 because rows are
# replicated via the top-most parent table tab_rowfilter_viaroot_part
$result =
$node_subscriber->safe_psql('postgres',
"SELECT a FROM tab_rowfilter_viaroot_part_1");
is( $result, qq(),
'check replicated rows to tab_rowfilter_viaroot_part_1'
);
# Testcase end: FOR TABLE with row filter publications
# ======================================================