Fix the misuse of origin filter across multiple pg_logical_slot_get_changes() calls.
The pgoutput module uses a global variable (publish_no_origin) to cache the action for the origin filter, but we didn't reset the flag when shutting down the output plugin, so subsequent retries may access the previous publish_no_origin value. We fix this by storing the flag in the output plugin's private data. Additionally, the patch removes the currently unused origin string from the structure. For the back branch, to avoid changing the exposed structure, we eliminated the global variable and instead directly used the origin string for change filtering. Author: Hou Zhijie Reviewed-by: Amit Kapila, Michael Paquier Backpatch-through: 16 Discussion: http://postgr.es/m/OS0PR01MB571690EF24F51F51EFFCBB0E94FAA@OS0PR01MB5716.jpnprd01.prod.outlook.com
This commit is contained in:
parent
6fc3a138b1
commit
54ccfd6586
@ -267,3 +267,59 @@ SELECT pg_replication_origin_drop('regress_test_decoding: regression_slot_no_lsn
|
|||||||
|
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
-- Test that the pgoutput correctly filters changes corresponding to the provided origin value.
|
||||||
|
SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'pgoutput');
|
||||||
|
?column?
|
||||||
|
----------
|
||||||
|
init
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
CREATE PUBLICATION pub FOR TABLE target_tbl;
|
||||||
|
SELECT pg_replication_origin_create('regress_test_decoding: regression_slot');
|
||||||
|
pg_replication_origin_create
|
||||||
|
------------------------------
|
||||||
|
1
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- mark session as replaying
|
||||||
|
SELECT pg_replication_origin_session_setup('regress_test_decoding: regression_slot');
|
||||||
|
pg_replication_origin_session_setup
|
||||||
|
-------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
INSERT INTO target_tbl(data) VALUES ('test data');
|
||||||
|
-- The replayed change will be filtered.
|
||||||
|
SELECT count(*) = 0 FROM pg_logical_slot_peek_binary_changes('regression_slot', NULL, NULL, 'proto_version', '4', 'publication_names', 'pub', 'origin', 'none');
|
||||||
|
?column?
|
||||||
|
----------
|
||||||
|
t
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- The replayed change will be output if the origin value is not specified.
|
||||||
|
SELECT count(*) != 0 FROM pg_logical_slot_peek_binary_changes('regression_slot', NULL, NULL, 'proto_version', '4', 'publication_names', 'pub');
|
||||||
|
?column?
|
||||||
|
----------
|
||||||
|
t
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- Clean up
|
||||||
|
SELECT pg_replication_origin_session_reset();
|
||||||
|
pg_replication_origin_session_reset
|
||||||
|
-------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT pg_drop_replication_slot('regression_slot');
|
||||||
|
pg_drop_replication_slot
|
||||||
|
--------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT pg_replication_origin_drop('regress_test_decoding: regression_slot');
|
||||||
|
pg_replication_origin_drop
|
||||||
|
----------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
DROP PUBLICATION pub;
|
||||||
|
@ -124,3 +124,25 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot_no_lsn', NULL, NUL
|
|||||||
SELECT pg_replication_origin_session_reset();
|
SELECT pg_replication_origin_session_reset();
|
||||||
SELECT pg_drop_replication_slot('regression_slot_no_lsn');
|
SELECT pg_drop_replication_slot('regression_slot_no_lsn');
|
||||||
SELECT pg_replication_origin_drop('regress_test_decoding: regression_slot_no_lsn');
|
SELECT pg_replication_origin_drop('regress_test_decoding: regression_slot_no_lsn');
|
||||||
|
|
||||||
|
-- Test that the pgoutput correctly filters changes corresponding to the provided origin value.
|
||||||
|
SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'pgoutput');
|
||||||
|
CREATE PUBLICATION pub FOR TABLE target_tbl;
|
||||||
|
SELECT pg_replication_origin_create('regress_test_decoding: regression_slot');
|
||||||
|
|
||||||
|
-- mark session as replaying
|
||||||
|
SELECT pg_replication_origin_session_setup('regress_test_decoding: regression_slot');
|
||||||
|
|
||||||
|
INSERT INTO target_tbl(data) VALUES ('test data');
|
||||||
|
|
||||||
|
-- The replayed change will be filtered.
|
||||||
|
SELECT count(*) = 0 FROM pg_logical_slot_peek_binary_changes('regression_slot', NULL, NULL, 'proto_version', '4', 'publication_names', 'pub', 'origin', 'none');
|
||||||
|
|
||||||
|
-- The replayed change will be output if the origin value is not specified.
|
||||||
|
SELECT count(*) != 0 FROM pg_logical_slot_peek_binary_changes('regression_slot', NULL, NULL, 'proto_version', '4', 'publication_names', 'pub');
|
||||||
|
|
||||||
|
-- Clean up
|
||||||
|
SELECT pg_replication_origin_session_reset();
|
||||||
|
SELECT pg_drop_replication_slot('regression_slot');
|
||||||
|
SELECT pg_replication_origin_drop('regress_test_decoding: regression_slot');
|
||||||
|
DROP PUBLICATION pub;
|
||||||
|
@ -82,7 +82,6 @@ static void pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
|
|||||||
|
|
||||||
static bool publications_valid;
|
static bool publications_valid;
|
||||||
static bool in_streaming;
|
static bool in_streaming;
|
||||||
static bool publish_no_origin;
|
|
||||||
|
|
||||||
static List *LoadPublications(List *pubnames);
|
static List *LoadPublications(List *pubnames);
|
||||||
static void publication_invalidation_cb(Datum arg, int cacheid,
|
static void publication_invalidation_cb(Datum arg, int cacheid,
|
||||||
@ -381,21 +380,23 @@ parse_output_parameters(List *options, PGOutputData *data)
|
|||||||
}
|
}
|
||||||
else if (strcmp(defel->defname, "origin") == 0)
|
else if (strcmp(defel->defname, "origin") == 0)
|
||||||
{
|
{
|
||||||
|
char *origin;
|
||||||
|
|
||||||
if (origin_option_given)
|
if (origin_option_given)
|
||||||
ereport(ERROR,
|
ereport(ERROR,
|
||||||
errcode(ERRCODE_SYNTAX_ERROR),
|
errcode(ERRCODE_SYNTAX_ERROR),
|
||||||
errmsg("conflicting or redundant options"));
|
errmsg("conflicting or redundant options"));
|
||||||
origin_option_given = true;
|
origin_option_given = true;
|
||||||
|
|
||||||
data->origin = defGetString(defel);
|
origin = defGetString(defel);
|
||||||
if (pg_strcasecmp(data->origin, LOGICALREP_ORIGIN_NONE) == 0)
|
if (pg_strcasecmp(origin, LOGICALREP_ORIGIN_NONE) == 0)
|
||||||
publish_no_origin = true;
|
data->publish_no_origin = true;
|
||||||
else if (pg_strcasecmp(data->origin, LOGICALREP_ORIGIN_ANY) == 0)
|
else if (pg_strcasecmp(origin, LOGICALREP_ORIGIN_ANY) == 0)
|
||||||
publish_no_origin = false;
|
data->publish_no_origin = false;
|
||||||
else
|
else
|
||||||
ereport(ERROR,
|
ereport(ERROR,
|
||||||
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
|
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
|
||||||
errmsg("unrecognized origin value: \"%s\"", data->origin));
|
errmsg("unrecognized origin value: \"%s\"", origin));
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
|
elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
|
||||||
@ -1673,7 +1674,9 @@ static bool
|
|||||||
pgoutput_origin_filter(LogicalDecodingContext *ctx,
|
pgoutput_origin_filter(LogicalDecodingContext *ctx,
|
||||||
RepOriginId origin_id)
|
RepOriginId origin_id)
|
||||||
{
|
{
|
||||||
if (publish_no_origin && origin_id != InvalidRepOriginId)
|
PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
|
||||||
|
|
||||||
|
if (data->publish_no_origin && origin_id != InvalidRepOriginId)
|
||||||
return true;
|
return true;
|
||||||
|
|
||||||
return false;
|
return false;
|
||||||
|
@ -29,7 +29,7 @@ typedef struct PGOutputData
|
|||||||
char streaming;
|
char streaming;
|
||||||
bool messages;
|
bool messages;
|
||||||
bool two_phase;
|
bool two_phase;
|
||||||
char *origin;
|
bool publish_no_origin;
|
||||||
} PGOutputData;
|
} PGOutputData;
|
||||||
|
|
||||||
#endif /* PGOUTPUT_H */
|
#endif /* PGOUTPUT_H */
|
||||||
|
Loading…
x
Reference in New Issue
Block a user