diff --git a/doc/src/sgml/ref/alter_publication.sgml b/doc/src/sgml/ref/alter_publication.sgml index e2cce49471..3e338f4cc5 100644 --- a/doc/src/sgml/ref/alter_publication.sgml +++ b/doc/src/sgml/ref/alter_publication.sgml @@ -116,7 +116,17 @@ ALTER PUBLICATION name RENAME TO Optionally, a column list can be specified. See for details. + linkend="sql-createpublication"/> for details. Note that a subscription + having several publications in which the same table has been published + with different column lists is not supported. So, changing the column + lists of the tables being subscribed could cause inconsistency of column + lists among publications, in which case ALTER PUBLICATION + will be successful but later the WalSender on the publisher or the + subscriber may throw an error. In this scenario, the user needs to + recreate the subscription after adjusting the column list or drop the + problematic publication using + ALTER SUBSCRIPTION ... DROP PUBLICATION and then add + it back after adjusting the column list. diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml index 203bb41844..35b39c28da 100644 --- a/doc/src/sgml/ref/create_subscription.sgml +++ b/doc/src/sgml/ref/create_subscription.sgml @@ -355,6 +355,11 @@ CREATE SUBSCRIPTION subscription_name + + Subscriptions having several publications in which the same table has been + published with different column lists are not supported. + + We allow non-existent publications to be specified so that users can add those later. This means diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 690cdaa426..83e6eae855 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -1754,6 +1754,11 @@ AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId) /* * Get the list of tables which belong to specified publications on the * publisher connection. + * + * Note that we don't support the case where the column list is different for + * the same table in different publications to avoid sending unwanted column + * information for some of the rows. This can happen when both the column + * list and row filter are specified for different publications. */ static List * fetch_table_list(WalReceiverConn *wrconn, List *publications) @@ -1761,17 +1766,23 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications) WalRcvExecResult *res; StringInfoData cmd; TupleTableSlot *slot; - Oid tableRow[2] = {TEXTOID, TEXTOID}; + Oid tableRow[3] = {TEXTOID, TEXTOID, NAMEARRAYOID}; List *tablelist = NIL; + bool check_columnlist = (walrcv_server_version(wrconn) >= 150000); initStringInfo(&cmd); - appendStringInfoString(&cmd, "SELECT DISTINCT t.schemaname, t.tablename\n" - " FROM pg_catalog.pg_publication_tables t\n" + appendStringInfoString(&cmd, "SELECT DISTINCT t.schemaname, t.tablename \n"); + + /* Get column lists for each relation if the publisher supports it */ + if (check_columnlist) + appendStringInfoString(&cmd, ", t.attnames\n"); + + appendStringInfoString(&cmd, "FROM pg_catalog.pg_publication_tables t\n" " WHERE t.pubname IN ("); get_publications_str(publications, &cmd, true); appendStringInfoChar(&cmd, ')'); - res = walrcv_exec(wrconn, cmd.data, 2, tableRow); + res = walrcv_exec(wrconn, cmd.data, check_columnlist ? 3 : 2, tableRow); pfree(cmd.data); if (res->status != WALRCV_OK_TUPLES) @@ -1795,7 +1806,14 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications) Assert(!isnull); rv = makeRangeVar(nspname, relname, -1); - tablelist = lappend(tablelist, rv); + + if (check_columnlist && list_member(tablelist, rv)) + ereport(ERROR, + errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot use different column lists for table \"%s.%s\" in different publications", + nspname, relname)); + else + tablelist = lappend(tablelist, rv); ExecClearTuple(slot); } diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 994c7a09d9..670c6fcada 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -753,17 +753,6 @@ fetch_remote_table_info(char *nspname, char *relname, /* * Get column lists for each relation. * - * For initial synchronization, column lists can be ignored in following - * cases: - * - * 1) one of the subscribed publications for the table hasn't specified - * any column list - * - * 2) one of the subscribed publications has puballtables set to true - * - * 3) one of the subscribed publications is declared as ALL TABLES IN - * SCHEMA that includes this relation - * * We need to do this before fetching info about column names and types, * so that we can skip columns that should not be replicated. */ @@ -771,7 +760,7 @@ fetch_remote_table_info(char *nspname, char *relname, { WalRcvExecResult *pubres; TupleTableSlot *slot; - Oid attrsRow[] = {INT2OID}; + Oid attrsRow[] = {INT2VECTOROID}; StringInfoData pub_names; bool first = true; @@ -786,19 +775,17 @@ fetch_remote_table_info(char *nspname, char *relname, /* * Fetch info about column lists for the relation (from all the - * publications). We unnest the int2vector values, because that makes - * it easier to combine lists by simply adding the attnums to a new - * bitmap (without having to parse the int2vector data). This - * preserves NULL values, so that if one of the publications has no - * column list, we'll know that. + * publications). */ resetStringInfo(&cmd); appendStringInfo(&cmd, - "SELECT DISTINCT unnest" + "SELECT DISTINCT" + " (CASE WHEN (array_length(gpt.attrs, 1) = c.relnatts)" + " THEN NULL ELSE gpt.attrs END)" " FROM pg_publication p," - " LATERAL pg_get_publication_tables(p.pubname) gpt" - " LEFT OUTER JOIN unnest(gpt.attrs) ON TRUE" - " WHERE gpt.relid = %u" + " LATERAL pg_get_publication_tables(p.pubname) gpt," + " pg_class c" + " WHERE gpt.relid = %u AND c.oid = gpt.relid" " AND p.pubname IN ( %s )", lrel->remoteid, pub_names.data); @@ -813,26 +800,43 @@ fetch_remote_table_info(char *nspname, char *relname, nspname, relname, pubres->err))); /* - * Merge the column lists (from different publications) by creating a - * single bitmap with all the attnums. If we find a NULL value, that - * means one of the publications has no column list for the table - * we're syncing. + * We don't support the case where the column list is different for + * the same table when combining publications. See comments atop + * fetch_table_list. So there should be only one row returned. + * Although we already checked this when creating the subscription, we + * still need to check here in case the column list was changed after + * creating the subscription and before the sync worker is started. + */ + if (tuplestore_tuple_count(pubres->tuplestore) > 1) + ereport(ERROR, + errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot use different column lists for table \"%s.%s\" in different publications", + nspname, relname)); + + /* + * Get the column list and build a single bitmap with the attnums. + * + * If we find a NULL value, it means all the columns should be + * replicated. */ slot = MakeSingleTupleTableSlot(pubres->tupledesc, &TTSOpsMinimalTuple); - while (tuplestore_gettupleslot(pubres->tuplestore, true, false, slot)) + if (tuplestore_gettupleslot(pubres->tuplestore, true, false, slot)) { Datum cfval = slot_getattr(slot, 1, &isnull); - /* NULL means empty column list, so we're done. */ - if (isnull) + if (!isnull) { - bms_free(included_cols); - included_cols = NULL; - break; - } + ArrayType *arr; + int nelems; + int16 *elems; - included_cols = bms_add_member(included_cols, - DatumGetInt16(cfval)); + arr = DatumGetArrayTypeP(cfval); + nelems = ARR_DIMS(arr)[0]; + elems = (int16 *) ARR_DATA_PTR(arr); + + for (natt = 0; natt < nelems; natt++) + included_cols = bms_add_member(included_cols, elems[natt]); + } ExecClearTuple(slot); } diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 42c06af239..8deae57143 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -979,30 +979,31 @@ pgoutput_column_list_init(PGOutputData *data, List *publications, RelationSyncEntry *entry) { ListCell *lc; + bool first = true; + Relation relation = RelationIdGetRelation(entry->publish_as_relid); /* * Find if there are any column lists for this relation. If there are, - * build a bitmap merging all the column lists. - * - * All the given publication-table mappings must be checked. + * build a bitmap using the column lists. * * Multiple publications might have multiple column lists for this * relation. * + * Note that we don't support the case where the column list is different + * for the same table when combining publications. See comments atop + * fetch_table_list. But one can later change the publication so we still + * need to check all the given publication-table mappings and report an + * error if any publications have a different column list. + * * FOR ALL TABLES and FOR ALL TABLES IN SCHEMA implies "don't use column - * list" so it takes precedence. + * list". */ foreach(lc, publications) { Publication *pub = lfirst(lc); HeapTuple cftuple = NULL; Datum cfdatum = 0; - - /* - * Assume there's no column list. Only if we find pg_publication_rel - * entry with a column list we'll switch it to false. - */ - bool pub_no_list = true; + Bitmapset *cols = NULL; /* * If the publication is FOR ALL TABLES then it is treated the same as @@ -1011,6 +1012,8 @@ pgoutput_column_list_init(PGOutputData *data, List *publications, */ if (!pub->alltables) { + bool pub_no_list = true; + /* * Check for the presence of a column list in this publication. * @@ -1024,51 +1027,48 @@ pgoutput_column_list_init(PGOutputData *data, List *publications, if (HeapTupleIsValid(cftuple)) { - /* - * Lookup the column list attribute. - * - * Note: We update the pub_no_list value directly, because if - * the value is NULL, we have no list (and vice versa). - */ + /* Lookup the column list attribute. */ cfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, cftuple, Anum_pg_publication_rel_prattrs, &pub_no_list); - /* - * Build the column list bitmap in the per-entry context. - * - * We need to merge column lists from all publications, so we - * update the same bitmapset. If the column list is null, we - * interpret it as replicating all columns. - */ + /* Build the column list bitmap in the per-entry context. */ if (!pub_no_list) /* when not null */ { pgoutput_ensure_entry_cxt(data, entry); - entry->columns = pub_collist_to_bitmapset(entry->columns, - cfdatum, - entry->entry_cxt); + cols = pub_collist_to_bitmapset(cols, cfdatum, + entry->entry_cxt); + + /* + * If column list includes all the columns of the table, + * set it to NULL. + */ + if (bms_num_members(cols) == RelationGetNumberOfAttributes(relation)) + { + bms_free(cols); + cols = NULL; + } } + + ReleaseSysCache(cftuple); } } - /* - * Found a publication with no column list, so we're done. But first - * discard column list we might have from preceding publications. - */ - if (pub_no_list) + if (first) { - if (cftuple) - ReleaseSysCache(cftuple); - - bms_free(entry->columns); - entry->columns = NULL; - - break; + entry->columns = cols; + first = false; } - - ReleaseSysCache(cftuple); + else if (!bms_equal(entry->columns, cols)) + ereport(ERROR, + errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot use different column lists for table \"%s.%s\" in different publications", + get_namespace_name(RelationGetNamespace(relation)), + RelationGetRelationName(relation))); } /* loop all subscribed publications */ + + RelationClose(relation); } /* diff --git a/src/test/subscription/t/031_column_list.pl b/src/test/subscription/t/031_column_list.pl index 19812e11f3..7f031bc195 100644 --- a/src/test/subscription/t/031_column_list.pl +++ b/src/test/subscription/t/031_column_list.pl @@ -20,6 +20,7 @@ $node_subscriber->append_conf('postgresql.conf', $node_subscriber->start; my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; +my $offset = 0; sub wait_for_subscription_sync { @@ -361,13 +362,13 @@ is( $result, qq(1|abc 2|xyz), 'update on column tab2.c is not replicated'); -# TEST: add a table to two publications with different column lists, and +# TEST: add a table to two publications with same column lists, and # create a single subscription replicating both publications $node_publisher->safe_psql( 'postgres', qq( CREATE TABLE tab5 (a int PRIMARY KEY, b int, c int, d int); CREATE PUBLICATION pub2 FOR TABLE tab5 (a, b); - CREATE PUBLICATION pub3 FOR TABLE tab5 (a, d); + CREATE PUBLICATION pub3 FOR TABLE tab5 (a, b); -- insert a couple initial records INSERT INTO tab5 VALUES (1, 11, 111, 1111); @@ -388,8 +389,7 @@ wait_for_subscription_sync($node_subscriber); $node_publisher->wait_for_catchup('sub1'); -# insert data and make sure all the columns (union of the columns lists) -# get fully replicated +# insert data and make sure the columns in column list get fully replicated $node_publisher->safe_psql( 'postgres', qq( INSERT INTO tab5 VALUES (3, 33, 333, 3333); @@ -399,42 +399,12 @@ $node_publisher->safe_psql( $node_publisher->wait_for_catchup('sub1'); is( $node_subscriber->safe_psql('postgres', "SELECT * FROM tab5 ORDER BY a"), - qq(1|11|1111 -2|22|2222 -3|33|3333 -4|44|4444), + qq(1|11| +2|22| +3|33| +4|44|), 'overlapping publications with overlapping column lists'); -# and finally, remove the column list for one of the publications, which -# means replicating all columns (removing the column list), but first add -# the missing column to the table on subscriber -$node_publisher->safe_psql( - 'postgres', qq( - ALTER PUBLICATION pub3 SET TABLE tab5; -)); - -$node_subscriber->safe_psql( - 'postgres', qq( - ALTER SUBSCRIPTION sub1 REFRESH PUBLICATION; - ALTER TABLE tab5 ADD COLUMN c INT; -)); - -wait_for_subscription_sync($node_subscriber); - -$node_publisher->safe_psql( - 'postgres', qq( - INSERT INTO tab5 VALUES (5, 55, 555, 5555); -)); - -$node_publisher->wait_for_catchup('sub1'); - -is( $node_subscriber->safe_psql('postgres', "SELECT * FROM tab5 ORDER BY a"), - qq(1|11|1111| -2|22|2222| -3|33|3333| -4|44|4444| -5|55|5555|555), - 'overlapping publications with overlapping column lists'); # TEST: create a table with a column list, then change the replica # identity by replacing a primary key (but use a different column in @@ -900,57 +870,21 @@ is( $node_subscriber->safe_psql( 3|), 'partitions with different replica identities not replicated correctly'); -# TEST: With a table included in multiple publications, we should use a -# union of the column lists. So with column lists (a,b) and (a,c) we -# should replicate (a,b,c). -$node_publisher->safe_psql( - 'postgres', qq( - CREATE TABLE test_mix_1 (a int PRIMARY KEY, b int, c int); - CREATE PUBLICATION pub_mix_1 FOR TABLE test_mix_1 (a, b); - CREATE PUBLICATION pub_mix_2 FOR TABLE test_mix_1 (a, c); - - -- initial data - INSERT INTO test_mix_1 VALUES (1, 2, 3); -)); - -$node_subscriber->safe_psql( - 'postgres', qq( - CREATE TABLE test_mix_1 (a int PRIMARY KEY, b int, c int); - ALTER SUBSCRIPTION sub1 SET PUBLICATION pub_mix_1, pub_mix_2; -)); - -wait_for_subscription_sync($node_subscriber); - -$node_publisher->safe_psql( - 'postgres', qq( - INSERT INTO test_mix_1 VALUES (4, 5, 6); -)); - -$node_publisher->wait_for_catchup('sub1'); - -is( $node_subscriber->safe_psql( - 'postgres', "SELECT * FROM test_mix_1 ORDER BY a"), - qq(1|2|3 -4|5|6), - 'a mix of publications should use a union of column list'); - - -# TEST: With a table included in multiple publications, we should use a -# union of the column lists. If any of the publications is FOR ALL -# TABLES, we should replicate all columns. +# TEST: With a table included in the publications which is FOR ALL TABLES, it +# means replicate all columns. # drop unnecessary tables, so as not to interfere with the FOR ALL TABLES $node_publisher->safe_psql( 'postgres', qq( - DROP TABLE tab1, tab2, tab3, tab4, tab5, tab6, tab7, test_mix_1, + DROP TABLE tab1, tab2, tab3, tab4, tab5, tab6, tab7, test_part, test_part_a, test_part_b, test_part_c, test_part_d; )); $node_publisher->safe_psql( 'postgres', qq( CREATE TABLE test_mix_2 (a int PRIMARY KEY, b int, c int); - CREATE PUBLICATION pub_mix_3 FOR TABLE test_mix_2 (a, b); + CREATE PUBLICATION pub_mix_3 FOR TABLE test_mix_2 (a, b, c); CREATE PUBLICATION pub_mix_4 FOR ALL TABLES; -- initial data @@ -976,12 +910,11 @@ $node_publisher->wait_for_catchup('sub1'); is( $node_subscriber->safe_psql('postgres', "SELECT * FROM test_mix_2"), qq(1|2|3 4|5|6), - 'a mix of publications should use a union of column list'); + 'all columns should be replicated'); -# TEST: With a table included in multiple publications, we should use a -# union of the column lists. If any of the publications is FOR ALL -# TABLES IN SCHEMA, we should replicate all columns. +# TEST: With a table included in the publication which is FOR ALL TABLES +# IN SCHEMA, it means replicate all columns. $node_subscriber->safe_psql( 'postgres', qq( @@ -993,7 +926,7 @@ $node_publisher->safe_psql( 'postgres', qq( DROP TABLE test_mix_2; CREATE TABLE test_mix_3 (a int PRIMARY KEY, b int, c int); - CREATE PUBLICATION pub_mix_5 FOR TABLE test_mix_3 (a, b); + CREATE PUBLICATION pub_mix_5 FOR TABLE test_mix_3 (a, b, c); CREATE PUBLICATION pub_mix_6 FOR ALL TABLES IN SCHEMA public; -- initial data @@ -1017,7 +950,7 @@ $node_publisher->wait_for_catchup('sub1'); is( $node_subscriber->safe_psql('postgres', "SELECT * FROM test_mix_3"), qq(1|2|3 4|5|6), - 'a mix of publications should use a union of column list'); + 'all columns should be replicated'); # TEST: Check handling of publish_via_partition_root - if a partition is @@ -1074,7 +1007,7 @@ is( $node_subscriber->safe_psql( # TEST: Multiple publications which publish schema of parent table and # partition. The partition is published through two publications, once # through a schema (so no column list) containing the parent, and then -# also directly (with a columns list). The expected outcome is there is +# also directly (with all columns). The expected outcome is there is # no column list. $node_publisher->safe_psql( @@ -1086,7 +1019,7 @@ $node_publisher->safe_psql( CREATE TABLE t_1 PARTITION OF s1.t FOR VALUES FROM (1) TO (10); CREATE PUBLICATION pub1 FOR ALL TABLES IN SCHEMA s1; - CREATE PUBLICATION pub2 FOR TABLE t_1(b); + CREATE PUBLICATION pub2 FOR TABLE t_1(a, b, c); -- initial data INSERT INTO s1.t VALUES (1, 2, 3); @@ -1233,6 +1166,51 @@ is( $node_subscriber->safe_psql( 'publication containing both parent and child relation'); +# TEST: With a table included in multiple publications with different column +# lists, we should catch the error when creating the subscription. + +$node_publisher->safe_psql('postgres', qq( + CREATE TABLE test_mix_1 (a int PRIMARY KEY, b int, c int); + CREATE PUBLICATION pub_mix_1 FOR TABLE test_mix_1 (a, b); + CREATE PUBLICATION pub_mix_2 FOR TABLE test_mix_1 (a, c); +)); + +$node_subscriber->safe_psql('postgres', qq( + DROP SUBSCRIPTION sub1; + CREATE TABLE test_mix_1 (a int PRIMARY KEY, b int, c int); +)); + +my ($cmdret, $stdout, $stderr) = $node_subscriber->psql( + 'postgres', qq( + CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_mix_1, pub_mix_2; +)); + +ok( $stderr =~ + qr/cannot use different column lists for table "public.test_mix_1" in different publications/, + 'different column lists detected'); + +# TEST: If the column list is changed after creating the subscription, we +# should catch the error reported by walsender. + +$node_publisher->safe_psql('postgres', qq( + ALTER PUBLICATION pub_mix_1 SET TABLE test_mix_1 (a, c); +)); + +$node_subscriber->safe_psql('postgres', qq( + CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_mix_1, pub_mix_2; +)); + +$node_publisher->wait_for_catchup('sub1'); + +$node_publisher->safe_psql('postgres', qq( + ALTER PUBLICATION pub_mix_1 SET TABLE test_mix_1 (a, b); + INSERT INTO test_mix_1 VALUES(1, 1, 1); +)); + +$offset = $node_publisher->wait_for_log( + qr/cannot use different column lists for table "public.test_mix_1" in different publications/, + $offset); + $node_subscriber->stop('fast'); $node_publisher->stop('fast');