mirror of https://github.com/postgres/postgres
Handle heap rewrites better in logical replication
A FOR ALL TABLES publication naturally considers all base tables to be a candidate for replication. This includes transient heaps that are created during a table rewrite during DDL. This causes failures on the subscriber side because it will not have a table like pg_temp_16386 to receive data (and if it did, it would be the wrong table). The prevent this problem, we filter out any tables that match this naming pattern and match an actual table from FOR ALL TABLES publications. This is only a heuristic, meaning that user tables that match that naming could accidentally be omitted. A more robust solution might require an explicit marking of such tables in pg_class somehow. Reported-by: yxq <yxq@o2.pl> Bug: #14785 Reviewed-by: Andres Freund <andres@anarazel.de> Reviewed-by: Petr Jelinek <petr.jelinek@2ndquadrant.com>
This commit is contained in:
parent
22c5e73562
commit
ab28feae2b
|
@ -21,6 +21,7 @@
|
|||
|
||||
#include "utils/inval.h"
|
||||
#include "utils/int8.h"
|
||||
#include "utils/lsyscache.h"
|
||||
#include "utils/memutils.h"
|
||||
#include "utils/syscache.h"
|
||||
#include "utils/varlena.h"
|
||||
|
@ -509,6 +510,31 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
|
|||
{
|
||||
Publication *pub = lfirst(lc);
|
||||
|
||||
/*
|
||||
* Skip tables that look like they are from a heap rewrite (see
|
||||
* make_new_heap()). We need to skip them because the subscriber
|
||||
* won't have a table by that name to receive the data. That
|
||||
* means we won't ship the new data in, say, an added column with
|
||||
* a DEFAULT, but if the user applies the same DDL manually on the
|
||||
* subscriber, then this will work out for them.
|
||||
*
|
||||
* We only need to consider the alltables case, because such a
|
||||
* transient heap won't be an explicit member of a publication.
|
||||
*/
|
||||
if (pub->alltables)
|
||||
{
|
||||
char *relname = get_rel_name(relid);
|
||||
unsigned int u;
|
||||
int n;
|
||||
|
||||
if (sscanf(relname, "pg_temp_%u%n", &u, &n) == 1 &&
|
||||
relname[n] == '\0')
|
||||
{
|
||||
if (get_rel_relkind(u) == RELKIND_RELATION)
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (pub->alltables || list_member_oid(pubids, pub->oid))
|
||||
{
|
||||
entry->pubactions.pubinsert |= pub->pubactions.pubinsert;
|
||||
|
|
|
@ -0,0 +1,73 @@
|
|||
# Test logical replication behavior with heap rewrites
|
||||
use strict;
|
||||
use warnings;
|
||||
use PostgresNode;
|
||||
use TestLib;
|
||||
use Test::More tests => 2;
|
||||
|
||||
sub wait_for_caught_up
|
||||
{
|
||||
my ($node, $appname) = @_;
|
||||
|
||||
$node->poll_query_until('postgres',
|
||||
"SELECT pg_current_wal_lsn() <= replay_lsn FROM pg_stat_replication WHERE application_name = '$appname';"
|
||||
) or die "Timed out while waiting for subscriber to catch up";
|
||||
}
|
||||
|
||||
my $node_publisher = get_new_node('publisher');
|
||||
$node_publisher->init(allows_streaming => 'logical');
|
||||
$node_publisher->start;
|
||||
|
||||
my $node_subscriber = get_new_node('subscriber');
|
||||
$node_subscriber->init(allows_streaming => 'logical');
|
||||
$node_subscriber->start;
|
||||
|
||||
my $ddl = "CREATE TABLE test1 (a int, b text);";
|
||||
$node_publisher->safe_psql('postgres', $ddl);
|
||||
$node_subscriber->safe_psql('postgres', $ddl);
|
||||
|
||||
my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
|
||||
my $appname = 'encoding_test';
|
||||
|
||||
$node_publisher->safe_psql('postgres',
|
||||
"CREATE PUBLICATION mypub FOR ALL TABLES;");
|
||||
$node_subscriber->safe_psql('postgres',
|
||||
"CREATE SUBSCRIPTION mysub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION mypub;"
|
||||
);
|
||||
|
||||
wait_for_caught_up($node_publisher, $appname);
|
||||
|
||||
# Wait for initial sync to finish as well
|
||||
my $synced_query =
|
||||
"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('s', 'r');";
|
||||
$node_subscriber->poll_query_until('postgres', $synced_query)
|
||||
or die "Timed out while waiting for subscriber to synchronize data";
|
||||
|
||||
$node_publisher->safe_psql('postgres', q{INSERT INTO test1 (a, b) VALUES (1, 'one'), (2, 'two');});
|
||||
|
||||
wait_for_caught_up($node_publisher, $appname);
|
||||
|
||||
is($node_subscriber->safe_psql('postgres', q{SELECT a, b FROM test1}),
|
||||
qq(1|one
|
||||
2|two),
|
||||
'initial data replicated to subscriber');
|
||||
|
||||
# DDL that causes a heap rewrite
|
||||
my $ddl2 = "ALTER TABLE test1 ADD c int NOT NULL DEFAULT 0;";
|
||||
$node_subscriber->safe_psql('postgres', $ddl2);
|
||||
$node_publisher->safe_psql('postgres', $ddl2);
|
||||
|
||||
wait_for_caught_up($node_publisher, $appname);
|
||||
|
||||
$node_publisher->safe_psql('postgres', q{INSERT INTO test1 (a, b, c) VALUES (3, 'three', 33);});
|
||||
|
||||
wait_for_caught_up($node_publisher, $appname);
|
||||
|
||||
is($node_subscriber->safe_psql('postgres', q{SELECT a, b, c FROM test1}),
|
||||
qq(1|one|0
|
||||
2|two|0
|
||||
3|three|33),
|
||||
'data replicated to subscriber');
|
||||
|
||||
$node_subscriber->stop;
|
||||
$node_publisher->stop;
|
Loading…
Reference in New Issue