Add BEGIN/COMMIT for transactional messages during decoding.
In test_decoding module, when skip_empty_xacts option was specified, add BEGIN/COMMIT for transactional messages. This makes the handling of transactional messages consistent irrespective of whether skip_empty_xacts option was specified. We decided not to backpatch this change because skip_empty_xacts is primarily used to have consistent test results across different runs and this change won't help with that. Author: Vignesh C Reviewed-by: Ashutosh Bapat, Hou Zhijie Discussion: https://postgr.es/m/CAExHW5ujRhbOz6_aTq_jQA8NjeFqq9d_8G9viShWvXx8gdSXiQ@mail.gmail.com
This commit is contained in:
parent
4e9fa6d56b
commit
26dd0284b9
@ -58,17 +58,23 @@ SELECT 'ignorethis' FROM pg_logical_emit_message(true, 'test', 'czechtastic');
|
|||||||
ignorethis
|
ignorethis
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1');
|
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1', 'include-xids', '0');
|
||||||
data
|
data
|
||||||
--------------------------------------------------------------------
|
--------------------------------------------------------------------
|
||||||
|
BEGIN
|
||||||
message: transactional: 1 prefix: test, sz: 4 content:msg1
|
message: transactional: 1 prefix: test, sz: 4 content:msg1
|
||||||
|
COMMIT
|
||||||
message: transactional: 0 prefix: test, sz: 4 content:msg2
|
message: transactional: 0 prefix: test, sz: 4 content:msg2
|
||||||
message: transactional: 0 prefix: test, sz: 4 content:msg4
|
message: transactional: 0 prefix: test, sz: 4 content:msg4
|
||||||
message: transactional: 0 prefix: test, sz: 4 content:msg6
|
message: transactional: 0 prefix: test, sz: 4 content:msg6
|
||||||
|
BEGIN
|
||||||
message: transactional: 1 prefix: test, sz: 4 content:msg5
|
message: transactional: 1 prefix: test, sz: 4 content:msg5
|
||||||
message: transactional: 1 prefix: test, sz: 4 content:msg7
|
message: transactional: 1 prefix: test, sz: 4 content:msg7
|
||||||
|
COMMIT
|
||||||
|
BEGIN
|
||||||
message: transactional: 1 prefix: test, sz: 11 content:czechtastic
|
message: transactional: 1 prefix: test, sz: 11 content:czechtastic
|
||||||
(7 rows)
|
COMMIT
|
||||||
|
(13 rows)
|
||||||
|
|
||||||
-- test db filtering
|
-- test db filtering
|
||||||
\set prevdb :DBNAME
|
\set prevdb :DBNAME
|
||||||
|
@ -19,7 +19,7 @@ COMMIT;
|
|||||||
|
|
||||||
SELECT 'ignorethis' FROM pg_logical_emit_message(true, 'test', 'czechtastic');
|
SELECT 'ignorethis' FROM pg_logical_emit_message(true, 'test', 'czechtastic');
|
||||||
|
|
||||||
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1');
|
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1', 'include-xids', '0');
|
||||||
|
|
||||||
-- test db filtering
|
-- test db filtering
|
||||||
\set prevdb :DBNAME
|
\set prevdb :DBNAME
|
||||||
|
@ -743,6 +743,18 @@ pg_decode_message(LogicalDecodingContext *ctx,
|
|||||||
ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional,
|
ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional,
|
||||||
const char *prefix, Size sz, const char *message)
|
const char *prefix, Size sz, const char *message)
|
||||||
{
|
{
|
||||||
|
TestDecodingData *data = ctx->output_plugin_private;
|
||||||
|
TestDecodingTxnData *txndata;
|
||||||
|
|
||||||
|
txndata = transactional ? txn->output_plugin_private : NULL;
|
||||||
|
|
||||||
|
/* output BEGIN if we haven't yet for transactional messages */
|
||||||
|
if (transactional && data->skip_empty_xacts && !txndata->xact_wrote_changes)
|
||||||
|
pg_output_begin(ctx, data, txn, false);
|
||||||
|
|
||||||
|
if (transactional)
|
||||||
|
txndata->xact_wrote_changes = true;
|
||||||
|
|
||||||
OutputPluginPrepareWrite(ctx, true);
|
OutputPluginPrepareWrite(ctx, true);
|
||||||
appendStringInfo(ctx->out, "message: transactional: %d prefix: %s, sz: %zu content:",
|
appendStringInfo(ctx->out, "message: transactional: %d prefix: %s, sz: %zu content:",
|
||||||
transactional, prefix, sz);
|
transactional, prefix, sz);
|
||||||
|
Loading…
x
Reference in New Issue
Block a user