Fix relation descriptor leak.

We missed closing the relation descriptor while sending changes via the
root of partitioned relations during logical replication.

Author: Amit Langote and Mark Zhao
Reviewed-by: Amit Kapila and Ashutosh Bapat
Backpatch-through: 13, where it was introduced
Discussion: https://postgr.es/m/tencent_41FEA657C206F19AB4F406BE9252A0F69C06@qq.com
Discussion: https://postgr.es/m/tencent_6E296D2F7D70AFC90D83353B69187C3AA507@qq.com
This commit is contained in:
Amit Kapila 2021-01-12 08:19:39 +05:30
parent d6ad34f341
commit 044aa9e70e

View File

@ -502,6 +502,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
MemoryContext old; MemoryContext old;
RelationSyncEntry *relentry; RelationSyncEntry *relentry;
TransactionId xid = InvalidTransactionId; TransactionId xid = InvalidTransactionId;
Relation ancestor = NULL;
if (!is_publishable_relation(relation)) if (!is_publishable_relation(relation))
return; return;
@ -552,7 +553,8 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
if (relentry->publish_as_relid != RelationGetRelid(relation)) if (relentry->publish_as_relid != RelationGetRelid(relation))
{ {
Assert(relation->rd_rel->relispartition); Assert(relation->rd_rel->relispartition);
relation = RelationIdGetRelation(relentry->publish_as_relid); ancestor = RelationIdGetRelation(relentry->publish_as_relid);
relation = ancestor;
/* Convert tuple if needed. */ /* Convert tuple if needed. */
if (relentry->map) if (relentry->map)
tuple = execute_attr_map_tuple(tuple, relentry->map); tuple = execute_attr_map_tuple(tuple, relentry->map);
@ -574,7 +576,8 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
if (relentry->publish_as_relid != RelationGetRelid(relation)) if (relentry->publish_as_relid != RelationGetRelid(relation))
{ {
Assert(relation->rd_rel->relispartition); Assert(relation->rd_rel->relispartition);
relation = RelationIdGetRelation(relentry->publish_as_relid); ancestor = RelationIdGetRelation(relentry->publish_as_relid);
relation = ancestor;
/* Convert tuples if needed. */ /* Convert tuples if needed. */
if (relentry->map) if (relentry->map)
{ {
@ -598,7 +601,8 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
if (relentry->publish_as_relid != RelationGetRelid(relation)) if (relentry->publish_as_relid != RelationGetRelid(relation))
{ {
Assert(relation->rd_rel->relispartition); Assert(relation->rd_rel->relispartition);
relation = RelationIdGetRelation(relentry->publish_as_relid); ancestor = RelationIdGetRelation(relentry->publish_as_relid);
relation = ancestor;
/* Convert tuple if needed. */ /* Convert tuple if needed. */
if (relentry->map) if (relentry->map)
oldtuple = execute_attr_map_tuple(oldtuple, relentry->map); oldtuple = execute_attr_map_tuple(oldtuple, relentry->map);
@ -616,6 +620,12 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
Assert(false); Assert(false);
} }
if (RelationIsValid(ancestor))
{
RelationClose(ancestor);
ancestor = NULL;
}
/* Cleanup */ /* Cleanup */
MemoryContextSwitchTo(old); MemoryContextSwitchTo(old);
MemoryContextReset(data->context); MemoryContextReset(data->context);